diff --git a/.changeset/stale-dots-search.md b/.changeset/stale-dots-search.md new file mode 100644 index 0000000000000000000000000000000000000000..2e87a8362a58089f694a127040b2afcd9979f5a1 --- /dev/null +++ b/.changeset/stale-dots-search.md @@ -0,0 +1,5 @@ +--- +"llamaindex": patch +--- + +add concurrency management for SimpleDirectoryReader diff --git a/examples/data/parallel/brk-2022-001-024.pdf b/examples/data/parallel/brk-2022-001-024.pdf new file mode 100644 index 0000000000000000000000000000000000000000..761005b41c835ac38cef92450a4a8e4e00e0af82 Binary files /dev/null and b/examples/data/parallel/brk-2022-001-024.pdf differ diff --git a/examples/data/parallel/brk-2022-025-048.pdf b/examples/data/parallel/brk-2022-025-048.pdf new file mode 100644 index 0000000000000000000000000000000000000000..12049bf586574f9d5019890cb0d921181b892e11 Binary files /dev/null and b/examples/data/parallel/brk-2022-025-048.pdf differ diff --git a/examples/data/parallel/brk-2022-049-072.pdf b/examples/data/parallel/brk-2022-049-072.pdf new file mode 100644 index 0000000000000000000000000000000000000000..1b44725f1df1d7a6c70adb1a238bd5e3d7b49b9c Binary files /dev/null and b/examples/data/parallel/brk-2022-049-072.pdf differ diff --git a/examples/data/parallel/brk-2022-073-096.pdf b/examples/data/parallel/brk-2022-073-096.pdf new file mode 100644 index 0000000000000000000000000000000000000000..a0fec57c765e4de15b90191406b4b0f942f00a00 Binary files /dev/null and b/examples/data/parallel/brk-2022-073-096.pdf differ diff --git a/examples/data/parallel/brk-2022-097-120.pdf b/examples/data/parallel/brk-2022-097-120.pdf new file mode 100644 index 0000000000000000000000000000000000000000..1841336e3f2bb6671351d17bdd7a11e790c27b19 Binary files /dev/null and b/examples/data/parallel/brk-2022-097-120.pdf differ diff --git a/examples/data/parallel/brk-2022-121-144.pdf b/examples/data/parallel/brk-2022-121-144.pdf new file mode 100644 index 0000000000000000000000000000000000000000..c83853cb2ca7cdcc115fe5dde329f2156b60f806 Binary files /dev/null and b/examples/data/parallel/brk-2022-121-144.pdf differ diff --git a/examples/readers/package.json b/examples/readers/package.json index 15116d74c732daf080fc35631ce0d96632665535..47a7bb4e0fcdf51b468b7c1cc1aeedbcd68fc1f8 100644 --- a/examples/readers/package.json +++ b/examples/readers/package.json @@ -11,7 +11,7 @@ "start:pdf": "node --import tsx ./src/pdf.ts", "start:llamaparse": "node --import tsx ./src/llamaparse.ts", "start:notion": "node --import tsx ./src/notion.ts", - "start:llamaparse2": "node --import tsx ./src/llamaparse_2.ts" + "start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts" }, "dependencies": { "llamaindex": "*" diff --git a/examples/readers/src/llamaparse_2.ts b/examples/readers/src/llamaparse_2.ts deleted file mode 100644 index 369079e9bae39f82023e9e45306d763f5be9bc9c..0000000000000000000000000000000000000000 --- a/examples/readers/src/llamaparse_2.ts +++ /dev/null @@ -1,26 +0,0 @@ -import fs from "fs/promises"; -import { LlamaParseReader } from "llamaindex"; - -async function main() { - // Load PDF using LlamaParse. set apiKey here or in environment variable LLAMA_CLOUD_API_KEY - const reader = new LlamaParseReader({ - resultType: "markdown", - language: "en", - parsingInstruction: - "The provided document is a manga comic book. Most pages do NOT have title. It does not contain tables. Try to reconstruct the dialogue happening in a cohesive way. Output any math equation in LATEX markdown (between $$)", - }); - const documents = await reader.loadData("../data/manga.pdf"); // The manga.pdf in the data folder is just a copy of the TOS, due to copyright laws. You have to place your own. I used "The Manga Guide to Calculus" by Hiroyuki Kojima - - // Assuming documents contain an array of pages or sections - const parsedManga = documents.map((page) => page.text).join("\n---\n"); - - // Output the parsed manga to .md file. Will be placed in ../example/readers/ - try { - await fs.writeFile("./parsedManga.md", parsedManga); - console.log("Output successfully written to parsedManga.md"); - } catch (err) { - console.error("Error writing to file:", err); - } -} - -main().catch(console.error); diff --git a/examples/readers/src/simple-directory-reader-with-llamaparse.ts b/examples/readers/src/simple-directory-reader-with-llamaparse.ts new file mode 100644 index 0000000000000000000000000000000000000000..2c8e8019f948f46d472bc6be46a963c08db81d64 --- /dev/null +++ b/examples/readers/src/simple-directory-reader-with-llamaparse.ts @@ -0,0 +1,35 @@ +import { + LlamaParseReader, + SimpleDirectoryReader, + VectorStoreIndex, +} from "llamaindex"; + +async function main() { + const reader = new SimpleDirectoryReader(); + + const docs = await reader.loadData({ + directoryPath: "../data/parallel", // brk-2022.pdf split into 6 parts + numWorkers: 2, + // set LlamaParse as the default reader for all file types. Set apiKey here or in environment variable LLAMA_CLOUD_API_KEY + overrideReader: new LlamaParseReader({ + language: "en", + resultType: "markdown", + parsingInstruction: + "The provided files is Berkshire Hathaway's 2022 Annual Report. They contain figures, tables and raw data. Capture the data in a structured format. Mathematical equation should be put out as LATEX markdown (between $$).", + }), + }); + + const index = await VectorStoreIndex.fromDocuments(docs); + + // Query the index + const queryEngine = index.asQueryEngine(); + const response = await queryEngine.query({ + query: + "What is the general strategy for shareholder safety outlined in the report? Use a concrete example with numbers", + }); + + // Output response + console.log(response.toString()); +} + +main().catch(console.error); diff --git a/packages/core/package.json b/packages/core/package.json index 457f371c041dfb20b3b5d051cf65e2d9223d8170..9b6289dd8e1196deb0579f4519761153470fc90f 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -50,6 +50,7 @@ "mongodb": "^6.6.1", "notion-md-crawler": "^1.0.0", "openai": "^4.46.0", + "p-limit": "^5.0.0", "papaparse": "^5.4.1", "pathe": "^1.1.2", "pdf2json": "3.0.5", diff --git a/packages/core/src/readers/LlamaParseReader.ts b/packages/core/src/readers/LlamaParseReader.ts index 5bc4934b1e86305dcd2ee9a5c744f2cee5d63f7b..2fc6dbcefadb80190881c4884731252a8078d8ca 100644 --- a/packages/core/src/readers/LlamaParseReader.ts +++ b/packages/core/src/readers/LlamaParseReader.ts @@ -130,6 +130,7 @@ export class LlamaParseReader implements FileReader { gpt4oMode: boolean = false; // The API key for the GPT-4o API. Lowers the cost of parsing. gpt4oApiKey?: string; + // numWorkers is implemented in SimpleDirectoryReader constructor(params: Partial<LlamaParseReader> = {}) { Object.assign(this, params); @@ -156,6 +157,10 @@ export class LlamaParseReader implements FileReader { const data = await fs.readFile(file); const mimeType = await this.getMimeType(data); + if (this.verbose) { + console.log(`Starting load for file: ${file}`); + } + const body = new FormData(); body.set("file", new Blob([data], { type: mimeType }), file); body.append("language", this.language); diff --git a/packages/core/src/readers/SimpleDirectoryReader.edge.ts b/packages/core/src/readers/SimpleDirectoryReader.edge.ts index a26eae4f96e07d0a757704100a5526a52c6bfa4a..e138fb722a3b3fc00062a5ab706f0c94eda579b3 100644 --- a/packages/core/src/readers/SimpleDirectoryReader.edge.ts +++ b/packages/core/src/readers/SimpleDirectoryReader.edge.ts @@ -1,4 +1,5 @@ import { fs, path } from "@llamaindex/env"; +import pLimit from "p-limit"; import { Document, type Metadata } from "../Node.js"; import { walk } from "../storage/FileSystem.js"; import { TextFileReader } from "./TextFileReader.js"; @@ -18,10 +19,21 @@ enum ReaderStatus { export type SimpleDirectoryReaderLoadDataParams = { directoryPath: string; + // Fallback Reader, defaults to TextFileReader defaultReader?: BaseReader | null; + // Map file extensions individually to readers fileExtToReader?: Record<string, BaseReader>; + // Number of workers, defaults to 1. Must be between 1 and 9. + numWorkers?: number; + // Overrides reader for all file extensions + overrideReader?: BaseReader; }; +type ProcessFileParams = Omit< + SimpleDirectoryReaderLoadDataParams, + "directoryPath" +>; + /** * Read all the documents in a directory. * By default, supports the list of file types @@ -45,8 +57,14 @@ export class SimpleDirectoryReader implements BaseReader { directoryPath, defaultReader = new TextFileReader(), fileExtToReader, + numWorkers = 1, + overrideReader, } = params; + if (numWorkers < 1 || numWorkers > 9) { + throw new Error("The number of workers must be between 1 - 9."); + } + // Observer can decide to skip the directory if ( !this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED) @@ -54,58 +72,85 @@ export class SimpleDirectoryReader implements BaseReader { return []; } - const docs: Document[] = []; + // Crates a queue of file paths each worker accesses individually + const filePathQueue: string[] = []; + for await (const filePath of walk(directoryPath)) { - try { - const fileExt = path.extname(filePath).slice(1).toLowerCase(); + filePathQueue.push(filePath); + } - // Observer can decide to skip each file - if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) { - // Skip this file - continue; - } + const processFileParams: ProcessFileParams = { + defaultReader, + fileExtToReader, + overrideReader, + }; - let reader: BaseReader; + // Uses pLimit to control number of parallel requests + const limit = pLimit(numWorkers); + const workerPromises = filePathQueue.map((filePath) => + limit(() => this.processFile(filePath, processFileParams)), + ); - if (fileExtToReader && fileExt in fileExtToReader) { - reader = fileExtToReader[fileExt]; - } else if (defaultReader != null) { - reader = defaultReader; - } else { - const msg = `No reader for file extension of ${filePath}`; - console.warn(msg); + const results: Document[][] = await Promise.all(workerPromises); - // In an error condition, observer's false cancels the whole process. - if ( - !this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg) - ) { - return []; - } + // After successful import of all files, directory completion + // is only a notification for observer, cannot be cancelled. + this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE); - continue; - } + return results.flat(); + } + + private async processFile( + filePath: string, + params: ProcessFileParams, + ): Promise<Document[]> { + const docs: Document[] = []; - const fileDocs = await reader.loadData(filePath, fs); - fileDocs.forEach(addMetaData(filePath)); + try { + const fileExt = path.extname(filePath).slice(1).toLowerCase(); - // Observer can still cancel addition of the resulting docs from this file - if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) { - docs.push(...fileDocs); - } - } catch (e) { - const msg = `Error reading file ${filePath}: ${e}`; - console.error(msg); + // Observer can decide to skip each file + if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) { + // Skip this file + return []; + } + + let reader: BaseReader; + + if (params.overrideReader) { + reader = params.overrideReader; + } else if (params.fileExtToReader && fileExt in params.fileExtToReader) { + reader = params.fileExtToReader[fileExt]; + } else if (params.defaultReader != null) { + reader = params.defaultReader; + } else { + const msg = `No reader for file extension of ${filePath}`; + console.warn(msg); // In an error condition, observer's false cancels the whole process. if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) { return []; } + + return []; } - } - // After successful import of all files, directory completion - // is only a notification for observer, cannot be cancelled. - this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE); + const fileDocs = await reader.loadData(filePath, fs); + fileDocs.forEach(addMetaData(filePath)); + + // Observer can still cancel addition of the resulting docs from this file + if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) { + docs.push(...fileDocs); + } + } catch (e) { + const msg = `Error reading file ${filePath}: ${e}`; + console.error(msg); + + // In an error condition, observer's false cancels the whole process. + if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) { + return []; + } + } return docs; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f1df99407c31f5f2b240f7acb81a4d36b9b462df..50f0f33417b0e03645ad689c89281b770a3093d3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -421,6 +421,9 @@ importers: openai: specifier: ^4.46.0 version: 4.47.1(encoding@0.1.13) + p-limit: + specifier: ^5.0.0 + version: 5.0.0 papaparse: specifier: ^5.4.1 version: 5.4.1