diff --git a/packages/server/src/handlers/chat.ts b/packages/server/src/handlers/chat.ts
new file mode 100644
index 0000000000000000000000000000000000000000..8f2d1a7e97739de3f9115f06f078f9831478bf6e
--- /dev/null
+++ b/packages/server/src/handlers/chat.ts
@@ -0,0 +1,44 @@
+import { LlamaIndexAdapter } from "ai";
+import { IncomingMessage, ServerResponse } from "http";
+import { type ChatMessage } from "llamaindex";
+import type { ServerWorkflow } from "../types";
+import {
+  parseRequestBody,
+  pipeResponse,
+  sendJSONResponse,
+} from "../utils/request";
+import { createStreamFromWorkflowContext } from "../utils/stream";
+
+export const handleChat = async (
+  workflow: ServerWorkflow,
+  req: IncomingMessage,
+  res: ServerResponse,
+) => {
+  try {
+    const body = await parseRequestBody(req);
+    const { messages } = body as { messages: ChatMessage[] };
+
+    const lastMessage = messages[messages.length - 1];
+    if (lastMessage?.role !== "user") {
+      return sendJSONResponse(res, 400, {
+        error: "Messages cannot be empty and last message must be from user",
+      });
+    }
+
+    const userMessage = lastMessage.content;
+    const chatHistory = messages.slice(0, -1);
+
+    const context = workflow.run({ userMessage, chatHistory });
+    const { stream, dataStream } =
+      await createStreamFromWorkflowContext(context);
+    const streamResponse = LlamaIndexAdapter.toDataStreamResponse(stream, {
+      data: dataStream,
+    });
+    pipeResponse(res, streamResponse);
+  } catch (error) {
+    console.error("Chat error:", error);
+    return sendJSONResponse(res, 500, {
+      detail: (error as Error).message || "Internal server error",
+    });
+  }
+};
diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts
index a4caa0856c4441a228dbb050c057816f03aeeab5..f555f31660cb88b34c41ad343fa525b1615cf696 100644
--- a/packages/server/src/index.ts
+++ b/packages/server/src/index.ts
@@ -1,3 +1,2 @@
 export * from "./server";
-export * from "./workflow/stream";
-export * from "./workflow/type";
+export * from "./types";
diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts
index 68a5fdb4cf8d7e276697a3d6c1117ed9792240a0..2b75ea440dcfc9f1e09d02b3fa09dc3daa973490 100644
--- a/packages/server/src/server.ts
+++ b/packages/server/src/server.ts
@@ -1,14 +1,9 @@
-import { createServer, IncomingMessage, ServerResponse } from "http";
-import { type ChatMessage } from "llamaindex";
+import { createServer } from "http";
 import next from "next";
 import path from "path";
 import { parse } from "url";
-import {
-  chatWithWorkflow,
-  parseRequestBody,
-  pipeResponse,
-} from "./workflow/stream";
-import type { ServerWorkflow } from "./workflow/type";
+import { handleChat } from "./handlers/chat";
+import type { ServerWorkflow } from "./types";
 
 type NextAppOptions = Omit<Parameters<typeof next>[0], "dir">;
 
@@ -28,18 +23,6 @@ export class LlamaIndexServer {
     this.workflow = workflow;
   }
 
-  async handleChat(req: IncomingMessage, res: ServerResponse) {
-    try {
-      const body = await parseRequestBody(req);
-      const { messages } = body as { messages: ChatMessage[] };
-      const streamResponse = await chatWithWorkflow(this.workflow, messages);
-      pipeResponse(res, streamResponse);
-    } catch (error) {
-      console.error("Chat error:", error);
-      res.end("Internal server error");
-    }
-  }
-
   async start() {
     await this.app.prepare();
 
@@ -48,7 +31,7 @@ export class LlamaIndexServer {
       const pathname = parsedUrl.pathname;
 
       if (pathname === "/api/chat" && req.method === "POST") {
-        return this.handleChat(req, res);
+        return handleChat(this.workflow, req, res);
      }
 
      const handle = this.app.getRequestHandler();
diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts
new file mode 100644
index 0000000000000000000000000000000000000000..9a41a7dce678fdf18801cae3d8916e9b0e10f42c
--- /dev/null
+++ b/packages/server/src/types.ts
@@ -0,0 +1,13 @@
+import {
+  Workflow,
+  type ChatMessage,
+  type ChatResponseChunk,
+  type MessageContent,
+} from "llamaindex";
+
+export type AgentInput = {
+  userMessage: MessageContent;
+  chatHistory: ChatMessage[];
+};
+
+export type ServerWorkflow = Workflow<null, AgentInput, ChatResponseChunk>;
diff --git a/packages/server/src/utils/request.ts b/packages/server/src/utils/request.ts
new file mode 100644
index 0000000000000000000000000000000000000000..416b97c6abfe56a082df664ffc708bc6438e6b9d
--- /dev/null
+++ b/packages/server/src/utils/request.ts
@@ -0,0 +1,41 @@
+import type { IncomingMessage, ServerResponse } from "http";
+
+export async function parseRequestBody(request: IncomingMessage) {
+  const body = new Promise((resolve) => {
+    const bodyParts: Buffer[] = [];
+    let body: string;
+    request
+      .on("data", (chunk) => {
+        bodyParts.push(chunk);
+      })
+      .on("end", () => {
+        body = Buffer.concat(bodyParts).toString();
+        resolve(body);
+      });
+  }) as Promise<string>;
+  const data = await body;
+  return JSON.parse(data);
+}
+
+export function sendJSONResponse(
+  response: ServerResponse,
+  statusCode: number,
+  body: Record<string, unknown> | string,
+) {
+  response.statusCode = statusCode;
+  response.setHeader("Content-Type", "application/json");
+  response.end(typeof body === "string" ? body : JSON.stringify(body));
+}
+
+export async function pipeResponse(
+  serverResponse: ServerResponse,
+  streamResponse: Response,
+) {
+  if (!streamResponse.body) return;
+  const reader = streamResponse.body.getReader();
+  while (true) {
+    const { done, value } = await reader.read();
+    if (done) return serverResponse.end();
+    serverResponse.write(value);
+  }
+}
diff --git a/packages/server/src/workflow/stream.ts b/packages/server/src/utils/stream.ts
similarity index 53%
rename from packages/server/src/workflow/stream.ts
rename to packages/server/src/utils/stream.ts
index 2838ccfcc0aa0aef89476d99c206418a8c9c1de1..056a5ac8275b830f3a6b26ff3e67bc0924ee8b5d 100644
--- a/packages/server/src/workflow/stream.ts
+++ b/packages/server/src/utils/stream.ts
@@ -1,30 +1,14 @@
-import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai";
-import type { IncomingMessage, ServerResponse } from "http";
+import { StreamData, type JSONValue } from "ai";
 import {
   EngineResponse,
   StopEvent,
-  Workflow,
   WorkflowContext,
   WorkflowEvent,
-  type ChatMessage,
   type ChatResponseChunk,
 } from "llamaindex";
 import { ReadableStream } from "stream/web";
-import { AgentRunEvent, type AgentInput } from "./type";
 
-export async function chatWithWorkflow(
-  workflow: Workflow<null, AgentInput, ChatResponseChunk>,
-  messages: ChatMessage[],
-): Promise<Response> {
-  const context = workflow.run({ messages });
-  const { stream, dataStream } = await createStreamFromWorkflowContext(context);
-  const response = LlamaIndexAdapter.toDataStreamResponse(stream, {
-    data: dataStream,
-  });
-  return response;
-}
-
-async function createStreamFromWorkflowContext<Input, Output, Context>(
+export async function createStreamFromWorkflowContext<Input, Output, Context>(
   context: WorkflowContext<Input, Output, Context>,
 ): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> {
   const dataStream = new StreamData();
@@ -74,42 +58,11 @@ function handleEvent(
   // Handle for StopEvent
   if (event instanceof StopEvent) {
     return event.data as AsyncGenerator<ChatResponseChunk>;
-  }
-  // Handle for AgentRunEvent
-  if (event instanceof AgentRunEvent) {
+  } else {
+    console.log("handleWorkflowEvent", event, event instanceof WorkflowEvent);
     dataStream.appendMessageAnnotation({
       type: "agent",
       data: event.data as JSONValue,
     });
   }
 }
-
-export async function pipeResponse(
-  response: ServerResponse,
-  streamResponse: Response,
-) {
-  if (!streamResponse.body) return;
-  const reader = streamResponse.body.getReader();
-  while (true) {
-    const { done, value } = await reader.read();
-    if (done) return response.end();
-    response.write(value);
-  }
-}
-
-export async function parseRequestBody(request: IncomingMessage) {
-  const body = new Promise((resolve) => {
-    const bodyParts: Buffer[] = [];
-    let body: string;
-    request
-      .on("data", (chunk) => {
-        bodyParts.push(chunk);
-      })
-      .on("end", () => {
-        body = Buffer.concat(bodyParts).toString();
-        resolve(body);
-      });
-  }) as Promise<string>;
-  const data = await body;
-  return JSON.parse(data);
-}
diff --git a/packages/server/src/workflow/type.ts b/packages/server/src/workflow/type.ts
deleted file mode 100644
index 3f08d1f314e21213c5bed836f0eeb3f4c790fdc0..0000000000000000000000000000000000000000
--- a/packages/server/src/workflow/type.ts
+++ /dev/null
@@ -1,29 +0,0 @@
-import {
-  Workflow,
-  WorkflowEvent,
-  type ChatMessage,
-  type ChatResponseChunk,
-} from "llamaindex";
-
-export type AgentInput = {
-  messages: ChatMessage[];
-};
-
-export type AgentRunEventType = "text" | "progress";
-
-export type ProgressEventData = {
-  id: string;
-  total: number;
-  current: number;
-};
-
-export type AgentRunEventData = ProgressEventData;
-
-export class AgentRunEvent extends WorkflowEvent<{
-  agent: string;
-  text: string;
-  type: AgentRunEventType;
-  data?: AgentRunEventData;
-}> {}
-
-export type ServerWorkflow = Workflow<null, AgentInput, ChatResponseChunk>;
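For reviewers, a minimal usage sketch of the refactored surface. The import path `@llamaindex/server` is a placeholder for however `packages/server` is published, and the constructor options shape is assumed (only `this.workflow = workflow` is visible in the hunks above); the request shape follows the validation in `handleChat`.

```ts
// Sketch only: "@llamaindex/server" and the constructor options shape
// are assumptions; adjust to the package's actual public API.
import { LlamaIndexServer, type ServerWorkflow } from "@llamaindex/server";

// Any Workflow<null, AgentInput, ChatResponseChunk> qualifies as a ServerWorkflow.
declare const myWorkflow: ServerWorkflow;

const server = new LlamaIndexServer({ workflow: myWorkflow });
await server.start(); // serves the Next.js app and routes POST /api/chat to handleChat

// Client side: handleChat responds 400 unless the last message has role
// "user", then streams back a Vercel AI data-stream response.
// (Default Next.js port assumed.)
const res = await fetch("http://localhost:3000/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    messages: [{ role: "user", content: "Hello!" }],
  }),
});
```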