diff --git a/templates/components/llamaindex/typescript/streaming/stream.ts b/templates/components/llamaindex/typescript/streaming/stream.ts
deleted file mode 100644
index 99cbf7f35e29108e7e6430d197fa2578f41c0f9b..0000000000000000000000000000000000000000
--- a/templates/components/llamaindex/typescript/streaming/stream.ts
+++ /dev/null
@@ -1,57 +0,0 @@
-import {
-  StreamData,
-  createCallbacksTransformer,
-  createStreamDataTransformer,
-  trimStartOfStreamHelper,
-  type AIStreamCallbacksAndOptions,
-} from "ai";
-import { ChatMessage, EngineResponse } from "llamaindex";
-import { generateNextQuestions } from "./suggestion";
-
-export function LlamaIndexStream(
-  response: AsyncIterable<EngineResponse>,
-  data: StreamData,
-  chatHistory: ChatMessage[],
-  opts?: {
-    callbacks?: AIStreamCallbacksAndOptions;
-  },
-): ReadableStream<Uint8Array> {
-  return createParser(response, data, chatHistory)
-    .pipeThrough(createCallbacksTransformer(opts?.callbacks))
-    .pipeThrough(createStreamDataTransformer());
-}
-
-function createParser(
-  res: AsyncIterable<EngineResponse>,
-  data: StreamData,
-  chatHistory: ChatMessage[],
-) {
-  const it = res[Symbol.asyncIterator]();
-  const trimStartOfStream = trimStartOfStreamHelper();
-  let llmTextResponse = "";
-
-  return new ReadableStream<string>({
-    async pull(controller): Promise<void> {
-      const { value, done } = await it.next();
-      if (done) {
-        controller.close();
-        // LLM stream is done, generate the next questions with a new LLM call
-        chatHistory.push({ role: "assistant", content: llmTextResponse });
-        const questions: string[] = await generateNextQuestions(chatHistory);
-        if (questions.length > 0) {
-          data.appendMessageAnnotation({
-            type: "suggested_questions",
-            data: questions,
-          });
-        }
-        data.close();
-        return;
-      }
-      const text = trimStartOfStream(value.delta ?? "");
-      if (text) {
-        llmTextResponse += text;
-        controller.enqueue(text);
-      }
-    },
-  });
-}
diff --git a/templates/types/streaming/express/package.json b/templates/types/streaming/express/package.json
index e860060f2b324ffc4ac2712cf49d2b816374c387..ff54cf261f98d9765272f9963257605507e9a0a7 100644
--- a/templates/types/streaming/express/package.json
+++ b/templates/types/streaming/express/package.json
@@ -15,7 +15,7 @@
     "dev": "concurrently \"tsup index.ts --format esm --dts --watch\" \"nodemon --watch dist/index.js\""
   },
   "dependencies": {
-    "ai": "3.3.38",
+    "ai": "3.3.42",
     "cors": "^2.8.5",
     "dotenv": "^16.3.1",
     "duck-duck-scrape": "^2.2.5",
diff --git a/templates/types/streaming/express/src/controllers/chat.controller.ts b/templates/types/streaming/express/src/controllers/chat.controller.ts
index 50b70789cc7fc591d4beb113c3ff9b4ee8e0ce09..9e4901b1c1c99560af5a27fd2c109aacd733ddc2 100644
--- a/templates/types/streaming/express/src/controllers/chat.controller.ts
+++ b/templates/types/streaming/express/src/controllers/chat.controller.ts
@@ -1,4 +1,10 @@
-import { JSONValue, Message, StreamData, streamToResponse } from "ai";
+import {
+  JSONValue,
+  LlamaIndexAdapter,
+  Message,
+  StreamData,
+  streamToResponse,
+} from "ai";
 import { Request, Response } from "express";
 import { ChatMessage, Settings } from "llamaindex";
 import { createChatEngine } from "./engine/chat";
@@ -10,7 +16,7 @@ import {
   createCallbackManager,
   createStreamTimeout,
 } from "./llamaindex/streaming/events";
-import { LlamaIndexStream } from "./llamaindex/streaming/stream";
+import { generateNextQuestions } from "./llamaindex/streaming/suggestion";
 
 export const chat = async (req: Request, res: Response) => {
   // Init Vercel AI StreamData and timeout
@@ -56,23 +62,34 @@ export const chat = async (req: Request, res: Response) => {
 
     // Setup callbacks
     const callbackManager = createCallbackManager(vercelStreamData);
+    const chatHistory: ChatMessage[] = messages as ChatMessage[];
 
     // Calling LlamaIndex's ChatEngine to get a streamed response
    const response = await Settings.withCallbackManager(callbackManager, () => {
       return chatEngine.chat({
         message: userMessageContent,
-        chatHistory: messages as ChatMessage[],
+        chatHistory,
         stream: true,
       });
     });
 
-    // Return a stream, which can be consumed by the Vercel/AI client
-    const stream = LlamaIndexStream(
-      response,
-      vercelStreamData,
-      messages as ChatMessage[],
-    );
+    const onFinal = (content: string) => {
+      chatHistory.push({ role: "assistant", content: content });
+      generateNextQuestions(chatHistory)
+        .then((questions: string[]) => {
+          if (questions.length > 0) {
+            vercelStreamData.appendMessageAnnotation({
+              type: "suggested_questions",
+              data: questions,
+            });
+          }
+        })
+        .finally(() => {
+          vercelStreamData.close();
+        });
+    };
 
+    const stream = LlamaIndexAdapter.toDataStream(response, { onFinal });
     return streamToResponse(stream, res, {}, vercelStreamData);
   } catch (error) {
     console.error("[LlamaIndex]", error);
diff --git a/templates/types/streaming/nextjs/app/api/chat/route.ts b/templates/types/streaming/nextjs/app/api/chat/route.ts
index adfccf13be62b311833f4f0bf127208ea75177cc..fbb4774c7cb217797c3cba3b6710e6ed78ccca3a 100644
--- a/templates/types/streaming/nextjs/app/api/chat/route.ts
+++ b/templates/types/streaming/nextjs/app/api/chat/route.ts
@@ -1,5 +1,5 @@
 import { initObservability } from "@/app/observability";
-import { JSONValue, Message, StreamData, StreamingTextResponse } from "ai";
+import { JSONValue, LlamaIndexAdapter, Message, StreamData } from "ai";
 import { ChatMessage, Settings } from "llamaindex";
 import { NextRequest, NextResponse } from "next/server";
 import { createChatEngine } from "./engine/chat";
@@ -12,7 +12,7 @@ import {
   createCallbackManager,
   createStreamTimeout,
 } from "./llamaindex/streaming/events";
-import { LlamaIndexStream } from "./llamaindex/streaming/stream";
+import { generateNextQuestions } from "./llamaindex/streaming/suggestion";
 
 initObservability();
 initSettings();
@@ -69,25 +69,37 @@ export async function POST(request: NextRequest) {
 
     // Setup callbacks
     const callbackManager = createCallbackManager(vercelStreamData);
+    const chatHistory: ChatMessage[] = messages as ChatMessage[];
 
     // Calling LlamaIndex's ChatEngine to get a streamed response
     const response = await Settings.withCallbackManager(callbackManager, () => {
       return chatEngine.chat({
         message: userMessageContent,
-        chatHistory: messages as ChatMessage[],
+        chatHistory,
         stream: true,
       });
     });
 
-    // Transform LlamaIndex stream to Vercel/AI format
-    const stream = LlamaIndexStream(
-      response,
-      vercelStreamData,
-      messages as ChatMessage[],
-    );
+    const onFinal = (content: string) => {
+      chatHistory.push({ role: "assistant", content: content });
+      generateNextQuestions(chatHistory)
+        .then((questions: string[]) => {
+          if (questions.length > 0) {
+            vercelStreamData.appendMessageAnnotation({
+              type: "suggested_questions",
+              data: questions,
+            });
+          }
+        })
+        .finally(() => {
+          vercelStreamData.close();
+        });
+    };
 
-    // Return a StreamingTextResponse, which can be consumed by the Vercel/AI client
-    return new StreamingTextResponse(stream, {}, vercelStreamData);
+    return LlamaIndexAdapter.toDataStreamResponse(response, {
+      data: vercelStreamData,
+      callbacks: { onFinal },
+    });
   } catch (error) {
     console.error("[LlamaIndex]", error);
     return NextResponse.json(
diff --git a/templates/types/streaming/nextjs/package.json b/templates/types/streaming/nextjs/package.json
index b244f641a1b6839653641a502eab672a6d02ed0d..2839737d24c9685ce5ff28c801a9efc5a550307b 100644
--- a/templates/types/streaming/nextjs/package.json
+++ b/templates/types/streaming/nextjs/package.json
@@ -17,7 +17,7 @@
     "@radix-ui/react-hover-card": "^1.0.7",
     "@radix-ui/react-select": "^2.1.1",
     "@radix-ui/react-slot": "^1.0.2",
-    "ai": "3.3.38",
+    "ai": "3.3.42",
     "ajv": "^8.12.0",
     "class-variance-authority": "^0.7.0",
     "clsx": "^2.1.1",