Skip to content
Snippets Groups Projects
Commit 6db7f23e authored by Marcus Schiesser's avatar Marcus Schiesser
Browse files

Revert "feat: add parallel processing to SimpleDirectoryReader (#883)"

This reverts commit da1f0252.
parent 0721a849
No related branches found
No related tags found
No related merge requests found
Showing
with 64 additions and 132 deletions
---
"llamaindex": patch
---
add concurrency management for SimpleDirectoryReader
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
"start:pdf": "node --import tsx ./src/pdf.ts", "start:pdf": "node --import tsx ./src/pdf.ts",
"start:llamaparse": "node --import tsx ./src/llamaparse.ts", "start:llamaparse": "node --import tsx ./src/llamaparse.ts",
"start:notion": "node --import tsx ./src/notion.ts", "start:notion": "node --import tsx ./src/notion.ts",
"start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts" "start:llamaparse2": "node --import tsx ./src/llamaparse_2.ts"
}, },
"dependencies": { "dependencies": {
"llamaindex": "*" "llamaindex": "*"
......
import fs from "fs/promises";
import { LlamaParseReader } from "llamaindex";
/**
 * Parses a PDF with LlamaParse and stores the combined result as a single markdown file.
 *
 * The text of every returned document is joined with a `---` separator line and
 * written to `./parsedManga.md`. A failed write is logged, not rethrown.
 */
async function main() {
  // Set apiKey here or in environment variable LLAMA_CLOUD_API_KEY
  const parser = new LlamaParseReader({
    resultType: "markdown",
    language: "en",
    parsingInstruction:
      "The provided document is a manga comic book. Most pages do NOT have title. It does not contain tables. Try to reconstruct the dialogue happening in a cohesive way. Output any math equation in LATEX markdown (between $$)",
  });

  // The manga.pdf in the data folder is just a copy of the TOS, due to copyright laws.
  // You have to place your own. The original author used "The Manga Guide to Calculus"
  // by Hiroyuki Kojima.
  const pages = await parser.loadData("../data/manga.pdf");

  // Each returned document is treated as one page/section of the parsed output.
  const pageTexts = pages.map(({ text }) => text);
  const markdown = pageTexts.join("\n---\n");

  // The output path is relative to the working directory (../example/readers/ when run from there).
  const outputPath = "./parsedManga.md";
  try {
    await fs.writeFile(outputPath, markdown);
    console.log("Output successfully written to parsedManga.md");
  } catch (writeError) {
    console.error("Error writing to file:", writeError);
  }
}
main().catch(console.error);
import {
LlamaParseReader,
SimpleDirectoryReader,
VectorStoreIndex,
} from "llamaindex";
/**
 * Reads every file in `../data/parallel` with LlamaParse, builds a vector index
 * from the resulting documents, and prints the answer to one sample query.
 */
async function main() {
  const directoryReader = new SimpleDirectoryReader();

  // LlamaParse is used as the reader for all file types.
  // Set apiKey here or in environment variable LLAMA_CLOUD_API_KEY
  const llamaParse = new LlamaParseReader({
    language: "en",
    resultType: "markdown",
    parsingInstruction:
      "The provided files is Berkshire Hathaway's 2022 Annual Report. They contain figures, tables and raw data. Capture the data in a structured format. Mathematical equation should be put out as LATEX markdown (between $$).",
  });

  const docs = await directoryReader.loadData({
    directoryPath: "../data/parallel", // brk-2022.pdf split into 6 parts
    numWorkers: 2,
    overrideReader: llamaParse,
  });

  const index = await VectorStoreIndex.fromDocuments(docs);

  // Ask a single question against the freshly built index.
  const question =
    "What is the general strategy for shareholder safety outlined in the report? Use a concrete example with numbers";
  const answer = await index.asQueryEngine().query({ query: question });

  // Print the answer
  console.log(answer.toString());
}
main().catch(console.error);
...@@ -50,7 +50,6 @@ ...@@ -50,7 +50,6 @@
"mongodb": "^6.6.1", "mongodb": "^6.6.1",
"notion-md-crawler": "^1.0.0", "notion-md-crawler": "^1.0.0",
"openai": "^4.46.0", "openai": "^4.46.0",
"p-limit": "^5.0.0",
"papaparse": "^5.4.1", "papaparse": "^5.4.1",
"pathe": "^1.1.2", "pathe": "^1.1.2",
"pdf2json": "3.0.5", "pdf2json": "3.0.5",
......
...@@ -130,7 +130,6 @@ export class LlamaParseReader implements FileReader { ...@@ -130,7 +130,6 @@ export class LlamaParseReader implements FileReader {
gpt4oMode: boolean = false; gpt4oMode: boolean = false;
// The API key for the GPT-4o API. Optional, lowers the cost of parsing. Can be set as an env variable: LLAMA_CLOUD_GPT4O_API_KEY. // The API key for the GPT-4o API. Optional, lowers the cost of parsing. Can be set as an env variable: LLAMA_CLOUD_GPT4O_API_KEY.
gpt4oApiKey?: string; gpt4oApiKey?: string;
// numWorkers is implemented in SimpleDirectoryReader
constructor(params: Partial<LlamaParseReader> = {}) { constructor(params: Partial<LlamaParseReader> = {}) {
Object.assign(this, params); Object.assign(this, params);
...@@ -157,10 +156,6 @@ export class LlamaParseReader implements FileReader { ...@@ -157,10 +156,6 @@ export class LlamaParseReader implements FileReader {
const data = await fs.readFile(file); const data = await fs.readFile(file);
const mimeType = await this.getMimeType(data); const mimeType = await this.getMimeType(data);
if (this.verbose) {
console.log(`Starting load for file: ${file}`);
}
const body = new FormData(); const body = new FormData();
body.set("file", new Blob([data], { type: mimeType }), file); body.set("file", new Blob([data], { type: mimeType }), file);
body.append("language", this.language); body.append("language", this.language);
......
import { fs, path } from "@llamaindex/env"; import { fs, path } from "@llamaindex/env";
import pLimit from "p-limit";
import { Document, type Metadata } from "../Node.js"; import { Document, type Metadata } from "../Node.js";
import { walk } from "../storage/FileSystem.js"; import { walk } from "../storage/FileSystem.js";
import { TextFileReader } from "./TextFileReader.js"; import { TextFileReader } from "./TextFileReader.js";
...@@ -19,21 +18,10 @@ enum ReaderStatus { ...@@ -19,21 +18,10 @@ enum ReaderStatus {
export type SimpleDirectoryReaderLoadDataParams = { export type SimpleDirectoryReaderLoadDataParams = {
directoryPath: string; directoryPath: string;
// Fallback Reader, defaults to TextFileReader
defaultReader?: BaseReader | null; defaultReader?: BaseReader | null;
// Map file extensions individually to readers
fileExtToReader?: Record<string, BaseReader>; fileExtToReader?: Record<string, BaseReader>;
// Number of workers, defaults to 1. Must be between 1 and 9.
numWorkers?: number;
// Overrides reader for all file extensions
overrideReader?: BaseReader;
}; };
type ProcessFileParams = Omit<
SimpleDirectoryReaderLoadDataParams,
"directoryPath"
>;
/** /**
* Read all the documents in a directory. * Read all the documents in a directory.
* By default, supports the list of file types * By default, supports the list of file types
...@@ -57,14 +45,8 @@ export class SimpleDirectoryReader implements BaseReader { ...@@ -57,14 +45,8 @@ export class SimpleDirectoryReader implements BaseReader {
directoryPath, directoryPath,
defaultReader = new TextFileReader(), defaultReader = new TextFileReader(),
fileExtToReader, fileExtToReader,
numWorkers = 1,
overrideReader,
} = params; } = params;
if (numWorkers < 1 || numWorkers > 9) {
throw new Error("The number of workers must be between 1 - 9.");
}
// Observer can decide to skip the directory // Observer can decide to skip the directory
if ( if (
!this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED) !this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED)
...@@ -72,86 +54,59 @@ export class SimpleDirectoryReader implements BaseReader { ...@@ -72,86 +54,59 @@ export class SimpleDirectoryReader implements BaseReader {
return []; return [];
} }
// Crates a queue of file paths each worker accesses individually const docs: Document[] = [];
const filePathQueue: string[] = [];
for await (const filePath of walk(directoryPath)) { for await (const filePath of walk(directoryPath)) {
filePathQueue.push(filePath); try {
} const fileExt = path.extname(filePath).slice(1).toLowerCase();
const processFileParams: ProcessFileParams = {
defaultReader,
fileExtToReader,
overrideReader,
};
// Uses pLimit to control number of parallel requests // Observer can decide to skip each file
const limit = pLimit(numWorkers); if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
const workerPromises = filePathQueue.map((filePath) => // Skip this file
limit(() => this.processFile(filePath, processFileParams)), continue;
); }
const results: Document[][] = await Promise.all(workerPromises);
// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);
return results.flat(); let reader: BaseReader;
}
private async processFile( if (fileExtToReader && fileExt in fileExtToReader) {
filePath: string, reader = fileExtToReader[fileExt];
params: ProcessFileParams, } else if (defaultReader != null) {
): Promise<Document[]> { reader = defaultReader;
const docs: Document[] = []; } else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);
try { // In an error condition, observer's false cancels the whole process.
const fileExt = path.extname(filePath).slice(1).toLowerCase(); if (
!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)
) {
return [];
}
// Observer can decide to skip each file continue;
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) { }
// Skip this file
return [];
}
let reader: BaseReader; const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));
if (params.overrideReader) { // Observer can still cancel addition of the resulting docs from this file
reader = params.overrideReader; if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
} else if (params.fileExtToReader && fileExt in params.fileExtToReader) { docs.push(...fileDocs);
reader = params.fileExtToReader[fileExt]; }
} else if (params.defaultReader != null) { } catch (e) {
reader = params.defaultReader; const msg = `Error reading file ${filePath}: ${e}`;
} else { console.error(msg);
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);
// In an error condition, observer's false cancels the whole process. // In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) { if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return []; return [];
} }
return [];
}
const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));
// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);
// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
} }
} }
// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);
return docs; return docs;
} }
......
...@@ -446,9 +446,6 @@ importers: ...@@ -446,9 +446,6 @@ importers:
openai: openai:
specifier: ^4.46.0 specifier: ^4.46.0
version: 4.47.1(encoding@0.1.13) version: 4.47.1(encoding@0.1.13)
p-limit:
specifier: ^5.0.0
version: 5.0.0
papaparse: papaparse:
specifier: ^5.4.1 specifier: ^5.4.1
version: 5.4.1 version: 5.4.1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment