Skip to content
Snippets Groups Projects
Unverified Commit da1f0252 authored by Fabian Wimmer's avatar Fabian Wimmer Committed by GitHub
Browse files

feat: add parallel processing to SimpleDirectoryReader (#883)

parent 6b1ded41
No related branches found
No related tags found
No related merge requests found
Showing
with 132 additions and 64 deletions
---
"llamaindex": patch
---
add concurrency management for SimpleDirectoryReader
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
......@@ -11,7 +11,7 @@
"start:pdf": "node --import tsx ./src/pdf.ts",
"start:llamaparse": "node --import tsx ./src/llamaparse.ts",
"start:notion": "node --import tsx ./src/notion.ts",
"start:llamaparse2": "node --import tsx ./src/llamaparse_2.ts"
"start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts"
},
"dependencies": {
"llamaindex": "*"
......
import fs from "fs/promises";
import { LlamaParseReader } from "llamaindex";
/**
 * Example: parse a PDF with LlamaParse and save the combined markdown output.
 *
 * Every document returned by the reader is joined with a `---` separator and
 * written to `parsedManga.md` in the current working directory.
 */
async function main() {
  // Prompt that steers how LlamaParse interprets the document.
  const parsingInstruction =
    "The provided document is a manga comic book. Most pages do NOT have title. It does not contain tables. Try to reconstruct the dialogue happening in a cohesive way. Output any math equation in LATEX markdown (between $$)";

  // Provide the API key via `apiKey` here or via the LLAMA_CLOUD_API_KEY environment variable.
  const reader = new LlamaParseReader({
    resultType: "markdown",
    language: "en",
    parsingInstruction,
  });

  // NOTE: the manga.pdf shipped in the data folder is only a copy of the TOS
  // (copyright reasons) - drop in your own PDF. The author used
  // "The Manga Guide to Calculus" by Hiroyuki Kojima.
  const documents = await reader.loadData("../data/manga.pdf");

  // Collect the text of each returned document (page or section) and join them.
  const pageTexts: string[] = [];
  for (const page of documents) {
    pageTexts.push(page.text);
  }
  const parsedManga = pageTexts.join("\n---\n");

  // Persist the result as a markdown file; a write failure is reported, not rethrown.
  const outputPath = "./parsedManga.md";
  try {
    await fs.writeFile(outputPath, parsedManga);
    console.log("Output successfully written to parsedManga.md");
  } catch (err) {
    console.error("Error writing to file:", err);
  }
}
main().catch(console.error);
import {
LlamaParseReader,
SimpleDirectoryReader,
VectorStoreIndex,
} from "llamaindex";
/**
 * Example: load a directory with SimpleDirectoryReader using two parallel
 * workers, routing every file through LlamaParse, then index and query it.
 */
async function main() {
  const reader = new SimpleDirectoryReader();

  // LlamaParse handles every file type in this run.
  // Provide the API key via `apiKey` here or via the LLAMA_CLOUD_API_KEY environment variable.
  const llamaParse = new LlamaParseReader({
    language: "en",
    resultType: "markdown",
    parsingInstruction:
      "The provided files is Berkshire Hathaway's 2022 Annual Report. They contain figures, tables and raw data. Capture the data in a structured format. Mathematical equation should be put out as LATEX markdown (between $$).",
  });

  const docs = await reader.loadData({
    // Directory holds brk-2022.pdf split into 6 parts.
    directoryPath: "../data/parallel",
    numWorkers: 2,
    overrideReader: llamaParse,
  });

  // Build a vector index over the parsed documents.
  const index = await VectorStoreIndex.fromDocuments(docs);

  // Ask a question against the index.
  const query =
    "What is the general strategy for shareholder safety outlined in the report? Use a concrete example with numbers";
  const queryEngine = index.asQueryEngine();
  const response = await queryEngine.query({ query });

  // Print the answer.
  console.log(response.toString());
}
main().catch(console.error);
......@@ -50,6 +50,7 @@
"mongodb": "^6.6.1",
"notion-md-crawler": "^1.0.0",
"openai": "^4.46.0",
"p-limit": "^5.0.0",
"papaparse": "^5.4.1",
"pathe": "^1.1.2",
"pdf2json": "3.0.5",
......
......@@ -130,6 +130,7 @@ export class LlamaParseReader implements FileReader {
gpt4oMode: boolean = false;
// The API key for the GPT-4o API. Lowers the cost of parsing.
gpt4oApiKey?: string;
// numWorkers is implemented in SimpleDirectoryReader
constructor(params: Partial<LlamaParseReader> = {}) {
Object.assign(this, params);
......@@ -156,6 +157,10 @@ export class LlamaParseReader implements FileReader {
const data = await fs.readFile(file);
const mimeType = await this.getMimeType(data);
if (this.verbose) {
console.log(`Starting load for file: ${file}`);
}
const body = new FormData();
body.set("file", new Blob([data], { type: mimeType }), file);
body.append("language", this.language);
......
import { fs, path } from "@llamaindex/env";
import pLimit from "p-limit";
import { Document, type Metadata } from "../Node.js";
import { walk } from "../storage/FileSystem.js";
import { TextFileReader } from "./TextFileReader.js";
......@@ -18,10 +19,21 @@ enum ReaderStatus {
/**
 * Options accepted by `SimpleDirectoryReader.loadData`.
 */
export type SimpleDirectoryReaderLoadDataParams = {
// Directory that is walked to collect the files to load.
directoryPath: string;
// Fallback reader used when no other reader applies; defaults to TextFileReader.
// When null, a file without a matching reader is reported with a warning.
defaultReader?: BaseReader | null;
// Maps individual file extensions to readers. Lookups use the lower-cased
// extension without the leading dot.
fileExtToReader?: Record<string, BaseReader>;
// Number of files processed concurrently, defaults to 1. Must be between 1 and 9;
// values outside that range make loadData throw.
numWorkers?: number;
// Overrides the reader for all file extensions; takes precedence over
// fileExtToReader and defaultReader.
overrideReader?: BaseReader;
};
/**
 * Parameters handed to the per-file processing step: every load option
 * except `directoryPath`, since the file path is passed separately.
 */
type ProcessFileParams = Omit<
SimpleDirectoryReaderLoadDataParams,
"directoryPath"
>;
/**
* Read all the documents in a directory.
* By default, supports the list of file types
......@@ -45,8 +57,14 @@ export class SimpleDirectoryReader implements BaseReader {
directoryPath,
defaultReader = new TextFileReader(),
fileExtToReader,
numWorkers = 1,
overrideReader,
} = params;
if (numWorkers < 1 || numWorkers > 9) {
throw new Error("The number of workers must be between 1 - 9.");
}
// Observer can decide to skip the directory
if (
!this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED)
......@@ -54,58 +72,85 @@ export class SimpleDirectoryReader implements BaseReader {
return [];
}
const docs: Document[] = [];
// Creates a queue of file paths each worker accesses individually
const filePathQueue: string[] = [];
for await (const filePath of walk(directoryPath)) {
try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();
filePathQueue.push(filePath);
}
// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
continue;
}
const processFileParams: ProcessFileParams = {
defaultReader,
fileExtToReader,
overrideReader,
};
let reader: BaseReader;
// Uses pLimit to control number of parallel requests
const limit = pLimit(numWorkers);
const workerPromises = filePathQueue.map((filePath) =>
limit(() => this.processFile(filePath, processFileParams)),
);
if (fileExtToReader && fileExt in fileExtToReader) {
reader = fileExtToReader[fileExt];
} else if (defaultReader != null) {
reader = defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);
const results: Document[][] = await Promise.all(workerPromises);
// In an error condition, observer's false cancels the whole process.
if (
!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)
) {
return [];
}
// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);
continue;
}
return results.flat();
}
private async processFile(
filePath: string,
params: ProcessFileParams,
): Promise<Document[]> {
const docs: Document[] = [];
const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));
try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();
// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);
// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
return [];
}
let reader: BaseReader;
if (params.overrideReader) {
reader = params.overrideReader;
} else if (params.fileExtToReader && fileExt in params.fileExtToReader) {
reader = params.fileExtToReader[fileExt];
} else if (params.defaultReader != null) {
reader = params.defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);
// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}
return [];
}
}
// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);
const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));
// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);
// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}
}
return docs;
}
......
......@@ -421,6 +421,9 @@ importers:
openai:
specifier: ^4.46.0
version: 4.47.1(encoding@0.1.13)
p-limit:
specifier: ^5.0.0
version: 5.0.0
papaparse:
specifier: ^5.4.1
version: 5.4.1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment