Skip to content
Snippets Groups Projects
Unverified Commit 025ffe6b authored by Goran's avatar Goran Committed by GitHub
Browse files

fix: update `PostgresKVStore` constructor params (#1240)


Co-authored-by: default avatarAlex Yang <himself65@outlook.com>
parent a6595747
Branches
Tags
No related merge requests found
---
"llamaindex": patch
---
fix: update `PostgresKVStore` constructor params
import { DEFAULT_NAMESPACE } from "@llamaindex/core/global"; import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
import { PostgresKVStore } from "../kvStore/PostgresKVStore.js"; import {
PostgresKVStore,
type PostgresKVStoreConfig,
} from "../kvStore/PostgresKVStore.js";
import { KVDocumentStore } from "./KVDocumentStore.js"; import { KVDocumentStore } from "./KVDocumentStore.js";
const DEFAULT_TABLE_NAME = "llamaindex_doc_store"; const DEFAULT_TABLE_NAME = "llamaindex_doc_store";
export type PostgresDocumentStoreConfig = PostgresKVStoreConfig & {
namespace?: string;
};
export class PostgresDocumentStore extends KVDocumentStore { export class PostgresDocumentStore extends KVDocumentStore {
constructor(config?: { constructor(config?: PostgresDocumentStoreConfig) {
schemaName?: string;
tableName?: string;
connectionString?: string;
namespace?: string;
}) {
const kvStore = new PostgresKVStore({ const kvStore = new PostgresKVStore({
schemaName: config?.schemaName, schemaName: config?.schemaName,
tableName: config?.tableName || DEFAULT_TABLE_NAME, tableName: config?.tableName || DEFAULT_TABLE_NAME,
...(config && "clientConfig" in config
? { clientConfig: config.clientConfig }
: config && "client" in config
? {
client: config.client,
shouldConnect: config.shouldConnect ?? false,
}
: {}),
}); });
const namespace = config?.namespace || DEFAULT_NAMESPACE; const namespace = config?.namespace || DEFAULT_NAMESPACE;
super(kvStore, namespace); super(kvStore, namespace);
......
import { DEFAULT_NAMESPACE } from "@llamaindex/core/global"; import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
import { PostgresKVStore } from "../kvStore/PostgresKVStore.js"; import {
PostgresKVStore,
type PostgresKVStoreConfig,
} from "../kvStore/PostgresKVStore.js";
import { KVIndexStore } from "./KVIndexStore.js"; import { KVIndexStore } from "./KVIndexStore.js";
const DEFAULT_TABLE_NAME = "llamaindex_index_store"; const DEFAULT_TABLE_NAME = "llamaindex_index_store";
export type PostgresIndexStoreConfig = PostgresKVStoreConfig & {
namespace?: string;
};
export class PostgresIndexStore extends KVIndexStore { export class PostgresIndexStore extends KVIndexStore {
constructor(config?: { constructor(config?: PostgresIndexStoreConfig) {
schemaName?: string;
tableName?: string;
connectionString?: string;
namespace?: string;
}) {
const kvStore = new PostgresKVStore({ const kvStore = new PostgresKVStore({
schemaName: config?.schemaName, schemaName: config?.schemaName,
tableName: config?.tableName || DEFAULT_TABLE_NAME, tableName: config?.tableName || DEFAULT_TABLE_NAME,
...(config && "clientConfig" in config
? { clientConfig: config.clientConfig }
: config && "client" in config
? {
client: config.client,
shouldConnect: config.shouldConnect ?? false,
}
: {}),
}); });
const namespace = config?.namespace || DEFAULT_NAMESPACE; const namespace = config?.namespace || DEFAULT_NAMESPACE;
super(kvStore, namespace); super(kvStore, namespace);
......
...@@ -7,41 +7,76 @@ export type DataType = Record<string, Record<string, any>>; ...@@ -7,41 +7,76 @@ export type DataType = Record<string, Record<string, any>>;
const DEFAULT_SCHEMA_NAME = "public"; const DEFAULT_SCHEMA_NAME = "public";
const DEFAULT_TABLE_NAME = "llamaindex_kv_store"; const DEFAULT_TABLE_NAME = "llamaindex_kv_store";
export type PostgresKVStoreBaseConfig = {
schemaName?: string | undefined;
tableName?: string | undefined;
};
export type PostgresKVStoreClientConfig =
| {
/**
* Client configuration options for the pg client.
*
* {@link https://node-postgres.com/apis/client#new-client PostgresSQL Client API}
*/
clientConfig?: pg.ClientConfig | undefined;
}
| {
/**
* A pg client or pool client instance.
* If provided, make sure it is not connected to the database yet, or it will throw an error.
*/
shouldConnect?: boolean | undefined;
client?: pg.Client | pg.PoolClient;
};
export type PostgresKVStoreConfig = PostgresKVStoreBaseConfig &
PostgresKVStoreClientConfig;
export class PostgresKVStore extends BaseKVStore { export class PostgresKVStore extends BaseKVStore {
private schemaName: string; private schemaName: string;
private tableName: string; private tableName: string;
private connectionString: string | undefined = undefined;
private db?: pg.Client;
constructor(config?: { private isDBConnected: boolean = false;
schemaName?: string | undefined; private clientConfig: pg.ClientConfig | undefined = undefined;
tableName?: string | undefined; private db?: pg.ClientBase | undefined = undefined;
connectionString?: string | undefined;
}) { constructor(config?: PostgresKVStoreConfig) {
super(); super();
this.schemaName = config?.schemaName || DEFAULT_SCHEMA_NAME; this.schemaName = config?.schemaName || DEFAULT_SCHEMA_NAME;
this.tableName = config?.tableName || DEFAULT_TABLE_NAME; this.tableName = config?.tableName || DEFAULT_TABLE_NAME;
this.connectionString = config?.connectionString; if (config) {
if ("clientConfig" in config) {
this.clientConfig = config.clientConfig;
} else if ("client" in config) {
this.isDBConnected =
config?.shouldConnect !== undefined ? !config.shouldConnect : false;
this.db = config.client;
}
}
} }
private async getDb(): Promise<pg.Client> { private async getDb(): Promise<pg.ClientBase> {
if (!this.db) { if (!this.db) {
try { const pg = await import("pg");
const pg = await import("pg"); const { Client } = pg.default ? pg.default : pg;
const { Client } = pg.default ? pg.default : pg; const db = new Client({ ...this.clientConfig });
const db = new Client({ connectionString: this.connectionString }); await db.connect();
await db.connect(); this.isDBConnected = true;
await this.checkSchema(db); this.db = db;
this.db = db; }
} catch (err) { if (this.db && !this.isDBConnected) {
console.error(err); await this.db.connect();
return Promise.reject(err instanceof Error ? err : new Error(`${err}`)); this.isDBConnected = true;
}
} }
return Promise.resolve(this.db); this.db.on("end", () => {
this.isDBConnected = false;
});
await this.checkSchema(this.db);
return this.db;
} }
private async checkSchema(db: pg.Client) { private async checkSchema(db: pg.ClientBase) {
await db.query(`CREATE SCHEMA IF NOT EXISTS ${this.schemaName}`); await db.query(`CREATE SCHEMA IF NOT EXISTS ${this.schemaName}`);
const tbl = `CREATE TABLE IF NOT EXISTS ${this.schemaName}.${this.tableName} ( const tbl = `CREATE TABLE IF NOT EXISTS ${this.schemaName}.${this.tableName} (
id uuid DEFAULT gen_random_uuid() PRIMARY KEY, id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
...@@ -97,7 +132,7 @@ export class PostgresKVStore extends BaseKVStore { ...@@ -97,7 +132,7 @@ export class PostgresKVStore extends BaseKVStore {
const sql = `SELECT * FROM ${this.schemaName}.${this.tableName} WHERE key = $1 AND collection = $2`; const sql = `SELECT * FROM ${this.schemaName}.${this.tableName} WHERE key = $1 AND collection = $2`;
const result = await db.query(sql, [key, collection]); const result = await db.query(sql, [key, collection]);
await db.query("COMMIT"); await db.query("COMMIT");
return result.rows[0].value; return result.rows[0]?.value;
} catch (error) { } catch (error) {
await db.query("ROLLBACK"); await db.query("ROLLBACK");
throw error; throw error;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment