diff --git a/.changeset/pink-tigers-warn.md b/.changeset/pink-tigers-warn.md new file mode 100644 index 0000000000000000000000000000000000000000..628239ad64c8650f5d780311f672c4eab61601ef --- /dev/null +++ b/.changeset/pink-tigers-warn.md @@ -0,0 +1,5 @@ +--- +"llamaindex": patch +--- + +fix: agent stream diff --git a/.vscode/launch.json b/.vscode/launch.json index 91fc7023c6d240c434ea1159ed8c611fd5c985da..03e70f01aebc0bad8b381e58006ec00a65bdcd9b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,8 +10,9 @@ "name": "Debug Example", "skipFiles": ["<node_internals>/**"], "runtimeExecutable": "pnpm", + "console": "integratedTerminal", "cwd": "${workspaceFolder}/examples", - "runtimeArgs": ["ts-node", "${fileBasename}"] + "runtimeArgs": ["npx", "tsx", "${file}"] } ] } diff --git a/packages/core/src/agent/openai/worker.ts b/packages/core/src/agent/openai/worker.ts index 326ffea6cf88c9aafe0678fe8debd9960963e5ae..d2a77681737aca9837d3d33e973ff377a8a990c1 100644 --- a/packages/core/src/agent/openai/worker.ts +++ b/packages/core/src/agent/openai/worker.ts @@ -186,10 +186,13 @@ export class OpenAIAgentWorker for await (const chunk of stream) { controller.enqueue(chunk); } + controller.close(); }, }); const [pipStream, finalStream] = responseChunkStream.tee(); - const { value } = await pipStream.getReader().read(); + const reader = pipStream.getReader(); + const { value } = await reader.read(); + reader.releaseLock(); if (value === undefined) { throw new Error("first chunk value is undefined, this should not happen"); } @@ -212,14 +215,16 @@ export class OpenAIAgentWorker options: value.options, }); } else { + const [responseStream, chunkStream] = finalStream.tee(); let content = ""; - return pipeline( - finalStream.pipeThrough<Response>({ + return new StreamingAgentChatResponse( + responseStream.pipeThrough<Response>({ readable: new ReadableStream({ async start(controller) { - for await (const chunk of finalStream) { + for await (const chunk of chunkStream) { controller.enqueue(new Response(chunk.delta)); } + controller.close(); }, }), writable: new WritableStream({ @@ -234,12 +239,7 @@ export class OpenAIAgentWorker }, }), }), - async (iterator: AsyncIterable<Response>) => { - return new StreamingAgentChatResponse( - iterator, - task.extraState.sources, - ); - }, + task.extraState.sources, ); } } diff --git a/packages/core/src/agent/runner/base.ts b/packages/core/src/agent/runner/base.ts index 47e5469b2db12a614199a728d6d66c7de9c04cf8..361fe9aa5f3bfd6c4f316d72e573b88a488ae0d8 100644 --- a/packages/core/src/agent/runner/base.ts +++ b/packages/core/src/agent/runner/base.ts @@ -161,7 +161,7 @@ export class AgentRunner extends BaseAgentRunner { const task = this.state.getTask(taskId); const curStep = step || this.state.getStepQueue(taskId).shift(); - let curStepOutput; + let curStepOutput: TaskStepOutput; if (!curStep) { throw new Error(`No step found for task ${taskId}`);