Skip to content
Snippets Groups Projects
Unverified Commit 96f8f402 authored by Alex Yang's avatar Alex Yang Committed by GitHub
Browse files

fix: agent stream (#710)

parent 1c698df6
No related branches found
No related tags found
No related merge requests found
---
"llamaindex": patch
---
fix: agent stream
...@@ -10,8 +10,9 @@ ...@@ -10,8 +10,9 @@
"name": "Debug Example", "name": "Debug Example",
"skipFiles": ["<node_internals>/**"], "skipFiles": ["<node_internals>/**"],
"runtimeExecutable": "pnpm", "runtimeExecutable": "pnpm",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/examples", "cwd": "${workspaceFolder}/examples",
"runtimeArgs": ["ts-node", "${fileBasename}"] "runtimeArgs": ["npx", "tsx", "${file}"]
} }
] ]
} }
...@@ -186,10 +186,13 @@ export class OpenAIAgentWorker ...@@ -186,10 +186,13 @@ export class OpenAIAgentWorker
for await (const chunk of stream) { for await (const chunk of stream) {
controller.enqueue(chunk); controller.enqueue(chunk);
} }
controller.close();
}, },
}); });
const [pipStream, finalStream] = responseChunkStream.tee(); const [pipStream, finalStream] = responseChunkStream.tee();
const { value } = await pipStream.getReader().read(); const reader = pipStream.getReader();
const { value } = await reader.read();
reader.releaseLock();
if (value === undefined) { if (value === undefined) {
throw new Error("first chunk value is undefined, this should not happen"); throw new Error("first chunk value is undefined, this should not happen");
} }
...@@ -212,14 +215,16 @@ export class OpenAIAgentWorker ...@@ -212,14 +215,16 @@ export class OpenAIAgentWorker
options: value.options, options: value.options,
}); });
} else { } else {
const [responseStream, chunkStream] = finalStream.tee();
let content = ""; let content = "";
return pipeline( return new StreamingAgentChatResponse(
finalStream.pipeThrough<Response>({ responseStream.pipeThrough<Response>({
readable: new ReadableStream({ readable: new ReadableStream({
async start(controller) { async start(controller) {
for await (const chunk of finalStream) { for await (const chunk of chunkStream) {
controller.enqueue(new Response(chunk.delta)); controller.enqueue(new Response(chunk.delta));
} }
controller.close();
}, },
}), }),
writable: new WritableStream({ writable: new WritableStream({
...@@ -234,12 +239,7 @@ export class OpenAIAgentWorker ...@@ -234,12 +239,7 @@ export class OpenAIAgentWorker
}, },
}), }),
}), }),
async (iterator: AsyncIterable<Response>) => { task.extraState.sources,
return new StreamingAgentChatResponse(
iterator,
task.extraState.sources,
);
},
); );
} }
} }
......
...@@ -161,7 +161,7 @@ export class AgentRunner extends BaseAgentRunner { ...@@ -161,7 +161,7 @@ export class AgentRunner extends BaseAgentRunner {
const task = this.state.getTask(taskId); const task = this.state.getTask(taskId);
const curStep = step || this.state.getStepQueue(taskId).shift(); const curStep = step || this.state.getStepQueue(taskId).shift();
let curStepOutput; let curStepOutput: TaskStepOutput;
if (!curStep) { if (!curStep) {
throw new Error(`No step found for task ${taskId}`); throw new Error(`No step found for task ${taskId}`);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment