From 96f8f402913123fc984376f582ccd58803f453d6 Mon Sep 17 00:00:00 2001 From: Alex Yang <himself65@outlook.com> Date: Wed, 10 Apr 2024 23:22:11 -0500 Subject: [PATCH] fix: agent stream (#710) --- .changeset/pink-tigers-warn.md | 5 +++++ .vscode/launch.json | 3 ++- packages/core/src/agent/openai/worker.ts | 20 ++++++++++---------- packages/core/src/agent/runner/base.ts | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) create mode 100644 .changeset/pink-tigers-warn.md diff --git a/.changeset/pink-tigers-warn.md b/.changeset/pink-tigers-warn.md new file mode 100644 index 000000000..628239ad6 --- /dev/null +++ b/.changeset/pink-tigers-warn.md @@ -0,0 +1,5 @@ +--- +"llamaindex": patch +--- + +fix: agent stream diff --git a/.vscode/launch.json b/.vscode/launch.json index 91fc7023c..03e70f01a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,8 +10,9 @@ "name": "Debug Example", "skipFiles": ["<node_internals>/**"], "runtimeExecutable": "pnpm", + "console": "integratedTerminal", "cwd": "${workspaceFolder}/examples", - "runtimeArgs": ["ts-node", "${fileBasename}"] + "runtimeArgs": ["npx", "tsx", "${file}"] } ] } diff --git a/packages/core/src/agent/openai/worker.ts b/packages/core/src/agent/openai/worker.ts index 326ffea6c..d2a776817 100644 --- a/packages/core/src/agent/openai/worker.ts +++ b/packages/core/src/agent/openai/worker.ts @@ -186,10 +186,13 @@ export class OpenAIAgentWorker for await (const chunk of stream) { controller.enqueue(chunk); } + controller.close(); }, }); const [pipStream, finalStream] = responseChunkStream.tee(); - const { value } = await pipStream.getReader().read(); + const reader = pipStream.getReader(); + const { value } = await reader.read(); + reader.releaseLock(); if (value === undefined) { throw new Error("first chunk value is undefined, this should not happen"); } @@ -212,14 +215,16 @@ export class OpenAIAgentWorker options: value.options, }); } else { + const [responseStream, chunkStream] = finalStream.tee(); let content = ""; - return pipeline( - finalStream.pipeThrough<Response>({ + return new StreamingAgentChatResponse( + responseStream.pipeThrough<Response>({ readable: new ReadableStream({ async start(controller) { - for await (const chunk of finalStream) { + for await (const chunk of chunkStream) { controller.enqueue(new Response(chunk.delta)); } + controller.close(); }, }), writable: new WritableStream({ @@ -234,12 +239,7 @@ export class OpenAIAgentWorker }, }), }), - async (iterator: AsyncIterable<Response>) => { - return new StreamingAgentChatResponse( - iterator, - task.extraState.sources, - ); - }, + task.extraState.sources, ); } } diff --git a/packages/core/src/agent/runner/base.ts b/packages/core/src/agent/runner/base.ts index 47e5469b2..361fe9aa5 100644 --- a/packages/core/src/agent/runner/base.ts +++ b/packages/core/src/agent/runner/base.ts @@ -161,7 +161,7 @@ export class AgentRunner extends BaseAgentRunner { const task = this.state.getTask(taskId); const curStep = step || this.state.getStepQueue(taskId).shift(); - let curStepOutput; + let curStepOutput: TaskStepOutput; if (!curStep) { throw new Error(`No step found for task ${taskId}`); -- GitLab