From 0ebbfc10312ecf7ae44c62a79462c2d2a9eea987 Mon Sep 17 00:00:00 2001
From: Thuc Pham <51660321+thucpn@users.noreply.github.com>
Date: Tue, 7 Jan 2025 11:58:09 +0700
Subject: [PATCH] fix: clean up docstore when generating embedding fail (#1588)

Co-authored-by: Marcus Schiesser <mail@marcusschiesser.de>
---
 .changeset/proud-zoos-compare.md              |  5 +++++
 .../src/indices/vectorStore/index.ts          |  7 ++++++-
 .../strategies/DuplicatesStrategy.ts          |  5 +++--
 .../strategies/UpsertsAndDeleteStrategy.ts    |  5 +++--
 .../ingestion/strategies/UpsertsStrategy.ts   |  5 +++--
 .../src/ingestion/strategies/index.ts         |  6 +++---
 .../src/ingestion/strategies/rollback.ts      | 19 +++++++++++++++++++
 7 files changed, 42 insertions(+), 10 deletions(-)
 create mode 100644 .changeset/proud-zoos-compare.md
 create mode 100644 packages/llamaindex/src/ingestion/strategies/rollback.ts

diff --git a/.changeset/proud-zoos-compare.md b/.changeset/proud-zoos-compare.md
new file mode 100644
index 000000000..31228ed7a
--- /dev/null
+++ b/.changeset/proud-zoos-compare.md
@@ -0,0 +1,5 @@
+---
+"llamaindex": patch
+---
+
+fix: clean up docstore when generating embedding fail
diff --git a/packages/llamaindex/src/indices/vectorStore/index.ts b/packages/llamaindex/src/indices/vectorStore/index.ts
index 5c8536f1a..52bafb239 100644
--- a/packages/llamaindex/src/indices/vectorStore/index.ts
+++ b/packages/llamaindex/src/indices/vectorStore/index.ts
@@ -237,7 +237,12 @@ export class VectorStoreIndex extends BaseIndex<IndexDict> {
     if (args.logProgress) {
       console.log("Finished parsing documents.");
     }
-    return await this.init(args);
+    try {
+      return await this.init(args);
+    } catch (error) {
+      await docStoreStrategy.rollback(args.storageContext.docStore, args.nodes);
+      throw error;
+    }
   }
 
   static async fromVectorStores(
diff --git a/packages/llamaindex/src/ingestion/strategies/DuplicatesStrategy.ts b/packages/llamaindex/src/ingestion/strategies/DuplicatesStrategy.ts
index dc97aa6d0..c2f3741b8 100644
--- a/packages/llamaindex/src/ingestion/strategies/DuplicatesStrategy.ts
+++ b/packages/llamaindex/src/ingestion/strategies/DuplicatesStrategy.ts
@@ -1,10 +1,11 @@
-import { BaseNode, TransformComponent } from "@llamaindex/core/schema";
+import { BaseNode } from "@llamaindex/core/schema";
 import type { BaseDocumentStore } from "@llamaindex/core/storage/doc-store";
+import { RollbackableTransformComponent } from "./rollback.js";
 
 /**
  * Handle doc store duplicates by checking all hashes.
  */
-export class DuplicatesStrategy extends TransformComponent {
+export class DuplicatesStrategy extends RollbackableTransformComponent {
   private docStore: BaseDocumentStore;
 
   constructor(docStore: BaseDocumentStore) {
diff --git a/packages/llamaindex/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts b/packages/llamaindex/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts
index e28a508b6..f36dc7cd7 100644
--- a/packages/llamaindex/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts
+++ b/packages/llamaindex/src/ingestion/strategies/UpsertsAndDeleteStrategy.ts
@@ -1,13 +1,14 @@
-import { BaseNode, TransformComponent } from "@llamaindex/core/schema";
+import { BaseNode } from "@llamaindex/core/schema";
 import type { BaseDocumentStore } from "@llamaindex/core/storage/doc-store";
 import type { BaseVectorStore } from "../../vector-store/types.js";
 import { classify } from "./classify.js";
+import { RollbackableTransformComponent } from "./rollback.js";
 
 /**
  * Handle docstore upserts by checking hashes and ids.
  * Identify missing docs and delete them from docstore and vector store
  */
-export class UpsertsAndDeleteStrategy extends TransformComponent {
+export class UpsertsAndDeleteStrategy extends RollbackableTransformComponent {
   protected docStore: BaseDocumentStore;
   protected vectorStores: BaseVectorStore[] | undefined;
 
diff --git a/packages/llamaindex/src/ingestion/strategies/UpsertsStrategy.ts b/packages/llamaindex/src/ingestion/strategies/UpsertsStrategy.ts
index 3fb158e98..370994cd8 100644
--- a/packages/llamaindex/src/ingestion/strategies/UpsertsStrategy.ts
+++ b/packages/llamaindex/src/ingestion/strategies/UpsertsStrategy.ts
@@ -1,12 +1,13 @@
-import { BaseNode, TransformComponent } from "@llamaindex/core/schema";
+import { BaseNode } from "@llamaindex/core/schema";
 import type { BaseDocumentStore } from "@llamaindex/core/storage/doc-store";
 import type { BaseVectorStore } from "../../vector-store/types.js";
 import { classify } from "./classify.js";
+import { RollbackableTransformComponent } from "./rollback.js";
 
 /**
  * Handles doc store upserts by checking hashes and ids.
  */
-export class UpsertsStrategy extends TransformComponent {
+export class UpsertsStrategy extends RollbackableTransformComponent {
   protected docStore: BaseDocumentStore;
   protected vectorStores: BaseVectorStore[] | undefined;
 
diff --git a/packages/llamaindex/src/ingestion/strategies/index.ts b/packages/llamaindex/src/ingestion/strategies/index.ts
index e3516fca9..a6ba3573f 100644
--- a/packages/llamaindex/src/ingestion/strategies/index.ts
+++ b/packages/llamaindex/src/ingestion/strategies/index.ts
@@ -1,9 +1,9 @@
-import { TransformComponent } from "@llamaindex/core/schema";
 import type { BaseDocumentStore } from "@llamaindex/core/storage/doc-store";
 import type { BaseVectorStore } from "../../vector-store/types.js";
 import { DuplicatesStrategy } from "./DuplicatesStrategy.js";
 import { UpsertsAndDeleteStrategy } from "./UpsertsAndDeleteStrategy.js";
 import { UpsertsStrategy } from "./UpsertsStrategy.js";
+import { RollbackableTransformComponent } from "./rollback.js";
 
 /**
  * Document de-deduplication strategies work by comparing the hashes or ids stored in the document store.
@@ -19,7 +19,7 @@ export enum DocStoreStrategy {
   NONE = "none", // no-op strategy
 }
 
-class NoOpStrategy extends TransformComponent {
+class NoOpStrategy extends RollbackableTransformComponent {
   constructor() {
     super(async (nodes) => nodes);
   }
@@ -29,7 +29,7 @@ export function createDocStoreStrategy(
   docStoreStrategy: DocStoreStrategy,
   docStore?: BaseDocumentStore,
   vectorStores: BaseVectorStore[] = [],
-): TransformComponent {
+): RollbackableTransformComponent {
   if (docStoreStrategy === DocStoreStrategy.NONE) {
     return new NoOpStrategy();
   }
diff --git a/packages/llamaindex/src/ingestion/strategies/rollback.ts b/packages/llamaindex/src/ingestion/strategies/rollback.ts
new file mode 100644
index 000000000..36c3f026c
--- /dev/null
+++ b/packages/llamaindex/src/ingestion/strategies/rollback.ts
@@ -0,0 +1,19 @@
+import { BaseNode, TransformComponent } from "@llamaindex/core/schema";
+import type { BaseDocumentStore } from "../../index.edge.js";
+import { classify } from "./classify.js";
+
+export class RollbackableTransformComponent extends TransformComponent {
+  // Remove unused docs from the doc store. It is useful in case
+  // generating embeddings fails and we want to remove the unused docs
+  // TODO: override this in UpsertsStrategy if we want to revert removed docs also
+  public async rollback(
+    docStore: BaseDocumentStore,
+    nodes: BaseNode[],
+  ): Promise<void> {
+    const { unusedDocs } = await classify(docStore, nodes);
+    for (const docId of unusedDocs) {
+      await docStore.deleteDocument(docId, false);
+    }
+    docStore.persist();
+  }
+}
-- 
GitLab