diff --git a/.changeset/young-bananas-wave.md b/.changeset/young-bananas-wave.md new file mode 100644 index 0000000000000000000000000000000000000000..bddc8978c7220e23111340091a94dda5ab617d98 --- /dev/null +++ b/.changeset/young-bananas-wave.md @@ -0,0 +1,5 @@ +--- +"llamaindex": patch +--- + +Add support for doc store strategies to VectorStoreIndex.fromDocuments diff --git a/packages/core/src/indices/vectorStore/index.ts b/packages/core/src/indices/vectorStore/index.ts index dcd18c8e5ddc0b825acd3eae451a7d1fd832ec2e..7aebb45c20a31bac63ccc86abda236ef267dd74b 100644 --- a/packages/core/src/indices/vectorStore/index.ts +++ b/packages/core/src/indices/vectorStore/index.ts @@ -25,17 +25,21 @@ import type { } from "../../embeddings/index.js"; import { ClipEmbedding } from "../../embeddings/index.js"; import { RetrieverQueryEngine } from "../../engines/query/RetrieverQueryEngine.js"; -import { runTransformations } from "../../ingestion/index.js"; +import { runTransformations } from "../../ingestion/IngestionPipeline.js"; +import { + DocStoreStrategy, + createDocStoreStrategy, +} from "../../ingestion/strategies/index.js"; import type { BaseNodePostprocessor } from "../../postprocessors/types.js"; import type { StorageContext } from "../../storage/StorageContext.js"; import { storageContextFromDefaults } from "../../storage/StorageContext.js"; -import type { BaseIndexStore } from "../../storage/indexStore/types.js"; import type { MetadataFilters, VectorStore, VectorStoreQuery, VectorStoreQueryResult, -} from "../../storage/vectorStore/types.js"; +} from "../../storage/index.js"; +import type { BaseIndexStore } from "../../storage/indexStore/types.js"; import { VectorStoreQueryMode } from "../../storage/vectorStore/types.js"; import type { BaseSynthesizer } from "../../synthesizers/types.js"; import type { BaseQueryEngine } from "../../types.js"; @@ -195,17 +199,7 @@ export class VectorStoreIndex extends BaseIndex<IndexDict> { nodes: BaseNode[], options?: { logProgress?: boolean }, ) { - // Check if the index already has nodes with the same hash - const newNodes = nodes.filter((node) => - Object.entries(this.indexStruct.nodesDict).reduce((acc, [key, value]) => { - if (value.hash === node.hash) { - acc = false; - } - return acc; - }, true), - ); - - await this.insertNodes(newNodes, options); + await this.insertNodes(nodes, options); } /** @@ -216,23 +210,37 @@ export class VectorStoreIndex extends BaseIndex<IndexDict> { */ static async fromDocuments( documents: Document[], - args: VectorIndexOptions = {}, + args: VectorIndexOptions & { + docStoreStrategy?: DocStoreStrategy; + } = {}, ): Promise<VectorStoreIndex> { + args.docStoreStrategy = + args.docStoreStrategy ?? + // set doc store strategy defaults to the same as for the IngestionPipeline + (args.vectorStore + ? DocStoreStrategy.UPSERTS + : DocStoreStrategy.DUPLICATES_ONLY); args.storageContext = args.storageContext ?? (await storageContextFromDefaults({})); args.serviceContext = args.serviceContext ?? serviceContextFromDefaults({}); const docStore = args.storageContext.docStore; - for (const doc of documents) { - docStore.setDocumentHash(doc.id_, doc.hash); - } - if (args.logProgress) { console.log("Using node parser on documents..."); } - args.nodes = await runTransformations(documents, [ - args.serviceContext.nodeParser, - ]); + + // use doc store strategy to avoid duplicates + const docStoreStrategy = createDocStoreStrategy( + args.docStoreStrategy, + docStore, + args.vectorStore, + ); + args.nodes = await runTransformations( + documents, + [args.serviceContext.nodeParser], + {}, + { docStoreStrategy }, + ); if (args.logProgress) { console.log("Finished parsing documents."); } diff --git a/packages/core/src/ingestion/IngestionPipeline.ts b/packages/core/src/ingestion/IngestionPipeline.ts index 07e82e43b10c418b9a91a8c58cace4e623062874..775f1712359b828cd5b0f2ca36f27d0eb89f73c0 100644 --- a/packages/core/src/ingestion/IngestionPipeline.ts +++ b/packages/core/src/ingestion/IngestionPipeline.ts @@ -25,18 +25,22 @@ type IngestionRunArgs = { type TransformRunArgs = { inPlace?: boolean; cache?: IngestionCache; + docStoreStrategy?: TransformComponent; }; export async function runTransformations( nodesToRun: BaseNode[], transformations: TransformComponent[], transformOptions: any = {}, - { inPlace = true, cache }: TransformRunArgs = {}, + { inPlace = true, cache, docStoreStrategy }: TransformRunArgs = {}, ): Promise<BaseNode[]> { let nodes = nodesToRun; if (!inPlace) { nodes = [...nodesToRun]; } + if (docStoreStrategy) { + nodes = await docStoreStrategy.transform(nodes); + } for (const transform of transformations) { if (cache) { const hash = getTransformationHash(nodes, transform); @@ -73,6 +77,9 @@ export class IngestionPipeline { constructor(init?: Partial<IngestionPipeline> & ClientParams) { Object.assign(this, init); this.clientParams = { apiKey: init?.apiKey, baseUrl: init?.baseUrl }; + if (!this.docStore) { + this.docStoreStrategy = DocStoreStrategy.NONE; + } this._docStoreStrategy = createDocStoreStrategy( this.docStoreStrategy, this.docStore, @@ -108,16 +115,10 @@ export class IngestionPipeline { transformOptions?: any, ): Promise<BaseNode[]> { args.cache = args.cache ?? this.cache; + args.docStoreStrategy = args.docStoreStrategy ?? this._docStoreStrategy; const inputNodes = await this.prepareInput(args.documents, args.nodes); - let nodesToRun; - if (this._docStoreStrategy) { - nodesToRun = await this._docStoreStrategy.transform(inputNodes); - } else { - nodesToRun = inputNodes; - } - const nodes = await runTransformations( - nodesToRun, + inputNodes, this.transformations, transformOptions, args, diff --git a/packages/core/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts b/packages/core/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts index ac9ceeb4e3acf34a34f122bd8ecf1f6ed1a8cfc8..cfeae7f785b7f382b4cbe7c40ed468f6e6ab4ec4 100644 --- a/packages/core/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts +++ b/packages/core/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts @@ -1,13 +1,14 @@ import type { BaseNode } from "../../Node.js"; import type { BaseDocumentStore } from "../../storage/docStore/types.js"; import type { VectorStore } from "../../storage/vectorStore/types.js"; +import type { TransformComponent } from "../types.js"; import { classify } from "./classify.js"; /** * Handle docstore upserts by checking hashes and ids. * Identify missing docs and delete them from docstore and vector store */ -export class UpsertsAndDeleteStrategy { +export class UpsertsAndDeleteStrategy implements TransformComponent { protected docStore: BaseDocumentStore; protected vectorStore?: VectorStore; diff --git a/packages/core/src/ingestion/strategies/index.ts b/packages/core/src/ingestion/strategies/index.ts index 0248169fd70273f26183a6f4608e274af3e856b5..7eda995e96d060356e49512b0bb12c9fa1191526 100644 --- a/packages/core/src/ingestion/strategies/index.ts +++ b/packages/core/src/ingestion/strategies/index.ts @@ -2,31 +2,44 @@ import type { BaseDocumentStore } from "../../storage/docStore/types.js"; import type { VectorStore } from "../../storage/vectorStore/types.js"; import type { TransformComponent } from "../types.js"; import { DuplicatesStrategy } from "./DuplicatesStrategy.js"; +import { UpsertsAndDeleteStrategy } from "./UpsertsAndDeleteStrategy.js"; import { UpsertsStrategy } from "./UpsertsStrategy.js"; export enum DocStoreStrategy { UPSERTS = "upserts", DUPLICATES_ONLY = "duplicates_only", UPSERTS_AND_DELETE = "upserts_and_delete", + NONE = "none", // no-op strategy +} + +class NoOpStrategy implements TransformComponent { + async transform(nodes: any[]): Promise<any[]> { + return nodes; + } } export function createDocStoreStrategy( docStoreStrategy: DocStoreStrategy, docStore?: BaseDocumentStore, vectorStore?: VectorStore, -): TransformComponent | undefined { - if (docStore && vectorStore) { - if ( - docStoreStrategy === DocStoreStrategy.UPSERTS || - docStoreStrategy === DocStoreStrategy.UPSERTS_AND_DELETE - ) { +): TransformComponent { + if (docStoreStrategy === DocStoreStrategy.NONE) { + return new NoOpStrategy(); + } + if (!docStore) { + throw new Error("docStore is required to create a doc store strategy."); + } + if (vectorStore) { + if (docStoreStrategy === DocStoreStrategy.UPSERTS) { return new UpsertsStrategy(docStore, vectorStore); + } else if (docStoreStrategy === DocStoreStrategy.UPSERTS_AND_DELETE) { + return new UpsertsAndDeleteStrategy(docStore, vectorStore); } else if (docStoreStrategy === DocStoreStrategy.DUPLICATES_ONLY) { return new DuplicatesStrategy(docStore); } else { throw new Error(`Invalid docstore strategy: ${docStoreStrategy}`); } - } else if (docStore && !vectorStore) { + } else { if (docStoreStrategy === DocStoreStrategy.UPSERTS) { console.warn( "Docstore strategy set to upserts, but no vector store. Switching to duplicates_only strategy.", diff --git a/packages/core/tests/indices/VectorStoreIndex.test.ts b/packages/core/tests/indices/VectorStoreIndex.test.ts index 43b6aa02372ea5d3a52f7fb7e7d7b46123deda8b..2d05195d4c2abc09f0872a7b9f2b4561cc5cfe50 100644 --- a/packages/core/tests/indices/VectorStoreIndex.test.ts +++ b/packages/core/tests/indices/VectorStoreIndex.test.ts @@ -4,6 +4,7 @@ import { VectorStoreIndex, storageContextFromDefaults, } from "llamaindex"; +import { DocStoreStrategy } from "llamaindex/ingestion/strategies/index"; import { rmSync } from "node:fs"; import { mkdtemp } from "node:fs/promises"; import { tmpdir } from "node:os"; @@ -24,7 +25,7 @@ describe.sequential("VectorStoreIndex", () => { let serviceContext: ServiceContext; let storageContext: StorageContext; let testStrategy: ( - // strategy?: DocStoreStrategy, + strategy: DocStoreStrategy, runs?: number, ) => Promise<Array<number>>; @@ -34,7 +35,7 @@ describe.sequential("VectorStoreIndex", () => { persistDir: testDir, }); testStrategy = async ( - // strategy?: DocStoreStrategy, + strategy: DocStoreStrategy, runs: number = 2, ): Promise<Array<number>> => { const documents = [new Document({ text: "lorem ipsem", id_: "1" })]; @@ -43,7 +44,7 @@ describe.sequential("VectorStoreIndex", () => { await VectorStoreIndex.fromDocuments(documents, { serviceContext, storageContext, - // docStoreStrategy: strategy, + docStoreStrategy: strategy, }); const docs = await storageContext.docStore.docs(); entries.push(Object.keys(docs).length); @@ -52,15 +53,15 @@ describe.sequential("VectorStoreIndex", () => { }; }); - test("fromDocuments does not stores duplicates per default", async () => { - const entries = await testStrategy(); - expect(entries[0]).toBe(entries[1]); + test("fromDocuments stores duplicates without a doc store strategy", async () => { + const entries = await testStrategy(DocStoreStrategy.NONE); + expect(entries[0] + 1).toBe(entries[1]); }); - // test("fromDocuments ignores duplicates in upserts", async () => { - // const entries = await testStrategy(DocStoreStrategy.DUPLICATES_ONLY); - // expect(entries[0]).toBe(entries[1]); - // }); + test("fromDocuments ignores duplicates with upserts doc store strategy", async () => { + const entries = await testStrategy(DocStoreStrategy.UPSERTS); + expect(entries[0]).toBe(entries[1]); + }); afterAll(async () => { // TODO: VectorStoreIndex.fromDocuments running twice is causing a cleanup issue