Skip to content
Snippets Groups Projects
Unverified Commit e072c453 authored by Alex Yang's avatar Alex Yang Committed by GitHub
Browse files

fix: remove non-standard API `pipeline` (#850)

parent 51241865
No related branches found
No related tags found
No related merge requests found
---
"llamaindex": patch
"@llamaindex/env": patch
---
fix: remove non-standard API `pipeline`
import { import { ReadableStream, TransformStream, randomUUID } from "@llamaindex/env";
ReadableStream,
TransformStream,
pipeline,
randomUUID,
} from "@llamaindex/env";
import { Settings } from "../Settings.js"; import { Settings } from "../Settings.js";
import { import {
type ChatEngine, type ChatEngine,
...@@ -339,46 +334,36 @@ export abstract class AgentRunner< ...@@ -339,46 +334,36 @@ export abstract class AgentRunner<
| ReadableStream<AgentStreamChatResponse<AdditionalMessageOptions>> | ReadableStream<AgentStreamChatResponse<AdditionalMessageOptions>>
> { > {
const task = this.createTask(params.message, !!params.stream); const task = this.createTask(params.message, !!params.stream);
const stepOutput = await pipeline( for await (const stepOutput of task) {
task, // update chat history for each round
async ( this.#chatHistory = [...stepOutput.taskStep.context.store.messages];
iter: AsyncIterable< if (stepOutput.isLast) {
TaskStepOutput<AI, Store, AdditionalMessageOptions> const { output, taskStep } = stepOutput;
>, if (isAsyncIterable(output)) {
) => { return output.pipeThrough<
for await (const stepOutput of iter) { AgentStreamChatResponse<AdditionalMessageOptions>
// update chat history for each round >(
this.#chatHistory = [...stepOutput.taskStep.context.store.messages]; new TransformStream({
if (stepOutput.isLast) { transform(chunk, controller) {
return stepOutput; controller.enqueue({
} response: chunk,
} get sources() {
throw new Error("Task did not complete"); return [...taskStep.context.store.toolOutputs];
}, },
); });
const { output, taskStep } = stepOutput;
if (isAsyncIterable(output)) {
return output.pipeThrough<
AgentStreamChatResponse<AdditionalMessageOptions>
>(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
response: chunk,
get sources() {
return [...taskStep.context.store.toolOutputs];
}, },
}); }),
}, );
}), } else {
); return {
} else { response: output,
return { get sources() {
response: output, return [...taskStep.context.store.toolOutputs];
get sources() { },
return [...taskStep.context.store.toolOutputs]; } satisfies AgentChatResponse<AdditionalMessageOptions>;
}, }
} satisfies AgentChatResponse<AdditionalMessageOptions>; }
} }
throw new Error("Task ended without a last step.");
} }
} }
import { pipeline, ReadableStream } from "@llamaindex/env"; import { ReadableStream } from "@llamaindex/env";
import { Settings } from "../Settings.js";
import { stringifyJSONToMessageContent } from "../internal/utils.js"; import { stringifyJSONToMessageContent } from "../internal/utils.js";
import type { import type {
ChatResponseChunk, ChatResponseChunk,
...@@ -8,7 +9,6 @@ import type { ...@@ -8,7 +9,6 @@ import type {
} from "../llm/index.js"; } from "../llm/index.js";
import { OpenAI } from "../llm/openai.js"; import { OpenAI } from "../llm/openai.js";
import { ObjectRetriever } from "../objects/index.js"; import { ObjectRetriever } from "../objects/index.js";
import { Settings } from "../Settings.js";
import type { BaseToolWithCall } from "../types.js"; import type { BaseToolWithCall } from "../types.js";
import { AgentRunner, AgentWorker, type AgentParamsBase } from "./base.js"; import { AgentRunner, AgentWorker, type AgentParamsBase } from "./base.js";
import type { TaskHandler } from "./types.js"; import type { TaskHandler } from "./types.js";
...@@ -130,22 +130,14 @@ export class OpenAIAgent extends AgentRunner<OpenAI> { ...@@ -130,22 +130,14 @@ export class OpenAIAgent extends AgentRunner<OpenAI> {
if (hasToolCall) { if (hasToolCall) {
// you need to consume the response to get the full toolCalls // you need to consume the response to get the full toolCalls
const toolCalls = await pipeline( const toolCalls = new Map<string, ToolCall | PartialToolCall>();
pipStream, for await (const chunk of pipStream) {
async ( if (chunk.options && "toolCall" in chunk.options) {
iter: AsyncIterable<ChatResponseChunk<ToolCallLLMMessageOptions>>, const toolCall = chunk.options.toolCall;
) => { toolCalls.set(toolCall.id, toolCall);
const toolCalls = new Map<string, ToolCall | PartialToolCall>(); }
for await (const chunk of iter) { }
if (chunk.options && "toolCall" in chunk.options) { for (const toolCall of toolCalls.values()) {
const toolCall = chunk.options.toolCall;
toolCalls.set(toolCall.id, toolCall);
}
}
return [...toolCalls.values()];
},
);
for (const toolCall of toolCalls) {
const targetTool = tools.find( const targetTool = tools.find(
(tool) => tool.metadata.name === toolCall.name, (tool) => tool.metadata.name === toolCall.name,
); );
......
...@@ -71,7 +71,6 @@ ...@@ -71,7 +71,6 @@
"@swc/core": "^1.5.5", "@swc/core": "^1.5.5",
"concurrently": "^8.2.2", "concurrently": "^8.2.2",
"pathe": "^1.1.2", "pathe": "^1.1.2",
"readable-stream": "^4.5.2",
"vitest": "^1.6.0" "vitest": "^1.6.0"
}, },
"dependencies": { "dependencies": {
...@@ -80,8 +79,7 @@ ...@@ -80,8 +79,7 @@
}, },
"peerDependencies": { "peerDependencies": {
"@aws-crypto/sha256-js": "^5.2.0", "@aws-crypto/sha256-js": "^5.2.0",
"pathe": "^1.1.2", "pathe": "^1.1.2"
"readable-stream": "^4.5.2"
}, },
"peerDependenciesMeta": { "peerDependenciesMeta": {
"@aws-crypto/sha256-js": { "@aws-crypto/sha256-js": {
...@@ -89,9 +87,6 @@ ...@@ -89,9 +87,6 @@
}, },
"pathe": { "pathe": {
"optional": true "optional": true
},
"readable-stream": {
"optional": true
} }
} }
} }
...@@ -15,7 +15,6 @@ import { ok } from "node:assert"; ...@@ -15,7 +15,6 @@ import { ok } from "node:assert";
import { createHash, randomUUID } from "node:crypto"; import { createHash, randomUUID } from "node:crypto";
import { EOL } from "node:os"; import { EOL } from "node:os";
import path from "node:path"; import path from "node:path";
import { pipeline } from "node:stream/promises";
import { import {
ReadableStream, ReadableStream,
TransformStream, TransformStream,
...@@ -45,6 +44,5 @@ export { ...@@ -45,6 +44,5 @@ export {
fs, fs,
ok, ok,
path, path,
pipeline,
randomUUID, randomUUID,
}; };
...@@ -9,13 +9,9 @@ ...@@ -9,13 +9,9 @@
*/ */
import { Sha256 } from "@aws-crypto/sha256-js"; import { Sha256 } from "@aws-crypto/sha256-js";
import pathe from "pathe"; import pathe from "pathe";
// @ts-expect-error
import { promises } from "readable-stream";
import { fs } from "./fs/memory.js"; import { fs } from "./fs/memory.js";
const { pipeline } = promises; export { fs, pathe as path };
export { fs, pathe as path, pipeline };
export interface SHA256 { export interface SHA256 {
update(data: string | Uint8Array): void; update(data: string | Uint8Array): void;
......
...@@ -652,9 +652,6 @@ importers: ...@@ -652,9 +652,6 @@ importers:
pathe: pathe:
specifier: ^1.1.2 specifier: ^1.1.2
version: 1.1.2 version: 1.1.2
readable-stream:
specifier: ^4.5.2
version: 4.5.2
vitest: vitest:
specifier: ^1.6.0 specifier: ^1.6.0
version: 1.6.0(@types/node@20.12.11)(terser@5.31.0) version: 1.6.0(@types/node@20.12.11)(terser@5.31.0)
...@@ -3915,9 +3912,6 @@ packages: ...@@ -3915,9 +3912,6 @@ packages:
buffer@5.7.1: buffer@5.7.1:
resolution: {integrity: sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==} resolution: {integrity: sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==}
   
buffer@6.0.3:
resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
builtin-modules@3.3.0: builtin-modules@3.3.0:
resolution: {integrity: sha512-zhaCDicdLuWN5UbN5IMnFqNMhNfo919sH85y2/ea+5Yg9TsTkeZxpL+JLbp6cgYFS4sRLp3YV4S6yDuqVWHYOw==} resolution: {integrity: sha512-zhaCDicdLuWN5UbN5IMnFqNMhNfo919sH85y2/ea+5Yg9TsTkeZxpL+JLbp6cgYFS4sRLp3YV4S6yDuqVWHYOw==}
engines: {node: '>=6'} engines: {node: '>=6'}
...@@ -7907,10 +7901,6 @@ packages: ...@@ -7907,10 +7901,6 @@ packages:
process-nextick-args@2.0.1: process-nextick-args@2.0.1:
resolution: {integrity: sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==} resolution: {integrity: sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==}
   
process@0.11.10:
resolution: {integrity: sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==}
engines: {node: '>= 0.6.0'}
prompts@2.4.2: prompts@2.4.2:
resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==} resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
engines: {node: '>= 6'} engines: {node: '>= 6'}
...@@ -8162,10 +8152,6 @@ packages: ...@@ -8162,10 +8152,6 @@ packages:
resolution: {integrity: sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==} resolution: {integrity: sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==}
engines: {node: '>= 6'} engines: {node: '>= 6'}
   
readable-stream@4.5.2:
resolution: {integrity: sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g==}
engines: {node: ^12.22.0 || ^14.17.0 || >=16.0.0}
readable-web-to-node-stream@3.0.2: readable-web-to-node-stream@3.0.2:
resolution: {integrity: sha512-ePeK6cc1EcKLEhJFt/AebMCLL+GgSKhuygrZ/GLaKZYEecIgIECf4UaUuaByiGtzckwR4ain9VzUh95T1exYGw==} resolution: {integrity: sha512-ePeK6cc1EcKLEhJFt/AebMCLL+GgSKhuygrZ/GLaKZYEecIgIECf4UaUuaByiGtzckwR4ain9VzUh95T1exYGw==}
engines: {node: '>=8'} engines: {node: '>=8'}
...@@ -13975,11 +13961,6 @@ snapshots: ...@@ -13975,11 +13961,6 @@ snapshots:
base64-js: 1.5.1 base64-js: 1.5.1
ieee754: 1.2.1 ieee754: 1.2.1
   
buffer@6.0.3:
dependencies:
base64-js: 1.5.1
ieee754: 1.2.1
builtin-modules@3.3.0: {} builtin-modules@3.3.0: {}
   
bunchee@5.1.5(typescript@5.4.5): bunchee@5.1.5(typescript@5.4.5):
...@@ -18689,8 +18670,6 @@ snapshots: ...@@ -18689,8 +18670,6 @@ snapshots:
   
process-nextick-args@2.0.1: {} process-nextick-args@2.0.1: {}
   
process@0.11.10: {}
prompts@2.4.2: prompts@2.4.2:
dependencies: dependencies:
kleur: 3.0.3 kleur: 3.0.3
...@@ -19021,14 +19000,6 @@ snapshots: ...@@ -19021,14 +19000,6 @@ snapshots:
string_decoder: 1.3.0 string_decoder: 1.3.0
util-deprecate: 1.0.2 util-deprecate: 1.0.2
   
readable-stream@4.5.2:
dependencies:
abort-controller: 3.0.0
buffer: 6.0.3
events: 3.3.0
process: 0.11.10
string_decoder: 1.3.0
readable-web-to-node-stream@3.0.2: readable-web-to-node-stream@3.0.2:
dependencies: dependencies:
readable-stream: 3.6.2 readable-stream: 3.6.2
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment