diff --git a/.changeset/few-geckos-confess.md b/.changeset/few-geckos-confess.md new file mode 100644 index 0000000000000000000000000000000000000000..0d9d61a77bbd5ad4202f0fae68ca709736e4e711 --- /dev/null +++ b/.changeset/few-geckos-confess.md @@ -0,0 +1,5 @@ +--- +"create-llama": patch +--- + +Fix Vercel streaming (python) to stream data events instantly diff --git a/templates/types/streaming/fastapi/app/api/routers/chat.py b/templates/types/streaming/fastapi/app/api/routers/chat.py index d6373cc9201b97081a41fc28e3125388f6534ff2..5ff2ce05bbf006b1265114636a8a2b74118a09a7 100644 --- a/templates/types/streaming/fastapi/app/api/routers/chat.py +++ b/templates/types/streaming/fastapi/app/api/routers/chat.py @@ -33,14 +33,6 @@ async def chat( event_handler = EventCallbackHandler() chat_engine.callback_manager.handlers.append(event_handler) # type: ignore - try: - response = await chat_engine.astream_chat(last_message_content, messages) - except Exception as e: - logger.exception("Error in chat engine", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error in chat engine: {e}", - ) from e async def content_generator(): # Yield the additional data @@ -49,12 +41,26 @@ async def chat( yield VercelStreamResponse.convert_data(data_response) # Yield the text response - async def _text_generator(): + async def _chat_response_generator(): + response = await chat_engine.astream_chat(last_message_content, messages) async for token in response.async_response_gen(): yield VercelStreamResponse.convert_text(token) # the text_generator is the leading stream, once it's finished, also finish the event stream event_handler.is_done = True + # Yield the source nodes + yield VercelStreamResponse.convert_data( + { + "type": "sources", + "data": { + "nodes": [ + SourceNodes.from_source_node(node).dict() + for node in response.source_nodes + ] + }, + } + ) + # Yield the events from the event handler async def _event_generator(): async for event in event_handler.async_event_gen(): @@ -62,27 +68,28 @@ async def chat( if event_response is not None: yield VercelStreamResponse.convert_data(event_response) - combine = stream.merge(_text_generator(), _event_generator()) + combine = stream.merge(_chat_response_generator(), _event_generator()) + is_stream_started = False async with combine.stream() as streamer: - async for item in streamer: + async for output in streamer: + if not is_stream_started: + is_stream_started = True + # Stream a blank message to start the stream + yield VercelStreamResponse.convert_text("") + + yield output + if await request.is_disconnected(): break - yield item - - # Yield the source nodes - yield VercelStreamResponse.convert_data( - { - "type": "sources", - "data": { - "nodes": [ - SourceNodes.from_source_node(node).dict() - for node in response.source_nodes - ] - }, - } - ) - - return VercelStreamResponse(content=content_generator()) + + try: + return VercelStreamResponse(content=content_generator()) + except Exception as e: + logger.exception("Error in chat engine", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error in chat engine: {e}", + ) from e # non-streaming endpoint - delete if not needed