diff --git a/packages/server/src/utils/workflow.ts b/packages/server/src/utils/workflow.ts
index d8e9fd672a1d0a4e1953851010aa7404cfd373fd..ed893fbf38944dbc61ea0354dbf910e94f5296f8 100644
--- a/packages/server/src/utils/workflow.ts
+++ b/packages/server/src/utils/workflow.ts
@@ -1,10 +1,10 @@
 import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai";
 import {
+  AgentStream,
   AgentWorkflow,
   EngineResponse,
   StopEvent,
-  WorkflowContext,
-  WorkflowEvent,
+  Workflow,
   type ChatResponseChunk,
 } from "llamaindex";
 import { ReadableStream } from "stream/web";
@@ -15,71 +15,67 @@ export async function runWorkflow(
   agentInput: AgentInput,
 ) {
   if (workflow instanceof AgentWorkflow) {
-    const { userInput, chatHistory } = agentInput;
-    const context = workflow.run(userInput, { chatHistory });
-    const { stream, dataStream } = await createStreamFromWorkflowContext(
-      // eslint-disable-next-line @typescript-eslint/no-explicit-any
-      context as any,
-    );
-    return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
+    return runAgentWorkflow(workflow, agentInput);
   }
-
-  const context = workflow.run(agentInput);
-  const { stream, dataStream } = await createStreamFromWorkflowContext(context);
-  return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
+  return runNormalWorkflow(workflow, agentInput);
 }
 
-async function createStreamFromWorkflowContext<Input, Output, Context>(
-  context: WorkflowContext<Input, Output, Context>,
-): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> {
-  const dataStream = new StreamData();
-  let generator: AsyncGenerator<ChatResponseChunk> | undefined;
+async function runAgentWorkflow(
+  workflow: AgentWorkflow,
+  agentInput: AgentInput,
+) {
+  const { userInput, chatHistory } = agentInput;
+  const context = workflow.run(userInput, { chatHistory });
 
-  const closeStreams = (controller: ReadableStreamDefaultController) => {
-    controller.close();
-    dataStream.close();
-  };
+  const dataStream = new StreamData();
 
   const stream = new ReadableStream<EngineResponse>({
-    async start(controller) {
-      // Kickstart the stream by sending an empty string
-      controller.enqueue({ delta: "" } as EngineResponse);
-    },
     async pull(controller) {
-      while (!generator) {
-        // get next event from workflow context
-        const { value: event, done } =
-          await context[Symbol.asyncIterator]().next();
-        if (done) {
-          closeStreams(controller);
-          return;
+      for await (const event of context) {
+        if (event instanceof AgentStream) {
+          // for agent workflow, get the delta from AgentStream event and enqueue it
+          const delta = event.data.delta;
+          if (delta) {
+            controller.enqueue({ delta } as EngineResponse);
+          }
+        } else {
+          dataStream.appendMessageAnnotation(event.data as JSONValue);
         }
-        generator = handleEvent(event, dataStream);
-      }
-
-      const { value: chunk, done } = await generator.next();
-      if (done) {
-        closeStreams(controller);
-        return;
-      }
-      const delta = chunk.delta ?? "";
-      if (delta) {
-        controller.enqueue({ delta } as EngineResponse);
       }
+      controller.close();
+      dataStream.close();
     },
   });
 
-  return { stream, dataStream };
+  return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
 }
 
-function handleEvent(
-  event: WorkflowEvent<unknown>,
-  dataStream: StreamData,
-): AsyncGenerator<ChatResponseChunk> | undefined {
-  // Handle for StopEvent
-  if (event instanceof StopEvent) {
-    return event.data as AsyncGenerator<ChatResponseChunk>;
-  } else {
-    dataStream.appendMessageAnnotation(event.data as JSONValue);
-  }
+async function runNormalWorkflow(
+  workflow: Workflow<null, AgentInput, ChatResponseChunk>,
+  agentInput: AgentInput,
+) {
+  const context = workflow.run(agentInput);
+  const dataStream = new StreamData();
+
+  const stream = new ReadableStream<EngineResponse>({
+    async pull(controller) {
+      for await (const event of context) {
+        if (event instanceof StopEvent) {
+          // for normal workflow, the event data from StopEvent is a generator of ChatResponseChunk
+          // iterate over the generator and enqueue the delta of each chunk
+          const generator = event.data as AsyncGenerator<ChatResponseChunk>;
+          for await (const chunk of generator) {
+            controller.enqueue({ delta: chunk.delta } as EngineResponse);
+          }
+        } else {
+          // append data of other events to the data stream as message annotations
+          dataStream.appendMessageAnnotation(event.data as JSONValue);
+        }
+      }
+      controller.close();
+      dataStream.close();
+    },
+  });
+
+  return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
 }