From 025ffe6b508c2763cd23598086c2e475a5d7e252 Mon Sep 17 00:00:00 2001
From: Goran <gospaso@gmail.com>
Date: Mon, 23 Sep 2024 19:46:11 +0200
Subject: [PATCH] fix: update `PostgresKVStore` constructor params (#1240)

Co-authored-by: Alex Yang <himself65@outlook.com>
---
 .changeset/fluffy-apes-guess.md               |  5 ++
 .../storage/docStore/PostgresDocumentStore.ts | 24 ++++--
 .../storage/indexStore/PostgresIndexStore.ts  | 24 ++++--
 .../src/storage/kvStore/PostgresKVStore.ts    | 81 +++++++++++++------
 4 files changed, 97 insertions(+), 37 deletions(-)
 create mode 100644 .changeset/fluffy-apes-guess.md

diff --git a/.changeset/fluffy-apes-guess.md b/.changeset/fluffy-apes-guess.md
new file mode 100644
index 000000000..d8727d21a
--- /dev/null
+++ b/.changeset/fluffy-apes-guess.md
@@ -0,0 +1,5 @@
+---
+"llamaindex": patch
+---
+
+fix: update `PostgresKVStore` constructor params
diff --git a/packages/llamaindex/src/storage/docStore/PostgresDocumentStore.ts b/packages/llamaindex/src/storage/docStore/PostgresDocumentStore.ts
index b61f15d90..2a221d953 100644
--- a/packages/llamaindex/src/storage/docStore/PostgresDocumentStore.ts
+++ b/packages/llamaindex/src/storage/docStore/PostgresDocumentStore.ts
@@ -1,19 +1,29 @@
 import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
-import { PostgresKVStore } from "../kvStore/PostgresKVStore.js";
+import {
+  PostgresKVStore,
+  type PostgresKVStoreConfig,
+} from "../kvStore/PostgresKVStore.js";
 import { KVDocumentStore } from "./KVDocumentStore.js";
 
 const DEFAULT_TABLE_NAME = "llamaindex_doc_store";
 
+export type PostgresDocumentStoreConfig = PostgresKVStoreConfig & {
+  namespace?: string;
+};
+
 export class PostgresDocumentStore extends KVDocumentStore {
-  constructor(config?: {
-    schemaName?: string;
-    tableName?: string;
-    connectionString?: string;
-    namespace?: string;
-  }) {
+  constructor(config?: PostgresDocumentStoreConfig) {
     const kvStore = new PostgresKVStore({
       schemaName: config?.schemaName,
       tableName: config?.tableName || DEFAULT_TABLE_NAME,
+      ...(config && "clientConfig" in config
+        ? { clientConfig: config.clientConfig }
+        : config && "client" in config
+          ? {
+              client: config.client,
+              shouldConnect: config.shouldConnect ?? false,
+            }
+          : {}),
     });
     const namespace = config?.namespace || DEFAULT_NAMESPACE;
     super(kvStore, namespace);
diff --git a/packages/llamaindex/src/storage/indexStore/PostgresIndexStore.ts b/packages/llamaindex/src/storage/indexStore/PostgresIndexStore.ts
index ee7991816..6e4420743 100644
--- a/packages/llamaindex/src/storage/indexStore/PostgresIndexStore.ts
+++ b/packages/llamaindex/src/storage/indexStore/PostgresIndexStore.ts
@@ -1,19 +1,29 @@
 import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
-import { PostgresKVStore } from "../kvStore/PostgresKVStore.js";
+import {
+  PostgresKVStore,
+  type PostgresKVStoreConfig,
+} from "../kvStore/PostgresKVStore.js";
 import { KVIndexStore } from "./KVIndexStore.js";
 
 const DEFAULT_TABLE_NAME = "llamaindex_index_store";
 
+export type PostgresIndexStoreConfig = PostgresKVStoreConfig & {
+  namespace?: string;
+};
+
 export class PostgresIndexStore extends KVIndexStore {
-  constructor(config?: {
-    schemaName?: string;
-    tableName?: string;
-    connectionString?: string;
-    namespace?: string;
-  }) {
+  constructor(config?: PostgresIndexStoreConfig) {
     const kvStore = new PostgresKVStore({
       schemaName: config?.schemaName,
       tableName: config?.tableName || DEFAULT_TABLE_NAME,
+      ...(config && "clientConfig" in config
+        ? { clientConfig: config.clientConfig }
+        : config && "client" in config
+          ? {
+              client: config.client,
+              shouldConnect: config.shouldConnect ?? false,
+            }
+          : {}),
     });
     const namespace = config?.namespace || DEFAULT_NAMESPACE;
     super(kvStore, namespace);
diff --git a/packages/llamaindex/src/storage/kvStore/PostgresKVStore.ts b/packages/llamaindex/src/storage/kvStore/PostgresKVStore.ts
index cadcb06b7..ee96040b9 100644
--- a/packages/llamaindex/src/storage/kvStore/PostgresKVStore.ts
+++ b/packages/llamaindex/src/storage/kvStore/PostgresKVStore.ts
@@ -7,41 +7,76 @@ export type DataType = Record<string, Record<string, any>>;
 const DEFAULT_SCHEMA_NAME = "public";
 const DEFAULT_TABLE_NAME = "llamaindex_kv_store";
 
+export type PostgresKVStoreBaseConfig = {
+  schemaName?: string | undefined;
+  tableName?: string | undefined;
+};
+
+export type PostgresKVStoreClientConfig =
+  | {
+      /**
+       * Client configuration options for the pg client.
+       *
+       * {@link https://node-postgres.com/apis/client#new-client PostgresSQL Client API}
+       */
+      clientConfig?: pg.ClientConfig | undefined;
+    }
+  | {
+      /**
+       * A pg client or pool client instance.
+       * If provided, make sure it is not connected to the database yet, or it will throw an error.
+       */
+      shouldConnect?: boolean | undefined;
+      client?: pg.Client | pg.PoolClient;
+    };
+
+export type PostgresKVStoreConfig = PostgresKVStoreBaseConfig &
+  PostgresKVStoreClientConfig;
+
 export class PostgresKVStore extends BaseKVStore {
   private schemaName: string;
   private tableName: string;
-  private connectionString: string | undefined = undefined;
-  private db?: pg.Client;
 
-  constructor(config?: {
-    schemaName?: string | undefined;
-    tableName?: string | undefined;
-    connectionString?: string | undefined;
-  }) {
+  private isDBConnected: boolean = false;
+  private clientConfig: pg.ClientConfig | undefined = undefined;
+  private db?: pg.ClientBase | undefined = undefined;
+
+  constructor(config?: PostgresKVStoreConfig) {
     super();
     this.schemaName = config?.schemaName || DEFAULT_SCHEMA_NAME;
     this.tableName = config?.tableName || DEFAULT_TABLE_NAME;
-    this.connectionString = config?.connectionString;
+    if (config) {
+      if ("clientConfig" in config) {
+        this.clientConfig = config.clientConfig;
+      } else if ("client" in config) {
+        this.isDBConnected =
+          config?.shouldConnect !== undefined ? !config.shouldConnect : false;
+        this.db = config.client;
+      }
+    }
   }
 
-  private async getDb(): Promise<pg.Client> {
+  private async getDb(): Promise<pg.ClientBase> {
     if (!this.db) {
-      try {
-        const pg = await import("pg");
-        const { Client } = pg.default ? pg.default : pg;
-        const db = new Client({ connectionString: this.connectionString });
-        await db.connect();
-        await this.checkSchema(db);
-        this.db = db;
-      } catch (err) {
-        console.error(err);
-        return Promise.reject(err instanceof Error ? err : new Error(`${err}`));
-      }
+      const pg = await import("pg");
+      const { Client } = pg.default ? pg.default : pg;
+      const db = new Client({ ...this.clientConfig });
+      await db.connect();
+      this.isDBConnected = true;
+      this.db = db;
+    }
+    if (this.db && !this.isDBConnected) {
+      await this.db.connect();
+      this.isDBConnected = true;
     }
-    return Promise.resolve(this.db);
+    this.db.on("end", () => {
+      this.isDBConnected = false;
+    });
+    await this.checkSchema(this.db);
+    return this.db;
   }
 
-  private async checkSchema(db: pg.Client) {
+  private async checkSchema(db: pg.ClientBase) {
     await db.query(`CREATE SCHEMA IF NOT EXISTS ${this.schemaName}`);
     const tbl = `CREATE TABLE IF NOT EXISTS ${this.schemaName}.${this.tableName} (
       id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
@@ -97,7 +132,7 @@ export class PostgresKVStore extends BaseKVStore {
       const sql = `SELECT * FROM ${this.schemaName}.${this.tableName} WHERE key = $1 AND collection = $2`;
       const result = await db.query(sql, [key, collection]);
       await db.query("COMMIT");
-      return result.rows[0].value;
+      return result.rows[0]?.value;
     } catch (error) {
       await db.query("ROLLBACK");
       throw error;
-- 
GitLab