Skip to content
Snippets Groups Projects
Unverified commit 563b51d7, authored by Huu Le, committed by GitHub
Browse files

Fix: Vercel streaming (python) does not stream data events instantly (#111)

parent 88c88bf1
No related branches found
No related tags found
No related merge requests found
---
"create-llama": patch
---
Fix Vercel streaming (python) to stream data events instantly
......@@ -33,14 +33,6 @@ async def chat(
event_handler = EventCallbackHandler()
chat_engine.callback_manager.handlers.append(event_handler) # type: ignore
try:
response = await chat_engine.astream_chat(last_message_content, messages)
except Exception as e:
logger.exception("Error in chat engine", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error in chat engine: {e}",
) from e
async def content_generator():
# Yield the additional data
......@@ -49,12 +41,26 @@ async def chat(
yield VercelStreamResponse.convert_data(data_response)
# Yield the text response
async def _text_generator():
async def _chat_response_generator():
response = await chat_engine.astream_chat(last_message_content, messages)
async for token in response.async_response_gen():
yield VercelStreamResponse.convert_text(token)
# the text_generator is the leading stream, once it's finished, also finish the event stream
event_handler.is_done = True
# Yield the source nodes
yield VercelStreamResponse.convert_data(
{
"type": "sources",
"data": {
"nodes": [
SourceNodes.from_source_node(node).dict()
for node in response.source_nodes
]
},
}
)
# Yield the events from the event handler
async def _event_generator():
async for event in event_handler.async_event_gen():
......@@ -62,27 +68,28 @@ async def chat(
if event_response is not None:
yield VercelStreamResponse.convert_data(event_response)
combine = stream.merge(_text_generator(), _event_generator())
combine = stream.merge(_chat_response_generator(), _event_generator())
is_stream_started = False
async with combine.stream() as streamer:
async for item in streamer:
async for output in streamer:
if not is_stream_started:
is_stream_started = True
# Stream a blank message to start the stream
yield VercelStreamResponse.convert_text("")
yield output
if await request.is_disconnected():
break
yield item
# Yield the source nodes
yield VercelStreamResponse.convert_data(
{
"type": "sources",
"data": {
"nodes": [
SourceNodes.from_source_node(node).dict()
for node in response.source_nodes
]
},
}
)
return VercelStreamResponse(content=content_generator())
try:
return VercelStreamResponse(content=content_generator())
except Exception as e:
logger.exception("Error in chat engine", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error in chat engine: {e}",
) from e
# non-streaming endpoint - delete if not needed
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment