Skip to content
Snippets Groups Projects
Unverified Commit 88220f1d authored by Huu Le's avatar Huu Le Committed by GitHub
Browse files

feat: add canceling workflow for multiagent (#361)

parent 6304114e
No related branches found
No related tags found
No related merge requests found
---
"create-llama": patch
---
Fix: the workflow doesn't stop when the user presses the stop-generation button
---
"create-llama": patch
---
Bump llama_index@0.11.17
import logging
from app.api.routers.events import EventCallbackHandler
from app.api.routers.models import (
ChatData,
)
......@@ -23,7 +22,6 @@ async def chat(
last_message_content = data.get_last_message_content()
messages = data.get_history_messages(include_agent_messages=True)
event_handler = EventCallbackHandler()
# The chat API supports passing private document filters and chat params
# but agent workflow does not support them yet
# ignore chat params and use all documents for now
......
import asyncio
import json
import logging
from abc import ABC
from typing import AsyncGenerator, List
from aiostream import stream
......@@ -13,7 +13,7 @@ from fastapi.responses import StreamingResponse
logger = logging.getLogger("uvicorn")
class VercelStreamResponse(StreamingResponse, ABC):
class VercelStreamResponse(StreamingResponse):
"""
Base class to convert the response from the chat engine to the streaming format expected by Vercel
"""
......@@ -23,26 +23,34 @@ class VercelStreamResponse(StreamingResponse, ABC):
def __init__(self, request: Request, chat_data: ChatData, *args, **kwargs):
    """Set up the Vercel streaming response.

    Only stores the request and chat data here; the underlying event
    stream is created lazily inside ``content_generator`` so that a
    client disconnect raises ``asyncio.CancelledError`` inside the
    generator, where the workflow run can be cancelled.

    Args:
        request: The incoming FastAPI/Starlette request (used to build
            the stream and observe disconnects).
        chat_data: The parsed chat payload for this conversation.
        *args, **kwargs: Forwarded to ``content_generator`` — per the
            visible call sites these are ``event_handler`` and ``events``.
    """
    self.request = request
    self.chat_data = chat_data
    # NOTE: no eager self._create_stream(...) call here — stream creation
    # must happen inside the generator for cancellation to work.
    content = self.content_generator(*args, **kwargs)
    super().__init__(content=content)
async def content_generator(self, event_handler, events):
    """Yield Vercel-protocol chunks from the chat workflow stream.

    The merged stream is created here (not in ``__init__``) so that when
    the client disconnects or presses the stop-generation button, the
    resulting ``asyncio.CancelledError`` is raised inside this generator
    and the workflow run can be cancelled via ``event_handler``.

    Args:
        event_handler: Callback handler for workflow events; must expose
            an async ``cancel_run()`` used to stop the workflow.
        events: The event source passed through to ``_create_stream``.

    Yields:
        Encoded text chunks; a blank message is emitted first so the
        client sees the stream open immediately.
    """
    logger.info("Starting content_generator")
    stream = self._create_stream(
        self.request, self.chat_data, event_handler, events
    )
    is_stream_started = False
    try:
        async with stream.stream() as streamer:
            async for output in streamer:
                if not is_stream_started:
                    is_stream_started = True
                    # Stream a blank message to start the stream
                    yield self.convert_text("")
                yield output
    except asyncio.CancelledError:
        # Client disconnected / stopped generation: cancel the workflow run.
        logger.info("Stopping workflow")
        await event_handler.cancel_run()
    except Exception as e:
        # Swallow unexpected errors after logging: the HTTP response has
        # already started streaming, so re-raising cannot reach the client.
        logger.error(
            f"Unexpected error in content_generator: {str(e)}", exc_info=True
        )
    finally:
        logger.info("The stream has been stopped!")
def _create_stream(
self,
......
......@@ -15,7 +15,7 @@ uvicorn = { extras = ["standard"], version = "^0.23.2" }
python-dotenv = "^1.0.0"
aiostream = "^0.5.2"
cachetools = "^5.3.3"
llama-index = "^0.11.17"
[tool.poetry.group.dev.dependencies]
mypy = "^1.8.0"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment