From 88220f1dd215fae4b5a112be2b631b8a184d3588 Mon Sep 17 00:00:00 2001 From: Huu Le <39040748+leehuwuj@users.noreply.github.com> Date: Thu, 10 Oct 2024 15:24:43 +0700 Subject: [PATCH] feat: add canceling workflow for multiagent (#361) --- .changeset/selfish-lemons-kiss.md | 5 ++ .changeset/tiny-items-divide.md | 5 ++ .../multiagent/python/app/api/routers/chat.py | 2 - .../python/app/api/routers/vercel_response.py | 46 +++++++++++-------- .../types/streaming/fastapi/pyproject.toml | 2 +- 5 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 .changeset/selfish-lemons-kiss.md create mode 100644 .changeset/tiny-items-divide.md diff --git a/.changeset/selfish-lemons-kiss.md b/.changeset/selfish-lemons-kiss.md new file mode 100644 index 00000000..214e4251 --- /dev/null +++ b/.changeset/selfish-lemons-kiss.md @@ -0,0 +1,5 @@ +--- +"create-llama": patch +--- + +fix workflow doesn't stop when user presses stop generation button diff --git a/.changeset/tiny-items-divide.md b/.changeset/tiny-items-divide.md new file mode 100644 index 00000000..9bfcbf8a --- /dev/null +++ b/.changeset/tiny-items-divide.md @@ -0,0 +1,5 @@ +--- +"create-llama": patch +--- + +Bump llama_index@0.11.17 diff --git a/templates/components/multiagent/python/app/api/routers/chat.py b/templates/components/multiagent/python/app/api/routers/chat.py index 314f4761..9b9b6c9d 100644 --- a/templates/components/multiagent/python/app/api/routers/chat.py +++ b/templates/components/multiagent/python/app/api/routers/chat.py @@ -1,6 +1,5 @@ import logging -from app.api.routers.events import EventCallbackHandler from app.api.routers.models import ( ChatData, ) @@ -23,7 +22,6 @@ async def chat( last_message_content = data.get_last_message_content() messages = data.get_history_messages(include_agent_messages=True) - event_handler = EventCallbackHandler() # The chat API supports passing private document filters and chat params # but agent workflow does not support them yet # ignore chat params and use all documents for now diff --git a/templates/components/multiagent/python/app/api/routers/vercel_response.py b/templates/components/multiagent/python/app/api/routers/vercel_response.py index 12082496..82f2e705 100644 --- a/templates/components/multiagent/python/app/api/routers/vercel_response.py +++ b/templates/components/multiagent/python/app/api/routers/vercel_response.py @@ -1,6 +1,6 @@ +import asyncio import json import logging -from abc import ABC from typing import AsyncGenerator, List from aiostream import stream @@ -13,7 +13,7 @@ from fastapi.responses import StreamingResponse logger = logging.getLogger("uvicorn") -class VercelStreamResponse(StreamingResponse, ABC): +class VercelStreamResponse(StreamingResponse): """ Base class to convert the response from the chat engine to the streaming format expected by Vercel """ @@ -23,26 +23,34 @@ class VercelStreamResponse(StreamingResponse, ABC): def __init__(self, request: Request, chat_data: ChatData, *args, **kwargs): self.request = request - - stream = self._create_stream(request, chat_data, *args, **kwargs) - content = self.content_generator(stream) - + self.chat_data = chat_data + content = self.content_generator(*args, **kwargs) super().__init__(content=content) - async def content_generator(self, stream): + async def content_generator(self, event_handler, events): + logger.info("Starting content_generator") + stream = self._create_stream( + self.request, self.chat_data, event_handler, events + ) is_stream_started = False - - async with stream.stream() as streamer: - async for output in streamer: - if not is_stream_started: - is_stream_started = True - # Stream a blank message to start the stream - yield self.convert_text("") - - yield output - - if await self.request.is_disconnected(): - break + try: + async with stream.stream() as streamer: + async for output in streamer: + if not is_stream_started: + is_stream_started = True + # Stream a blank message to start the stream + yield self.convert_text("") + + yield output + except asyncio.CancelledError: + logger.info("Stopping workflow") + await event_handler.cancel_run() + except Exception as e: + logger.error( + f"Unexpected error in content_generator: {str(e)}", exc_info=True + ) + finally: + logger.info("The stream has been stopped!") def _create_stream( self, diff --git a/templates/types/streaming/fastapi/pyproject.toml b/templates/types/streaming/fastapi/pyproject.toml index 05ae67d0..6a894160 100644 --- a/templates/types/streaming/fastapi/pyproject.toml +++ b/templates/types/streaming/fastapi/pyproject.toml @@ -15,7 +15,7 @@ uvicorn = { extras = ["standard"], version = "^0.23.2" } python-dotenv = "^1.0.0" aiostream = "^0.5.2" cachetools = "^5.3.3" -llama-index = "0.11.6" +llama-index = "^0.11.17" [tool.poetry.group.dev.dependencies] mypy = "^1.8.0" -- GitLab