diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 50702bdfe21cd28af27808c801b772e1c2e39fb9..0391e3fdcb4006f92e02efea7396a72928183250 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -241,7 +241,7 @@ class BaseIndex(BaseModel): :return: The config parameter that was read. :rtype: ConfigParameter """ - logger.warning("Async method not implemented.") + logger.warning("_async_read_config method not implemented.") return self._read_config(field=field, scope=scope) def _write_config(self, config: ConfigParameter) -> ConfigParameter: @@ -356,6 +356,7 @@ class BaseIndex(BaseModel): """Lock/unlock the index for a given scope (if applicable). If index already locked/unlocked, raises ValueError. """ + logger.warning(f"JBTEMP alock method called with {value=} {wait=} {scope=}") start_time = datetime.now() while True: if await self._ais_locked(scope=scope) != value: diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index da24e226b5bd4ae91762078c1078ed284938c56f..a92c57bd70109b399fed1e698598e33ee0be9835 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -525,7 +525,7 @@ class PineconeIndex(BaseIndex): ids=[config_id], namespace="sr_config", ) - if config_record["vectors"]: + if config_record.get("vectors"): return ConfigParameter( field=field, value=config_record["vectors"][config_id]["metadata"]["value"], @@ -542,6 +542,47 @@ class PineconeIndex(BaseIndex): scope=scope, ) + async def _async_read_config(self, field: str, scope: str | None = None) -> ConfigParameter: + """Read a config parameter from the index asynchronously. + + :param field: The field to read. + :type field: str + :param scope: The scope to read. + :type scope: str | None + :return: The config parameter that was read. + :rtype: ConfigParameter + """ + scope = scope or self.namespace + if self.index is None: + return ConfigParameter( + field=field, + value="", + scope=scope, + ) + config_id = f"{field}#{scope}" + logger.warning(f"JBTEMP Pinecone config id: {config_id}") + config_record = await self._async_fetch_metadata( + vector_id=config_id, namespace="sr_config" + ) + logger.warning(f"JBTEMP Pinecone config record: {config_record}") + if config_record: + try: + return ConfigParameter( + field=field, + value=config_record["value"], + created_at=config_record["created_at"], + scope=scope, + ) + except KeyError: + raise ValueError(f"Found invalid config record during sync: {config_record}") + else: + logger.warning(f"Configuration for {field} parameter not found in index.") + return ConfigParameter( + field=field, + value="", + scope=scope, + ) + def _write_config(self, config: ConfigParameter) -> ConfigParameter: """Method to write a config parameter to the remote Pinecone index. @@ -570,8 +611,10 @@ class PineconeIndex(BaseIndex): raise ValueError("Index has not been initialized.") if self.dimensions is None: raise ValueError("Must set PineconeIndex.dimensions before writing config.") - self.index.upsert( - vectors=[config.to_pinecone(dimensions=self.dimensions)], + pinecone_config = config.to_pinecone(dimensions=self.dimensions) + logger.warning(f"JBTEMP Pinecone config to upsert: {pinecone_config}") + await self._async_upsert( + vectors=[pinecone_config], namespace="sr_config", ) return config @@ -682,11 +725,14 @@ class PineconeIndex(BaseIndex): "vectors": vectors, "namespace": namespace, } + logger.warning(f"JBTEMP Pinecone upsert params: {params}") async with self.async_client.post( - f"{self.base_url}/vectors/upsert", + f"https://{self.host}/vectors/upsert", json=params, ) as response: - return await response.json(content_type=None) + res = await response.json(content_type=None) + logger.warning(f"JBTEMP Pinecone upsert response: {res}") + return res async def _async_create_index( self, @@ -704,7 +750,6 @@ class PineconeIndex(BaseIndex): } async with self.async_client.post( f"{self.base_url}/indexes", - headers={"Api-Key": self.api_key}, json=params, ) as response: return await response.json(content_type=None) @@ -715,7 +760,7 @@ class PineconeIndex(BaseIndex): "namespace": namespace, } async with self.async_client.post( - f"{self.base_url}/vectors/delete", json=params + f"https://{self.host}/vectors/delete", json=params, ) as response: return await response.json(content_type=None) @@ -785,12 +830,18 @@ class PineconeIndex(BaseIndex): return all_vector_ids, metadata - async def _async_fetch_metadata(self, vector_id: str) -> dict: + async def _async_fetch_metadata( + self, + vector_id: str, + namespace: str | None = None, + ) -> dict: """Fetch metadata for a single vector ID asynchronously using the async_client. :param vector_id: The ID of the vector to fetch metadata for. :type vector_id: str + :param namespace: The namespace to fetch metadata for. + :type namespace: str | None :return: A dictionary containing the metadata for the vector. :rtype: dict """ @@ -801,8 +852,11 @@ class PineconeIndex(BaseIndex): params = { "ids": [vector_id], } + logger.warning(f"JBTEMP Pinecone fetch params: {params}") - if self.namespace: + if namespace: + params["namespace"] = [namespace] + elif self.namespace: params["namespace"] = [self.namespace] headers = {