From 55293998962523b1b5ed9402160aacf7ed65dc50 Mon Sep 17 00:00:00 2001
From: thucpn <thucsh2@gmail.com>
Date: Wed, 19 Mar 2025 16:50:52 +0700
Subject: [PATCH] workflow factory

---
 packages/server/src/handlers/chat.ts  | 18 +++++++++++-------
 packages/server/src/server.ts         | 14 ++++----------
 packages/server/src/types.ts          | 15 +++++++++++++++
 packages/server/src/utils/request.ts  | 14 +++++++-------
 packages/server/src/utils/workflow.ts |  9 ++++-----
 5 files changed, 41 insertions(+), 29 deletions(-)

diff --git a/packages/server/src/handlers/chat.ts b/packages/server/src/handlers/chat.ts
index d8b56308c..10bf8484a 100644
--- a/packages/server/src/handlers/chat.ts
+++ b/packages/server/src/handlers/chat.ts
@@ -1,16 +1,16 @@
 import { type Message } from "ai";
 import { IncomingMessage, ServerResponse } from "http";
 import { type ChatMessage } from "llamaindex";
-import type { ServerWorkflow } from "../types";
+import type { WorkflowFactory } from "../types";
 import {
   parseRequestBody,
-  pipeResponse,
+  pipeStreamToResponse,
   sendJSONResponse,
 } from "../utils/request";
 import { runWorkflow } from "../utils/workflow";
 
 export const handleChat = async (
-  workflow: ServerWorkflow,
+  workflowFactory: WorkflowFactory,
   req: IncomingMessage,
   res: ServerResponse,
 ) => {
@@ -25,10 +25,14 @@ export const handleChat = async (
       });
     }
 
-    const userInput = lastMessage.content;
-    const chatHistory = messages.slice(0, -1) as ChatMessage[];
-    const streamResponse = await runWorkflow(workflow, userInput, chatHistory);
-    pipeResponse(res, streamResponse);
+    const workflow = await workflowFactory(body);
+
+    const stream = await runWorkflow(workflow, {
+      userInput: lastMessage.content,
+      chatHistory: messages.slice(0, -1) as ChatMessage[],
+    });
+
+    pipeStreamToResponse(res, stream);
   } catch (error) {
     console.error("Chat error:", error);
     return sendJSONResponse(res, 500, {
diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts
index 2b75ea440..4b18c771b 100644
--- a/packages/server/src/server.ts
+++ b/packages/server/src/server.ts
@@ -3,24 +3,18 @@ import next from "next";
 import path from "path";
 import { parse } from "url";
 import { handleChat } from "./handlers/chat";
-import type { ServerWorkflow } from "./types";
-
-type NextAppOptions = Omit<Parameters<typeof next>[0], "dir">;
-
-export type LlamaIndexServerOptions = NextAppOptions & {
-  workflow: ServerWorkflow;
-};
+import type { LlamaIndexServerOptions, ServerWorkflow } from "./types";
 
 export class LlamaIndexServer {
   port: number;
   app: ReturnType<typeof next>;
-  workflow: ServerWorkflow;
+  workflowFactory: () => Promise<ServerWorkflow> | ServerWorkflow;
 
   constructor({ workflow, ...nextAppOptions }: LlamaIndexServerOptions) {
     const nextDir = path.join(__dirname, ".."); // location of the .next after build next app
     this.app = next({ ...nextAppOptions, dir: nextDir });
     this.port = nextAppOptions.port ?? 3000;
-    this.workflow = workflow;
+    this.workflowFactory = workflow;
   }
 
   async start() {
@@ -31,7 +25,7 @@ export class LlamaIndexServer {
      const pathname = parsedUrl.pathname;
 
      if (pathname === "/api/chat" && req.method === "POST") {
-        return handleChat(this.workflow, req, res);
+        return handleChat(this.workflowFactory, req, res);
      }
 
      const handle = this.app.getRequestHandler();
diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts
index c0d0744fc..9567c41ef 100644
--- a/packages/server/src/types.ts
+++ b/packages/server/src/types.ts
@@ -4,6 +4,7 @@ import {
   type ChatMessage,
   type ChatResponseChunk,
 } from "llamaindex";
+import type next from "next";
 
 export type AgentInput = {
   userInput: string; // the last message content from the user
@@ -13,3 +14,17 @@ export type AgentInput = {
 export type ServerWorkflow =
   | Workflow<null, AgentInput, ChatResponseChunk>
   | AgentWorkflow;
+
+/**
+ * A factory function that creates a ServerWorkflow instance, possibly asynchronously.
+ * The requestBody parameter is the body from the request, which can be used to customize the workflow per request.
+ */
+export type WorkflowFactory = (
+  requestBody?: unknown,
+) => Promise<ServerWorkflow> | ServerWorkflow;
+
+export type NextAppOptions = Omit<Parameters<typeof next>[0], "dir">;
+
+export type LlamaIndexServerOptions = NextAppOptions & {
+  workflow: WorkflowFactory;
+};
diff --git a/packages/server/src/utils/request.ts b/packages/server/src/utils/request.ts
index 416b97c6a..fc5992b1b 100644
--- a/packages/server/src/utils/request.ts
+++ b/packages/server/src/utils/request.ts
@@ -27,15 +27,15 @@ export function sendJSONResponse(
   response.end(typeof body === "string" ? body : JSON.stringify(body));
 }
 
-export async function pipeResponse(
-  serverResponse: ServerResponse,
-  streamResponse: Response,
+export async function pipeStreamToResponse(
+  response: ServerResponse,
+  stream: Response,
 ) {
-  if (!streamResponse.body) return;
-  const reader = streamResponse.body.getReader();
+  if (!stream.body) return;
+  const reader = stream.body.getReader();
   while (true) {
     const { done, value } = await reader.read();
-    if (done) return serverResponse.end();
-    serverResponse.write(value);
+    if (done) return response.end();
+    response.write(value);
   }
 }
diff --git a/packages/server/src/utils/workflow.ts b/packages/server/src/utils/workflow.ts
index 328f9f975..9bebd3198 100644
--- a/packages/server/src/utils/workflow.ts
+++ b/packages/server/src/utils/workflow.ts
@@ -5,18 +5,17 @@ import {
   StopEvent,
   WorkflowContext,
   WorkflowEvent,
-  type ChatMessage,
   type ChatResponseChunk,
 } from "llamaindex";
 import { ReadableStream } from "stream/web";
-import type { ServerWorkflow } from "../types";
+import type { AgentInput, ServerWorkflow } from "../types";
 
 export async function runWorkflow(
   workflow: ServerWorkflow,
-  userInput: string,
-  chatHistory: ChatMessage[],
+  agentInput: AgentInput,
 ) {
   if (workflow instanceof AgentWorkflow) {
+    const { userInput, chatHistory } = agentInput;
     const context = workflow.run(userInput, { chatHistory });
     const { stream, dataStream } = await createStreamFromWorkflowContext(
       // eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -25,7 +24,7 @@ export async function runWorkflow(
     return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
   }
 
-  const context = workflow.run({ userInput, chatHistory });
+  const context = workflow.run(agentInput);
   const { stream, dataStream } = await createStreamFromWorkflowContext(context);
   return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
 }
-- 
GitLab
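
Usage sketch (illustrative, not part of the patch): with this change the workflow
option takes a factory instead of a shared workflow instance; handleChat awaits it
on every POST /api/chat and passes the parsed request body, so the workflow can be
rebuilt per request. The package specifier "@llamaindex/server", the ServerWorkflow
re-export, and createMyWorkflow below are assumptions for illustration only;
LlamaIndexServer, the workflow option, port, and start() come from the diff above.

import { LlamaIndexServer } from "@llamaindex/server"; // assumed package name
import type { ServerWorkflow } from "@llamaindex/server"; // assumed type re-export

// Hypothetical application-side builder; stands in for however the app
// constructs its AgentWorkflow or custom Workflow. Not part of this patch.
declare function createMyWorkflow(opts: {
  userId?: string;
}): Promise<ServerWorkflow>;

new LlamaIndexServer({
  // The factory is awaited once per chat request (see handleChat above),
  // receiving the parsed request body so the workflow can vary per request.
  workflow: async (requestBody) => {
    const { userId } = (requestBody ?? {}) as { userId?: string };
    return createMyWorkflow({ userId });
  },
  port: 3000, // NextAppOptions passthrough; the constructor defaults to 3000
}).start();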