Skip to content
Snippets Groups Projects
Commit 55293998 authored by thucpn's avatar thucpn
Browse files

workflow factory

parent c3d9b2f1
No related branches found
No related tags found
No related merge requests found
import { type Message } from "ai"; import { type Message } from "ai";
import { IncomingMessage, ServerResponse } from "http"; import { IncomingMessage, ServerResponse } from "http";
import { type ChatMessage } from "llamaindex"; import { type ChatMessage } from "llamaindex";
import type { ServerWorkflow } from "../types"; import type { WorkflowFactory } from "../types";
import { import {
parseRequestBody, parseRequestBody,
pipeResponse, pipeStreamToResponse,
sendJSONResponse, sendJSONResponse,
} from "../utils/request"; } from "../utils/request";
import { runWorkflow } from "../utils/workflow"; import { runWorkflow } from "../utils/workflow";
export const handleChat = async ( export const handleChat = async (
workflow: ServerWorkflow, workflowFactory: WorkflowFactory,
req: IncomingMessage, req: IncomingMessage,
res: ServerResponse, res: ServerResponse,
) => { ) => {
...@@ -25,10 +25,14 @@ export const handleChat = async ( ...@@ -25,10 +25,14 @@ export const handleChat = async (
}); });
} }
const userInput = lastMessage.content; const workflow = await workflowFactory(body);
const chatHistory = messages.slice(0, -1) as ChatMessage[];
const streamResponse = await runWorkflow(workflow, userInput, chatHistory); const stream = await runWorkflow(workflow, {
pipeResponse(res, streamResponse); userInput: lastMessage.content,
chatHistory: messages.slice(0, -1) as ChatMessage[],
});
pipeStreamToResponse(res, stream);
} catch (error) { } catch (error) {
console.error("Chat error:", error); console.error("Chat error:", error);
return sendJSONResponse(res, 500, { return sendJSONResponse(res, 500, {
......
...@@ -3,24 +3,18 @@ import next from "next"; ...@@ -3,24 +3,18 @@ import next from "next";
import path from "path"; import path from "path";
import { parse } from "url"; import { parse } from "url";
import { handleChat } from "./handlers/chat"; import { handleChat } from "./handlers/chat";
import type { ServerWorkflow } from "./types"; import type { LlamaIndexServerOptions, ServerWorkflow } from "./types";
type NextAppOptions = Omit<Parameters<typeof next>[0], "dir">;
export type LlamaIndexServerOptions = NextAppOptions & {
workflow: ServerWorkflow;
};
export class LlamaIndexServer { export class LlamaIndexServer {
port: number; port: number;
app: ReturnType<typeof next>; app: ReturnType<typeof next>;
workflow: ServerWorkflow; workflowFactory: () => Promise<ServerWorkflow> | ServerWorkflow;
constructor({ workflow, ...nextAppOptions }: LlamaIndexServerOptions) { constructor({ workflow, ...nextAppOptions }: LlamaIndexServerOptions) {
const nextDir = path.join(__dirname, ".."); // location of the .next after build next app const nextDir = path.join(__dirname, ".."); // location of the .next after build next app
this.app = next({ ...nextAppOptions, dir: nextDir }); this.app = next({ ...nextAppOptions, dir: nextDir });
this.port = nextAppOptions.port ?? 3000; this.port = nextAppOptions.port ?? 3000;
this.workflow = workflow; this.workflowFactory = workflow;
} }
async start() { async start() {
...@@ -31,7 +25,7 @@ export class LlamaIndexServer { ...@@ -31,7 +25,7 @@ export class LlamaIndexServer {
const pathname = parsedUrl.pathname; const pathname = parsedUrl.pathname;
if (pathname === "/api/chat" && req.method === "POST") { if (pathname === "/api/chat" && req.method === "POST") {
return handleChat(this.workflow, req, res); return handleChat(this.workflowFactory, req, res);
} }
const handle = this.app.getRequestHandler(); const handle = this.app.getRequestHandler();
......
...@@ -4,6 +4,7 @@ import { ...@@ -4,6 +4,7 @@ import {
type ChatMessage, type ChatMessage,
type ChatResponseChunk, type ChatResponseChunk,
} from "llamaindex"; } from "llamaindex";
import type next from "next";
export type AgentInput = { export type AgentInput = {
userInput: string; // the last message content from the user userInput: string; // the last message content from the user
...@@ -13,3 +14,17 @@ export type AgentInput = { ...@@ -13,3 +14,17 @@ export type AgentInput = {
export type ServerWorkflow = export type ServerWorkflow =
| Workflow<null, AgentInput, ChatResponseChunk> | Workflow<null, AgentInput, ChatResponseChunk>
| AgentWorkflow; | AgentWorkflow;
/**
 * Builds a {@link ServerWorkflow}, synchronously or asynchronously.
 *
 * Called once per chat request; the raw (unvalidated) request body is
 * passed through so implementations can tailor the workflow to the
 * incoming request.
 *
 * @param requestBody - Parsed body of the HTTP request, if any.
 * @returns The workflow to run for this request.
 */
export type WorkflowFactory = (
  requestBody?: unknown,
) => ServerWorkflow | Promise<ServerWorkflow>;
/**
 * Options forwarded to `next()` when creating the embedded Next.js app.
 * `dir` is excluded because the server pins it to its own build output.
 */
export type NextAppOptions = Omit<Parameters<typeof next>[0], "dir">;

/**
 * Constructor options for the LlamaIndex server: all Next.js app options
 * plus the per-request {@link WorkflowFactory}.
 */
export type LlamaIndexServerOptions = NextAppOptions & {
  workflow: WorkflowFactory;
};
...@@ -27,15 +27,15 @@ export function sendJSONResponse( ...@@ -27,15 +27,15 @@ export function sendJSONResponse(
response.end(typeof body === "string" ? body : JSON.stringify(body)); response.end(typeof body === "string" ? body : JSON.stringify(body));
} }
export async function pipeResponse( export async function pipeStreamToResponse(
serverResponse: ServerResponse, response: ServerResponse,
streamResponse: Response, stream: Response,
) { ) {
if (!streamResponse.body) return; if (!stream.body) return;
const reader = streamResponse.body.getReader(); const reader = stream.body.getReader();
while (true) { while (true) {
const { done, value } = await reader.read(); const { done, value } = await reader.read();
if (done) return serverResponse.end(); if (done) return response.end();
serverResponse.write(value); response.write(value);
} }
} }
...@@ -5,18 +5,17 @@ import { ...@@ -5,18 +5,17 @@ import {
StopEvent, StopEvent,
WorkflowContext, WorkflowContext,
WorkflowEvent, WorkflowEvent,
type ChatMessage,
type ChatResponseChunk, type ChatResponseChunk,
} from "llamaindex"; } from "llamaindex";
import { ReadableStream } from "stream/web"; import { ReadableStream } from "stream/web";
import type { ServerWorkflow } from "../types"; import type { AgentInput, ServerWorkflow } from "../types";
export async function runWorkflow( export async function runWorkflow(
workflow: ServerWorkflow, workflow: ServerWorkflow,
userInput: string, agentInput: AgentInput,
chatHistory: ChatMessage[],
) { ) {
if (workflow instanceof AgentWorkflow) { if (workflow instanceof AgentWorkflow) {
const { userInput, chatHistory } = agentInput;
const context = workflow.run(userInput, { chatHistory }); const context = workflow.run(userInput, { chatHistory });
const { stream, dataStream } = await createStreamFromWorkflowContext( const { stream, dataStream } = await createStreamFromWorkflowContext(
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
...@@ -25,7 +24,7 @@ export async function runWorkflow( ...@@ -25,7 +24,7 @@ export async function runWorkflow(
return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream }); return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
const context = workflow.run({ userInput, chatHistory }); const context = workflow.run(agentInput);
const { stream, dataStream } = await createStreamFromWorkflowContext(context); const { stream, dataStream } = await createStreamFromWorkflowContext(context);
return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream }); return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment