From 0e5aa5dcf76b6755a20a837b4b0b42e0a9a98a2c Mon Sep 17 00:00:00 2001 From: James Briggs <james.briggs@hotmail.com> Date: Fri, 15 Nov 2024 00:16:40 +0100 Subject: [PATCH] fix: pinecone namespace test and index sync removal --- semantic_router/index/base.py | 36 ------ semantic_router/index/local.py | 14 +-- semantic_router/index/pinecone.py | 2 - semantic_router/index/postgres.py | 2 +- semantic_router/index/qdrant.py | 14 +-- tests/unit/test_layer.py | 181 +++++++++++++----------------- 6 files changed, 80 insertions(+), 169 deletions(-) diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index d8b7fc5f..24fd337b 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -24,7 +24,6 @@ class BaseIndex(BaseModel): dimensions: Union[int, None] = None type: str = "base" init_async_index: bool = False - sync: Union[str, None] = None def add( self, @@ -153,41 +152,6 @@ class BaseIndex(BaseModel): logger.warning("This method should be implemented by subclasses.") self.index = None - def is_synced( - self, - local_route_names: List[str], - local_utterances_list: List[str], - local_function_schemas_list: List[Dict[str, Any]], - local_metadata_list: List[Dict[str, Any]], - ) -> bool: - """ - Checks whether local and remote index are synchronized. - This method should be implemented by subclasses. - """ - raise NotImplementedError("This method should be implemented by subclasses.") - - # def _sync_index( - # self, - # local_route_names: List[str], - # local_utterances: List[str], - # local_function_schemas: List[Dict[str, Any]], - # local_metadata: List[Dict[str, Any]], - # dimensions: int, - # ): - # """ - # Synchronize the local index with the remote index based on the specified mode. - # Modes: - # - "error": Raise an error if local and remote are not synchronized. - # - "remote": Take remote as the source of truth and update local to align. - # - "local": Take local as the source of truth and update remote to align. - # - "merge-force-remote": Merge both local and remote taking only remote routes features when a route with same route name is present both locally and remotely. - # - "merge-force-local": Merge both local and remote taking only local routes features when a route with same route name is present both locally and remotely. - # - "merge": Merge both local and remote, merging also local and remote features when a route with same route name is present both locally and remotely. - - # This method should be implemented by subclasses. - # """ - # raise NotImplementedError("This method should be implemented by subclasses.") - def _read_hash(self) -> ConfigParameter: """ Read the hash of the previously written index. diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index 476c1dd9..721adf05 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -47,19 +47,7 @@ class LocalIndex(BaseIndex): self.utterances = np.concatenate([self.utterances, utterances_arr]) def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.warning("Sync remove is not implemented for LocalIndex.") - - # def _sync_index( - # self, - # local_route_names: List[str], - # local_utterances: List[str], - # local_function_schemas: List[Dict[str, Any]], - # local_metadata: List[Dict[str, Any]], - # dimensions: int, - # ): - # if self.sync is not None: - # logger.error("Sync remove is not implemented for LocalIndex.") + logger.warning("Sync remove is not implemented for LocalIndex.") def get_utterances(self) -> List[Utterance]: """ diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 47a9307c..5b2cdcd8 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -76,7 +76,6 @@ class PineconeIndex(BaseIndex): host: str = "", namespace: Optional[str] = "", base_url: Optional[str] = "https://api.pinecone.io", - sync: Optional[str] = None, init_async_index: bool = False, ): super().__init__() @@ -92,7 +91,6 @@ class PineconeIndex(BaseIndex): self.type = "pinecone" self.api_key = api_key or os.getenv("PINECONE_API_KEY") self.base_url = base_url - self.sync = sync if self.api_key is None: raise ValueError("Pinecone API key is required.") diff --git a/semantic_router/index/postgres.py b/semantic_router/index/postgres.py index 9ce6eec0..0d80d745 100644 --- a/semantic_router/index/postgres.py +++ b/semantic_router/index/postgres.py @@ -450,7 +450,7 @@ class PostgresIndex(BaseIndex): self.conn.commit() def aget_routes(self): - raise NotImplementedError("Sync remove is not implemented for PostgresIndex.") + raise NotImplementedError("Async get is not implemented for PostgresIndex.") def _write_config(self, config: ConfigParameter): logger.warning("No config is written for PostgresIndex.") diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 2c28aaaf..eb7dc688 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,19 +160,7 @@ class QdrantIndex(BaseIndex): ) def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.error("Sync remove is not implemented for QdrantIndex.") - - # def _sync_index( - # self, - # local_route_names: List[str], - # local_utterances_list: List[str], - # local_function_schemas: List[Dict[str, Any]], - # local_metadata_list: List[Dict[str, Any]], - # dimensions: int, - # ): - # if self.sync is not None: - # logger.error("Sync remove is not implemented for QdrantIndex.") + logger.error("Sync remove is not implemented for QdrantIndex.") def add( self, diff --git a/tests/unit/test_layer.py b/tests/unit/test_layer.py index ea98bf0c..76e048cd 100644 --- a/tests/unit/test_layer.py +++ b/tests/unit/test_layer.py @@ -41,14 +41,13 @@ def init_index( index_cls, dimensions: Optional[int] = None, namespace: Optional[str] = "", - sync: str = "local", ): """We use this function to initialize indexes with different names to avoid issues during testing. """ if index_cls is PineconeIndex: index = index_cls( - index_name=TEST_ID, dimensions=dimensions, namespace=namespace, sync=sync + index_name=TEST_ID, dimensions=dimensions, namespace=namespace ) else: index = index_cls() @@ -228,9 +227,10 @@ class TestRouteLayer: def test_initialization_dynamic_route( self, cohere_encoder, openai_encoder, dynamic_routes, index_cls ): - index = init_index(index_cls, sync="local") + index = init_index(index_cls) route_layer_cohere = RouteLayer( - encoder=cohere_encoder, routes=dynamic_routes, index=index + encoder=cohere_encoder, routes=dynamic_routes, index=index, + auto_sync="local" if index_cls is PineconeIndex else None, ) assert route_layer_cohere.score_threshold == 0.3 route_layer_openai = RouteLayer( @@ -251,7 +251,7 @@ class TestRouteLayer: assert route_layer.index.get_utterances() == [] def test_add_route(self, routes, openai_encoder, index_cls): - index = init_index(index_cls, sync="local") + index = init_index(index_cls) route_layer = RouteLayer( encoder=openai_encoder, routes=[], index=index, auto_sync="local" @@ -280,16 +280,26 @@ class TestRouteLayer: assert len(route_layer.index.get_utterances()) == 5 def test_list_route_names(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local" if index_cls is PineconeIndex else None, + ) + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated route_names = route_layer.list_route_names() assert set(route_names) == { route.name for route in routes }, "The list of route names should match the names of the routes added." def test_delete_route(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local" if index_cls is PineconeIndex else None, + ) + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated # Delete a route by name route_to_delete = routes[0].name route_layer.delete(route_to_delete) @@ -316,8 +326,11 @@ class TestRouteLayer: # we should see warning in logs only (ie no errors) def test_add_multiple_routes(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, index=index, + auto_sync="local", + ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) route_layer._add_routes(routes=routes) @@ -327,17 +340,22 @@ class TestRouteLayer: assert len(route_layer.index.get_utterances()) == 5 def test_query_and_classification(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, dimensions=3, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls, dimensions=3) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated query_result = route_layer(text="Hello").name assert query_result in ["Route 1", "Route 2"] def test_query_filter(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, dimensions=3, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) - + index = init_index(index_cls, dimensions=3) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated query_result = route_layer(text="Hello", route_filter=["Route 1"]).name @@ -354,9 +372,10 @@ class TestRouteLayer: ) def test_query_filter_pinecone(self, openai_encoder, routes, index_cls): if index_cls is PineconeIndex: - pineconeindex = init_index(index_cls, dimensions=3, sync="local") + pineconeindex = init_index(index_cls, dimensions=3) route_layer = RouteLayer( - encoder=openai_encoder, routes=routes, index=pineconeindex + encoder=openai_encoder, routes=routes, index=pineconeindex, + auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated query_result = route_layer(text="Hello", route_filter=["Route 1"]).name @@ -375,7 +394,8 @@ class TestRouteLayer: if index_cls is PineconeIndex: pineconeindex = init_index(index_cls, namespace="test") route_layer = RouteLayer( - encoder=openai_encoder, routes=routes, index=pineconeindex + encoder=openai_encoder, routes=routes, index=pineconeindex, + auto_sync="local", ) time.sleep(PINECONE_SLEEP) # allow for index to be populated query_result = route_layer(text="Hello", route_filter=["Route 1"]).name @@ -388,86 +408,17 @@ class TestRouteLayer: assert query_result in ["Route 1"] route_layer.index.index.delete(namespace="test", delete_all=True) - @pytest.mark.skipif( - os.environ.get("PINECONE_API_KEY") is None, reason="Pinecone API key required" - ) - def test_sync_pinecone(self, openai_encoder, routes, routes_2, routes_4, index_cls): - if index_cls is PineconeIndex: - # TEST LOCAL - pinecone_index = init_index(index_cls, sync="local") - route_layer = RouteLayer( - encoder=openai_encoder, routes=routes_2, index=pinecone_index - ) - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index.get_utterances() == [ - ("Route 1", "Hello", None, {}), - ("Route 2", "Hi", None, {}), - ], "The routes in the index should match the local routes" - - # TEST REMOTE - pinecone_index = init_index(index_cls, sync="remote") - route_layer = RouteLayer( - encoder=openai_encoder, routes=routes, index=pinecone_index - ) - - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index.get_utterances() == [ - ("Route 1", "Hello", None, {}), - ("Route 2", "Hi", None, {}), - ], "The routes in the index should match the local routes" - - # TEST MERGE FORCE REMOTE - pinecone_index = init_index(index_cls, sync="merge-force-remote") - route_layer = RouteLayer( - encoder=openai_encoder, routes=routes, index=pinecone_index - ) - - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index.get_utterances() == [ - ("Route 1", "Hello", None, {}), - ("Route 2", "Hi", None, {}), - ], "The routes in the index should match the local routes" - - # TEST MERGE FORCE LOCAL - pinecone_index = init_index(index_cls, sync="merge-force-local") - route_layer = RouteLayer( - encoder=openai_encoder, routes=routes, index=pinecone_index - ) - - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index.get_utterances() == [ - ("Route 1", "Hello", None, {"type": "default"}), - ("Route 1", "Hi", None, {"type": "default"}), - ("Route 2", "Bye", None, {}), - ("Route 2", "Au revoir", None, {}), - ("Route 2", "Goodbye", None, {}), - ], "The routes in the index should match the local routes" - - # TEST MERGE - pinecone_index = init_index(index_cls, sync="merge") - route_layer = RouteLayer( - encoder=openai_encoder, routes=routes_4, index=pinecone_index - ) - - time.sleep(PINECONE_SLEEP) # allow for index to be populated - assert route_layer.index.get_utterances() == [ - ("Route 1", "Hello", None, {"type": "default"}), - ("Route 1", "Hi", None, {"type": "default"}), - ("Route 1", "Goodbye", None, {"type": "default"}), - ("Route 2", "Bye", None, {}), - ("Route 2", "Asparagus", None, {}), - ("Route 2", "Au revoir", None, {}), - ("Route 2", "Goodbye", None, {}), - ], "The routes in the index should match the local routes" - def test_query_with_no_index(self, openai_encoder, index_cls): route_layer = RouteLayer(encoder=openai_encoder) with pytest.raises(ValueError): assert route_layer(text="Anything").name is None def test_query_with_vector(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) # allow for index to be populated vector = [0.1, 0.2, 0.3] @@ -481,8 +432,13 @@ class TestRouteLayer: route_layer() def test_semantic_classify(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated classification, score = route_layer._semantic_classify( [ {"route": "Route 1", "score": 0.9}, @@ -493,8 +449,13 @@ class TestRouteLayer: assert score == [0.9] def test_semantic_classify_multiple_routes(self, openai_encoder, routes, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) # allow for index to be populated classification, score = route_layer._semantic_classify( [ {"route": "Route 1", "score": 0.9}, @@ -517,14 +478,20 @@ class TestRouteLayer: route_layer(vector=vector) def test_pass_threshold(self, openai_encoder, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, index=index, + auto_sync="local", + ) assert not route_layer._pass_threshold([], 0.3) assert route_layer._pass_threshold([0.6, 0.7], 0.3) def test_failover_score_threshold(self, openai_encoder, index_cls): - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, index=index, + auto_sync="local", + ) assert route_layer.score_threshold == 0.3 def test_json(self, openai_encoder, routes, index_cls): @@ -533,8 +500,11 @@ class TestRouteLayer: temp_path = temp.name # Save the temporary file's path temp.close() # Close the file to ensure it can be opened again on Windows os.environ["OPENAI_API_KEY"] = "test_api_key" - index = init_index(index_cls, sync="local") - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + index = init_index(index_cls) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) route_layer.to_json(temp_path) assert os.path.exists(temp_path) route_layer_from_file = RouteLayer.from_json(temp_path) @@ -554,7 +524,10 @@ class TestRouteLayer: temp.close() # Close the file to ensure it can be opened again on Windows os.environ["OPENAI_API_KEY"] = "test_api_key" index = init_index(index_cls) - route_layer = RouteLayer(encoder=openai_encoder, routes=routes, index=index) + route_layer = RouteLayer( + encoder=openai_encoder, routes=routes, index=index, + auto_sync="local", + ) route_layer.to_yaml(temp_path) assert os.path.exists(temp_path) route_layer_from_file = RouteLayer.from_yaml(temp_path) -- GitLab