Skip to content
Snippets Groups Projects
Commit 78de0419 authored by thucpn's avatar thucpn
Browse files

refactor: runAgentWorkflow and runNormalWorkflow

parent 900123c1
No related branches found
No related tags found
No related merge requests found
import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai"; import { LlamaIndexAdapter, StreamData, type JSONValue } from "ai";
import { import {
AgentStream,
AgentWorkflow, AgentWorkflow,
EngineResponse, EngineResponse,
StopEvent, StopEvent,
WorkflowContext, Workflow,
WorkflowEvent,
type ChatResponseChunk, type ChatResponseChunk,
} from "llamaindex"; } from "llamaindex";
import { ReadableStream } from "stream/web"; import { ReadableStream } from "stream/web";
...@@ -15,71 +15,67 @@ export async function runWorkflow( ...@@ -15,71 +15,67 @@ export async function runWorkflow(
agentInput: AgentInput, agentInput: AgentInput,
) { ) {
if (workflow instanceof AgentWorkflow) { if (workflow instanceof AgentWorkflow) {
const { userInput, chatHistory } = agentInput; return runAgentWorkflow(workflow, agentInput);
const context = workflow.run(userInput, { chatHistory });
const { stream, dataStream } = await createStreamFromWorkflowContext(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context as any,
);
return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
return runNormalWorkflow(workflow, agentInput);
const context = workflow.run(agentInput);
const { stream, dataStream } = await createStreamFromWorkflowContext(context);
return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
async function createStreamFromWorkflowContext<Input, Output, Context>( async function runAgentWorkflow(
context: WorkflowContext<Input, Output, Context>, workflow: AgentWorkflow,
): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> { agentInput: AgentInput,
const dataStream = new StreamData(); ) {
let generator: AsyncGenerator<ChatResponseChunk> | undefined; const { userInput, chatHistory } = agentInput;
const context = workflow.run(userInput, { chatHistory });
const closeStreams = (controller: ReadableStreamDefaultController) => { const dataStream = new StreamData();
controller.close();
dataStream.close();
};
const stream = new ReadableStream<EngineResponse>({ const stream = new ReadableStream<EngineResponse>({
async start(controller) {
// Kickstart the stream by sending an empty string
controller.enqueue({ delta: "" } as EngineResponse);
},
async pull(controller) { async pull(controller) {
while (!generator) { for await (const event of context) {
// get next event from workflow context if (event instanceof AgentStream) {
const { value: event, done } = // for agent workflow, get the delta from AgentStream event and enqueue it
await context[Symbol.asyncIterator]().next(); const delta = event.data.delta;
if (done) { if (delta) {
closeStreams(controller); controller.enqueue({ delta } as EngineResponse);
return; }
} else {
dataStream.appendMessageAnnotation(event.data as JSONValue);
} }
generator = handleEvent(event, dataStream);
}
const { value: chunk, done } = await generator.next();
if (done) {
closeStreams(controller);
return;
}
const delta = chunk.delta ?? "";
if (delta) {
controller.enqueue({ delta } as EngineResponse);
} }
controller.close();
dataStream.close();
}, },
}); });
return { stream, dataStream }; return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
function handleEvent( async function runNormalWorkflow(
event: WorkflowEvent<unknown>, workflow: Workflow<null, AgentInput, ChatResponseChunk>,
dataStream: StreamData, agentInput: AgentInput,
): AsyncGenerator<ChatResponseChunk> | undefined { ) {
// Handle for StopEvent const context = workflow.run(agentInput);
if (event instanceof StopEvent) { const dataStream = new StreamData();
return event.data as AsyncGenerator<ChatResponseChunk>;
} else { const stream = new ReadableStream<EngineResponse>({
dataStream.appendMessageAnnotation(event.data as JSONValue); async pull(controller) {
} for await (const event of context) {
if (event instanceof StopEvent) {
// for normal workflow, the event data from StopEvent is a generator of ChatResponseChunk
// iterate over the generator and enqueue the delta of each chunk
const generator = event.data as AsyncGenerator<ChatResponseChunk>;
for await (const chunk of generator) {
controller.enqueue({ delta: chunk.delta } as EngineResponse);
}
} else {
// append data of other events to the data stream as message annotations
dataStream.appendMessageAnnotation(event.data as JSONValue);
}
}
controller.close();
dataStream.close();
},
});
return LlamaIndexAdapter.toDataStreamResponse(stream, { data: dataStream });
} }
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.