From dd593093e02c84f9af566d1f5a104bc6c84cac89 Mon Sep 17 00:00:00 2001 From: Logan <logan.markewich@live.com> Date: Wed, 26 Feb 2025 21:20:39 -0600 Subject: [PATCH] safer workflow cancel + fix restored context bug (#17938) --- .../agent/workflow/multi_agent_workflow.py | 2 ++ .../llama_index/core/workflow/workflow.py | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py b/llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py index 247f88beed..4e3a9bc552 100644 --- a/llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py +++ b/llama-index-core/llama_index/core/agent/workflow/multi_agent_workflow.py @@ -496,6 +496,7 @@ class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta): state_prompt: Optional[Union[str, BasePromptTemplate]] = None, initial_state: Optional[dict] = None, timeout: Optional[float] = None, + verbose: bool = False, ) -> "AgentWorkflow": """Initializes an AgentWorkflow from a list of tools or functions. @@ -528,4 +529,5 @@ class AgentWorkflow(Workflow, PromptMixin, metaclass=AgentWorkflowMeta): state_prompt=state_prompt, initial_state=initial_state, timeout=timeout, + verbose=verbose, ) diff --git a/llama-index-core/llama_index/core/workflow/workflow.py b/llama-index-core/llama_index/core/workflow/workflow.py index 366545f54c..5c9a3d86ee 100644 --- a/llama-index-core/llama_index/core/workflow/workflow.py +++ b/llama-index-core/llama_index/core/workflow/workflow.py @@ -369,8 +369,11 @@ class Workflow(metaclass=WorkflowMeta): # add dedicated cancel task async def _cancel_workflow_task() -> None: - await ctx._cancel_flag.wait() - raise WorkflowCancelledByUser + try: + await ctx._cancel_flag.wait() + raise WorkflowCancelledByUser + except asyncio.CancelledError: + return ctx._tasks.add( asyncio.create_task( @@ -458,11 +461,6 @@ class Workflow(metaclass=WorkflowMeta): # the context is now running ctx.is_running = True - else: - # resend in-progress events if already running - for name, evs in ctx._in_progress.items(): - for ev in evs: - ctx.send_event(ev, step=name) done, unfinished = await asyncio.wait( ctx._tasks, @@ -485,7 +483,14 @@ class Workflow(metaclass=WorkflowMeta): t.cancel() # wait for cancelled tasks to cleanup - await asyncio.gather(*unfinished, return_exceptions=True) + # prevents any tasks from being stuck + try: + await asyncio.wait_for( + asyncio.gather(*unfinished, return_exceptions=True), + timeout=0.5, + ) + except asyncio.TimeoutError: + logger.warning("Some tasks did not clean up within timeout") # the context is no longer running ctx.is_running = False -- GitLab