From e072c4539367ba0b057c914213acfddb8969c88c Mon Sep 17 00:00:00 2001
From: Alex Yang <himself65@outlook.com>
Date: Thu, 16 May 2024 16:31:48 -0700
Subject: [PATCH] fix: remove non-standard API `pipeline` (#850)

---
 .changeset/calm-owls-notice.md    |  6 +++
 packages/core/src/agent/base.ts   | 75 +++++++++++++------------------
 packages/core/src/agent/openai.ts | 28 +++++-------
 packages/env/package.json         |  7 +--
 packages/env/src/index.ts         |  2 -
 packages/env/src/polyfill.ts      |  6 +--
 pnpm-lock.yaml                    | 29 ------------
 7 files changed, 48 insertions(+), 105 deletions(-)
 create mode 100644 .changeset/calm-owls-notice.md

diff --git a/.changeset/calm-owls-notice.md b/.changeset/calm-owls-notice.md
new file mode 100644
index 000000000..716834264
--- /dev/null
+++ b/.changeset/calm-owls-notice.md
@@ -0,0 +1,6 @@
+---
+"llamaindex": patch
+"@llamaindex/env": patch
+---
+
+fix: remove non-standard API `pipeline`
diff --git a/packages/core/src/agent/base.ts b/packages/core/src/agent/base.ts
index bf521e9e7..1005c1559 100644
--- a/packages/core/src/agent/base.ts
+++ b/packages/core/src/agent/base.ts
@@ -1,9 +1,4 @@
-import {
-  ReadableStream,
-  TransformStream,
-  pipeline,
-  randomUUID,
-} from "@llamaindex/env";
+import { ReadableStream, TransformStream, randomUUID } from "@llamaindex/env";
 import { Settings } from "../Settings.js";
 import {
   type ChatEngine,
@@ -339,46 +334,36 @@ export abstract class AgentRunner<
     | ReadableStream<AgentStreamChatResponse<AdditionalMessageOptions>>
   > {
     const task = this.createTask(params.message, !!params.stream);
-    const stepOutput = await pipeline(
-      task,
-      async (
-        iter: AsyncIterable<
-          TaskStepOutput<AI, Store, AdditionalMessageOptions>
-        >,
-      ) => {
-        for await (const stepOutput of iter) {
-          // update chat history for each round
-          this.#chatHistory = [...stepOutput.taskStep.context.store.messages];
-          if (stepOutput.isLast) {
-            return stepOutput;
-          }
-        }
-        throw new Error("Task did not complete");
-      },
-    );
-    const { output, taskStep } = stepOutput;
-    if (isAsyncIterable(output)) {
-      return output.pipeThrough<
-        AgentStreamChatResponse<AdditionalMessageOptions>
-      >(
-        new TransformStream({
-          transform(chunk, controller) {
-            controller.enqueue({
-              response: chunk,
-              get sources() {
-                return [...taskStep.context.store.toolOutputs];
+    for await (const stepOutput of task) {
+      // update chat history for each round
+      this.#chatHistory = [...stepOutput.taskStep.context.store.messages];
+      if (stepOutput.isLast) {
+        const { output, taskStep } = stepOutput;
+        if (isAsyncIterable(output)) {
+          return output.pipeThrough<
+            AgentStreamChatResponse<AdditionalMessageOptions>
+          >(
+            new TransformStream({
+              transform(chunk, controller) {
+                controller.enqueue({
+                  response: chunk,
+                  get sources() {
+                    return [...taskStep.context.store.toolOutputs];
+                  },
+                });
               },
-            });
-          },
-        }),
-      );
-    } else {
-      return {
-        response: output,
-        get sources() {
-          return [...taskStep.context.store.toolOutputs];
-        },
-      } satisfies AgentChatResponse<AdditionalMessageOptions>;
+            }),
+          );
+        } else {
+          return {
+            response: output,
+            get sources() {
+              return [...taskStep.context.store.toolOutputs];
+            },
+          } satisfies AgentChatResponse<AdditionalMessageOptions>;
+        }
+      }
     }
+    throw new Error("Task ended without a last step.");
   }
 }
diff --git a/packages/core/src/agent/openai.ts b/packages/core/src/agent/openai.ts
index fec65b7dd..61b73193e 100644
--- a/packages/core/src/agent/openai.ts
+++ b/packages/core/src/agent/openai.ts
@@ -1,4 +1,5 @@
-import { pipeline, ReadableStream } from "@llamaindex/env";
+import { ReadableStream } from "@llamaindex/env";
+import { Settings } from "../Settings.js";
 import { stringifyJSONToMessageContent } from "../internal/utils.js";
 import type {
   ChatResponseChunk,
@@ -8,7 +9,6 @@ import type {
 } from "../llm/index.js";
 import { OpenAI } from "../llm/openai.js";
 import { ObjectRetriever } from "../objects/index.js";
-import { Settings } from "../Settings.js";
 import type { BaseToolWithCall } from "../types.js";
 import { AgentRunner, AgentWorker, type AgentParamsBase } from "./base.js";
 import type { TaskHandler } from "./types.js";
@@ -130,22 +130,14 @@ export class OpenAIAgent extends AgentRunner<OpenAI> {
 
       if (hasToolCall) {
         // you need to consume the response to get the full toolCalls
-        const toolCalls = await pipeline(
-          pipStream,
-          async (
-            iter: AsyncIterable<ChatResponseChunk<ToolCallLLMMessageOptions>>,
-          ) => {
-            const toolCalls = new Map<string, ToolCall | PartialToolCall>();
-            for await (const chunk of iter) {
-              if (chunk.options && "toolCall" in chunk.options) {
-                const toolCall = chunk.options.toolCall;
-                toolCalls.set(toolCall.id, toolCall);
-              }
-            }
-            return [...toolCalls.values()];
-          },
-        );
-        for (const toolCall of toolCalls) {
+        const toolCalls = new Map<string, ToolCall | PartialToolCall>();
+        for await (const chunk of pipStream) {
+          if (chunk.options && "toolCall" in chunk.options) {
+            const toolCall = chunk.options.toolCall;
+            toolCalls.set(toolCall.id, toolCall);
+          }
+        }
+        for (const toolCall of toolCalls.values()) {
           const targetTool = tools.find(
             (tool) => tool.metadata.name === toolCall.name,
           );
diff --git a/packages/env/package.json b/packages/env/package.json
index 5ceb7fa83..da45517dc 100644
--- a/packages/env/package.json
+++ b/packages/env/package.json
@@ -71,7 +71,6 @@
     "@swc/core": "^1.5.5",
     "concurrently": "^8.2.2",
     "pathe": "^1.1.2",
-    "readable-stream": "^4.5.2",
     "vitest": "^1.6.0"
   },
   "dependencies": {
@@ -80,8 +79,7 @@
   },
   "peerDependencies": {
     "@aws-crypto/sha256-js": "^5.2.0",
-    "pathe": "^1.1.2",
-    "readable-stream": "^4.5.2"
+    "pathe": "^1.1.2"
   },
   "peerDependenciesMeta": {
     "@aws-crypto/sha256-js": {
@@ -89,9 +87,6 @@
     },
     "pathe": {
       "optional": true
-    },
-    "readable-stream": {
-      "optional": true
     }
   }
 }
diff --git a/packages/env/src/index.ts b/packages/env/src/index.ts
index a2d2c04fc..672182904 100644
--- a/packages/env/src/index.ts
+++ b/packages/env/src/index.ts
@@ -15,7 +15,6 @@ import { ok } from "node:assert";
 import { createHash, randomUUID } from "node:crypto";
 import { EOL } from "node:os";
 import path from "node:path";
-import { pipeline } from "node:stream/promises";
 import {
   ReadableStream,
   TransformStream,
@@ -45,6 +44,5 @@ export {
   fs,
   ok,
   path,
-  pipeline,
   randomUUID,
 };
diff --git a/packages/env/src/polyfill.ts b/packages/env/src/polyfill.ts
index 196a0e291..9f47010be 100644
--- a/packages/env/src/polyfill.ts
+++ b/packages/env/src/polyfill.ts
@@ -9,13 +9,9 @@
  */
 import { Sha256 } from "@aws-crypto/sha256-js";
 import pathe from "pathe";
-// @ts-expect-error
-import { promises } from "readable-stream";
 import { fs } from "./fs/memory.js";
 
-const { pipeline } = promises;
-
-export { fs, pathe as path, pipeline };
+export { fs, pathe as path };
 
 export interface SHA256 {
   update(data: string | Uint8Array): void;
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index f197b38fc..aafc53cb7 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -652,9 +652,6 @@ importers:
       pathe:
         specifier: ^1.1.2
         version: 1.1.2
-      readable-stream:
-        specifier: ^4.5.2
-        version: 4.5.2
       vitest:
         specifier: ^1.6.0
         version: 1.6.0(@types/node@20.12.11)(terser@5.31.0)
@@ -3915,9 +3912,6 @@ packages:
   buffer@5.7.1:
     resolution: {integrity: sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==}
 
-  buffer@6.0.3:
-    resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
-
   builtin-modules@3.3.0:
     resolution: {integrity: sha512-zhaCDicdLuWN5UbN5IMnFqNMhNfo919sH85y2/ea+5Yg9TsTkeZxpL+JLbp6cgYFS4sRLp3YV4S6yDuqVWHYOw==}
     engines: {node: '>=6'}
@@ -7907,10 +7901,6 @@ packages:
   process-nextick-args@2.0.1:
     resolution: {integrity: sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==}
 
-  process@0.11.10:
-    resolution: {integrity: sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==}
-    engines: {node: '>= 0.6.0'}
-
   prompts@2.4.2:
     resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
     engines: {node: '>= 6'}
@@ -8162,10 +8152,6 @@ packages:
     resolution: {integrity: sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==}
     engines: {node: '>= 6'}
 
-  readable-stream@4.5.2:
-    resolution: {integrity: sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g==}
-    engines: {node: ^12.22.0 || ^14.17.0 || >=16.0.0}
-
   readable-web-to-node-stream@3.0.2:
     resolution: {integrity: sha512-ePeK6cc1EcKLEhJFt/AebMCLL+GgSKhuygrZ/GLaKZYEecIgIECf4UaUuaByiGtzckwR4ain9VzUh95T1exYGw==}
     engines: {node: '>=8'}
@@ -13975,11 +13961,6 @@ snapshots:
       base64-js: 1.5.1
       ieee754: 1.2.1
 
-  buffer@6.0.3:
-    dependencies:
-      base64-js: 1.5.1
-      ieee754: 1.2.1
-
   builtin-modules@3.3.0: {}
 
   bunchee@5.1.5(typescript@5.4.5):
@@ -18689,8 +18670,6 @@ snapshots:
 
   process-nextick-args@2.0.1: {}
 
-  process@0.11.10: {}
-
   prompts@2.4.2:
     dependencies:
       kleur: 3.0.3
@@ -19021,14 +19000,6 @@ snapshots:
       string_decoder: 1.3.0
       util-deprecate: 1.0.2
 
-  readable-stream@4.5.2:
-    dependencies:
-      abort-controller: 3.0.0
-      buffer: 6.0.3
-      events: 3.3.0
-      process: 0.11.10
-      string_decoder: 1.3.0
-
   readable-web-to-node-stream@3.0.2:
     dependencies:
       readable-stream: 3.6.2
-- 
GitLab