From 58388b06b4f83a1f9a74c58430fe8ea2f82edf68 Mon Sep 17 00:00:00 2001
From: Marcus Schiesser <mail@marcusschiesser.de>
Date: Fri, 5 Apr 2024 15:18:36 +0800
Subject: [PATCH] fix: remove non-working vercel streaming with source nodes

---
 .../streaming/fastapi/app/api/routers/chat.py | 31 +++++------------
 .../app/api/routers/vercel_response.py        | 33 -------------------
 2 files changed, 9 insertions(+), 55 deletions(-)
 delete mode 100644 templates/types/streaming/fastapi/app/api/routers/vercel_response.py

diff --git a/templates/types/streaming/fastapi/app/api/routers/chat.py b/templates/types/streaming/fastapi/app/api/routers/chat.py
index e97ab2df..94d3b652 100644
--- a/templates/types/streaming/fastapi/app/api/routers/chat.py
+++ b/templates/types/streaming/fastapi/app/api/routers/chat.py
@@ -1,14 +1,13 @@
 from pydantic import BaseModel
 from typing import List, Any, Optional, Dict, Tuple
 from fastapi import APIRouter, Depends, HTTPException, Request, status
+from fastapi.responses import StreamingResponse
 from llama_index.core.chat_engine.types import (
     BaseChatEngine,
-    StreamingAgentChatResponse,
 )
 from llama_index.core.schema import NodeWithScore
 from llama_index.core.llms import ChatMessage, MessageRole
 from app.engine import get_chat_engine
-from app.api.routers.vercel_response import VercelStreamResponse
 
 chat_router = r = APIRouter()
 
@@ -17,23 +16,23 @@ class _Message(BaseModel):
     role: MessageRole
     content: str
 
+
+class _ChatData(BaseModel):
+    messages: List[_Message]
+
     class Config:
         json_schema_extra = {
             "example": {
                 "messages": [
                     {
                         "role": "user",
-                        "content": "How to tune a guitar?",
+                        "content": "What standards for letters exist?",
                     }
                 ]
             }
         }
 
 
-class _ChatData(BaseModel):
-    messages: List[_Message]
-
-
 class _SourceNodes(BaseModel):
     id: str
     metadata: Dict[str, Any]
@@ -92,25 +91,13 @@ async def chat(
 
     response = await chat_engine.astream_chat(last_message_content, messages)
 
-    async def event_generator(request: Request, response: StreamingAgentChatResponse):
-        # Yield the text response
+    async def event_generator():
         async for token in response.async_response_gen():
-            # If client closes connection, stop sending events
             if await request.is_disconnected():
                 break
-            yield VercelStreamResponse.convert_text(token)
-
-        # Yield the source nodes
-        yield VercelStreamResponse.convert_data(
-            {
-                "nodes": [
-                    _SourceNodes.from_source_node(node).dict()
-                    for node in response.source_nodes
-                ]
-            }
-        )
+            yield token
 
-    return VercelStreamResponse(content=event_generator(request, response))
+    return StreamingResponse(event_generator(), media_type="text/plain")
 
 
 # non-streaming endpoint - delete if not needed
diff --git a/templates/types/streaming/fastapi/app/api/routers/vercel_response.py b/templates/types/streaming/fastapi/app/api/routers/vercel_response.py
deleted file mode 100644
index 37392cc9..00000000
--- a/templates/types/streaming/fastapi/app/api/routers/vercel_response.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import json
-from typing import Any
-from fastapi.responses import StreamingResponse
-
-
-class VercelStreamResponse(StreamingResponse):
-    """
-    Class to convert the response from the chat engine to the streaming format expected by Vercel/AI
-    """
-
-    TEXT_PREFIX = "0:"
-    DATA_PREFIX = "2:"
-    VERCEL_HEADERS = {
-        "X-Experimental-Stream-Data": "true",
-        "Content-Type": "text/plain; charset=utf-8",
-        "Access-Control-Expose-Headers": "X-Experimental-Stream-Data",
-    }
-
-    @classmethod
-    def convert_text(cls, token: str):
-        return f'{cls.TEXT_PREFIX}"{token}"\n'
-
-    @classmethod
-    def convert_data(cls, data: dict):
-        data_str = json.dumps(data)
-        return f"{cls.DATA_PREFIX}[{data_str}]\n"
-
-    def __init__(self, content: Any, **kwargs):
-        super().__init__(
-            content=content,
-            headers=self.VERCEL_HEADERS,
-            **kwargs,
-        )
-- 
GitLab
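
Note (outside the patch): after this change the chat endpoint streams raw tokens
as text/plain instead of the Vercel/AI prefixed format, so any client that parsed
the "0:"/"2:" prefixes must switch to reading plain text. Below is a minimal
client-side sketch of how the new stream could be consumed; the base URL and the
/api/chat mount point are assumptions about your deployment, not part of the
patch.

    import httpx

    def stream_chat(message: str) -> None:
        # Same request shape as the _ChatData model in chat.py.
        payload = {"messages": [{"role": "user", "content": message}]}
        # Keep the connection open and print tokens as they arrive.
        with httpx.stream(
            "POST", "http://localhost:8000/api/chat", json=payload, timeout=None
        ) as response:
            response.raise_for_status()
            for chunk in response.iter_text():
                print(chunk, end="", flush=True)

    if __name__ == "__main__":
        stream_chat("What standards for letters exist?")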