Unverified commit 6e9184dd authored by Marcus Schiesser, committed by GitHub

feat: use LlamaIndexAdapter (#302)

parent fa28cb5d
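This commit replaces the template's hand-written LlamaIndexStream helper (shown below) with the Vercel AI SDK's built-in LlamaIndexAdapter, and moves the follow-up question generation into the adapter's onFinal callback. A condensed before/after, using the variable names from the route handlers further down:

  // Before: custom helper converts the LlamaIndex chat stream into a Vercel/AI data stream
  const stream = LlamaIndexStream(response, vercelStreamData, messages as ChatMessage[]);

  // After: the ai package's adapter performs the conversion; suggestions are generated in onFinal
  const stream = LlamaIndexAdapter.toDataStream(response, { onFinal });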
import {
  StreamData,
  createCallbacksTransformer,
  createStreamDataTransformer,
  trimStartOfStreamHelper,
  type AIStreamCallbacksAndOptions,
} from "ai";
import { ChatMessage, EngineResponse } from "llamaindex";
import { generateNextQuestions } from "./suggestion";

export function LlamaIndexStream(
  response: AsyncIterable<EngineResponse>,
  data: StreamData,
  chatHistory: ChatMessage[],
  opts?: {
    callbacks?: AIStreamCallbacksAndOptions;
  },
): ReadableStream<Uint8Array> {
  return createParser(response, data, chatHistory)
    .pipeThrough(createCallbacksTransformer(opts?.callbacks))
    .pipeThrough(createStreamDataTransformer());
}

function createParser(
  res: AsyncIterable<EngineResponse>,
  data: StreamData,
  chatHistory: ChatMessage[],
) {
  const it = res[Symbol.asyncIterator]();
  const trimStartOfStream = trimStartOfStreamHelper();
  let llmTextResponse = "";
  return new ReadableStream<string>({
    async pull(controller): Promise<void> {
      const { value, done } = await it.next();
      if (done) {
        controller.close();
        // LLM stream is done, generate the next questions with a new LLM call
        chatHistory.push({ role: "assistant", content: llmTextResponse });
        const questions: string[] = await generateNextQuestions(chatHistory);
        if (questions.length > 0) {
          data.appendMessageAnnotation({
            type: "suggested_questions",
            data: questions,
          });
        }
        data.close();
        return;
      }
      const text = trimStartOfStream(value.delta ?? "");
      if (text) {
        llmTextResponse += text;
        controller.enqueue(text);
      }
    },
  });
}
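The removed helper above, and the new onFinal callbacks below, both delegate to generateNextQuestions from the streaming/suggestion module, which is not part of this diff. A minimal sketch of what such a helper could look like, assuming it prompts the globally configured Settings.llm and parses a newline-separated answer (the prompt wording and parsing here are illustrative, not the template's actual code):

  import { ChatMessage, Settings } from "llamaindex";

  // Hypothetical sketch -- the real implementation lives in ./suggestion and is not shown in this commit.
  export async function generateNextQuestions(
    chatHistory: ChatMessage[],
  ): Promise<string[]> {
    const conversation = chatHistory
      .map((message) => `${message.role}: ${message.content}`)
      .join("\n");
    const prompt = `Given the conversation below, suggest 3 short follow-up questions the user could ask next. Return one question per line.\n\n${conversation}`;
    const response = await Settings.llm.complete({ prompt });
    return response.text
      .split("\n")
      .map((question) => question.trim())
      .filter((question) => question.length > 0);
  }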
@@ -15,7 +15,7 @@
    "dev": "concurrently \"tsup index.ts --format esm --dts --watch\" \"nodemon --watch dist/index.js\""
  },
  "dependencies": {
    "ai": "3.3.38",
    "ai": "3.3.42",
    "cors": "^2.8.5",
    "dotenv": "^16.3.1",
    "duck-duck-scrape": "^2.2.5",
......
import { JSONValue, Message, StreamData, streamToResponse } from "ai";
import {
  JSONValue,
  LlamaIndexAdapter,
  Message,
  StreamData,
  streamToResponse,
} from "ai";
import { Request, Response } from "express";
import { ChatMessage, Settings } from "llamaindex";
import { createChatEngine } from "./engine/chat";
@@ -10,7 +16,7 @@ import {
  createCallbackManager,
  createStreamTimeout,
} from "./llamaindex/streaming/events";
import { LlamaIndexStream } from "./llamaindex/streaming/stream";
import { generateNextQuestions } from "./llamaindex/streaming/suggestion";

export const chat = async (req: Request, res: Response) => {
  // Init Vercel AI StreamData and timeout
@@ -56,23 +62,34 @@ export const chat = async (req: Request, res: Response) => {
    // Setup callbacks
    const callbackManager = createCallbackManager(vercelStreamData);
    const chatHistory: ChatMessage[] = messages as ChatMessage[];
    // Calling LlamaIndex's ChatEngine to get a streamed response
    const response = await Settings.withCallbackManager(callbackManager, () => {
      return chatEngine.chat({
        message: userMessageContent,
        chatHistory: messages as ChatMessage[],
        chatHistory,
        stream: true,
      });
    });
    // Return a stream, which can be consumed by the Vercel/AI client
    const stream = LlamaIndexStream(
      response,
      vercelStreamData,
      messages as ChatMessage[],
    );
    const onFinal = (content: string) => {
      chatHistory.push({ role: "assistant", content: content });
      generateNextQuestions(chatHistory)
        .then((questions: string[]) => {
          if (questions.length > 0) {
            vercelStreamData.appendMessageAnnotation({
              type: "suggested_questions",
              data: questions,
            });
          }
        })
        .finally(() => {
          vercelStreamData.close();
        });
    };
    const stream = LlamaIndexAdapter.toDataStream(response, { onFinal });
    return streamToResponse(stream, res, {}, vercelStreamData);
  } catch (error) {
    console.error("[LlamaIndex]", error);
......
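Conceptually, LlamaIndexAdapter.toDataStream does what the deleted LlamaIndexStream helper did by hand: iterate the EngineResponse chunks, forward each text delta, and hand the accumulated completion to onFinal once the engine stream ends. A rough illustration of that behaviour (simplified sketch only, not the actual ai-sdk source; the real adapter also handles data-stream framing, which is why vercelStreamData is still passed to streamToResponse and closed explicitly inside onFinal):

  // Simplified illustration only -- not the actual ai-sdk adapter implementation.
  function toDataStreamSketch(
    response: AsyncIterable<{ delta: string }>,
    callbacks?: { onFinal?: (completion: string) => void },
  ): ReadableStream<string> {
    let completion = "";
    return new ReadableStream<string>({
      async start(controller) {
        // Forward every text delta and remember the full completion.
        for await (const chunk of response) {
          completion += chunk.delta;
          controller.enqueue(chunk.delta);
        }
        callbacks?.onFinal?.(completion);
        controller.close();
      },
    });
  }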
import { initObservability } from "@/app/observability";
import { JSONValue, Message, StreamData, StreamingTextResponse } from "ai";
import { JSONValue, LlamaIndexAdapter, Message, StreamData } from "ai";
import { ChatMessage, Settings } from "llamaindex";
import { NextRequest, NextResponse } from "next/server";
import { createChatEngine } from "./engine/chat";
@@ -12,7 +12,7 @@ import {
  createCallbackManager,
  createStreamTimeout,
} from "./llamaindex/streaming/events";
import { LlamaIndexStream } from "./llamaindex/streaming/stream";
import { generateNextQuestions } from "./llamaindex/streaming/suggestion";
initObservability();
initSettings();
@@ -69,25 +69,37 @@ export async function POST(request: NextRequest) {
    // Setup callbacks
    const callbackManager = createCallbackManager(vercelStreamData);
    const chatHistory: ChatMessage[] = messages as ChatMessage[];
    // Calling LlamaIndex's ChatEngine to get a streamed response
    const response = await Settings.withCallbackManager(callbackManager, () => {
      return chatEngine.chat({
        message: userMessageContent,
        chatHistory: messages as ChatMessage[],
        chatHistory,
        stream: true,
      });
    });
    // Transform LlamaIndex stream to Vercel/AI format
    const stream = LlamaIndexStream(
      response,
      vercelStreamData,
      messages as ChatMessage[],
    );
    const onFinal = (content: string) => {
      chatHistory.push({ role: "assistant", content: content });
      generateNextQuestions(chatHistory)
        .then((questions: string[]) => {
          if (questions.length > 0) {
            vercelStreamData.appendMessageAnnotation({
              type: "suggested_questions",
              data: questions,
            });
          }
        })
        .finally(() => {
          vercelStreamData.close();
        });
    };
    // Return a StreamingTextResponse, which can be consumed by the Vercel/AI client
    return new StreamingTextResponse(stream, {}, vercelStreamData);
    return LlamaIndexAdapter.toDataStreamResponse(response, {
      data: vercelStreamData,
      callbacks: { onFinal },
    });
  } catch (error) {
    console.error("[LlamaIndex]", error);
    return NextResponse.json(
......
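On the client, the annotations appended via appendMessageAnnotation surface on the corresponding message when the stream is consumed with the Vercel AI SDK (for example through useChat). A hedged sketch of how a frontend could read the suggested questions back out, assuming the Message.annotations field of ai 3.3.x and the annotation shape written by the routes above:

  import type { JSONValue, Message } from "ai";

  // Illustrative helper (not part of this commit): pull suggested questions off a streamed message.
  function getSuggestedQuestions(message: Message): string[] {
    const annotations = (message.annotations ?? []) as JSONValue[];
    for (const annotation of annotations) {
      const value = annotation as { type?: string; data?: unknown };
      if (value?.type === "suggested_questions" && Array.isArray(value.data)) {
        return value.data as string[];
      }
    }
    return [];
  }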
@@ -17,7 +17,7 @@
    "@radix-ui/react-hover-card": "^1.0.7",
    "@radix-ui/react-select": "^2.1.1",
    "@radix-ui/react-slot": "^1.0.2",
    "ai": "3.3.38",
    "ai": "3.3.42",
    "ajv": "^8.12.0",
    "class-variance-authority": "^0.7.0",
    "clsx": "^2.1.1",
......