diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 76388d1dd7dfcdde1d5630324f82361a1341936b..c51c3ff2a543d4496ee000fd8e2776faaa0f32cc 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -33,14 +33,9 @@ class BaseIndex(BaseModel): """ raise NotImplementedError("This method should be implemented by subclasses.") - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[Any], - ): + def _remove_and_sync(self, routes_to_delete: dict): """ - Add embeddings to the index and manage index syncing if necessary. + Remove embeddings in a routes syncing process from the index. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") @@ -91,7 +86,9 @@ class BaseIndex(BaseModel): """ raise NotImplementedError("This method should be implemented by subclasses.") - def _sync_index(self, local_routes: dict): + def _sync_index( + self, local_route_names: List[str], local_utterances: List[str], dimensions: int + ): """ Synchronize the local index with the remote index based on the specified mode. Modes: diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index 5426ec76d8efc856a2c036a6dcd8afc33d69d091..7150b267587715d09124eb6f94ab242a7795c3f9 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -42,15 +42,15 @@ class LocalIndex(BaseIndex): self.routes = np.concatenate([self.routes, routes_arr]) self.utterances = np.concatenate([self.utterances, utterances_arr]) - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], + def _remove_and_sync(self, routes_to_delete: dict): + if self.sync is not None: + logger.warning("Sync remove is not implemented for LocalIndex.") + + def _sync_index( + self, local_route_names: List[str], local_utterances: List[str], dimensions: int ): if self.sync is not None: - logger.warning("Sync add is not implemented for LocalIndex.") - self.add(embeddings, routes, utterances) + logger.error("Sync remove is not implemented for LocalIndex.") def get_routes(self) -> List[Tuple]: """ diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 858d56c9013c4d0190240d3cbc9bb6ab65d90b6d..b4f6033fb897ff77fe649f83ebd90705e5a50f52 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -11,7 +11,6 @@ from pydantic.v1 import BaseModel, Field from semantic_router.index.base import BaseIndex from semantic_router.utils.logger import logger -from semantic_router.route import Route def clean_route_name(route_name: str) -> str: @@ -202,15 +201,21 @@ class PineconeIndex(BaseIndex): logger.warning("Index could not be initialized.") self.host = index_stats["host"] if index_stats else None - def _sync_index(self, local_routes: dict): + def _sync_index( + self, local_route_names: List[str], local_utterances: List[str], dimensions: int + ): + if self.index is None: + self.dimensions = self.dimensions or dimensions + self.index = self._init_index(force_create=True) + remote_routes = self.get_routes() remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) - local_dict: dict = {route: set() for route in local_routes["routes"]} - for route, utterance in zip(local_routes["routes"], local_routes["utterances"]): + local_dict: dict = {route: set() for route in local_route_names} + for route, utterance in zip(local_route_names, local_utterances): local_dict[route].add(utterance) all_routes = set(remote_dict.keys()).union(local_dict.keys()) @@ -251,7 +256,7 @@ class PineconeIndex(BaseIndex): layer_routes[route] = list(local_utterances) elif self.sync == "merge-force-remote": if route in local_dict and route not in remote_dict: - utterances_to_include = local_utterances + utterances_to_include = set(local_utterances) if local_utterances: layer_routes[route] = list(local_utterances) else: @@ -284,17 +289,7 @@ class PineconeIndex(BaseIndex): raise ValueError("Invalid sync mode specified") for utterance in utterances_to_include: - indices = [ - i - for i, x in enumerate(local_routes["utterances"]) - if x == utterance and local_routes["routes"][i] == route - ] - routes_to_add.extend( - [ - (local_routes["embeddings"][idx], route, utterance) - for idx in indices - ] - ) + routes_to_add.append((route, utterance)) return routes_to_add, routes_to_delete, layer_routes @@ -326,64 +321,17 @@ class PineconeIndex(BaseIndex): batch = vectors_to_upsert[i : i + batch_size] self._batch_upsert(batch) - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = 100, - ) -> List[Route]: - """Add vectors to Pinecone in batches and return the overall updated list of Route objects.""" - if self.index is None: - self.dimensions = self.dimensions or len(embeddings[0]) - self.index = self._init_index(force_create=True) - - local_routes = { - "routes": routes, - "utterances": utterances, - "embeddings": embeddings, - } - if self.sync is not None: - data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index( - local_routes=local_routes - ) - - layer_routes = [ - Route(name=route, utterances=layer_routes_dict[route]) - for route in layer_routes_dict.keys() - ] - - routes_to_delete: dict = {} - for route, utterance in data_to_delete: - routes_to_delete.setdefault(route, []).append(utterance) - - for route, utterances in routes_to_delete.items(): - remote_routes = self._get_routes_with_ids(route_name=route) - ids_to_delete = [ - r["id"] - for r in remote_routes - if (r["route"], r["utterance"]) - in zip([route] * len(utterances), utterances) - ] - if ids_to_delete and self.index: - self.index.delete(ids=ids_to_delete) - - else: - data_to_upsert = [ - (vector, route, utterance) - for vector, route, utterance in zip(embeddings, routes, utterances) + def _remove_and_sync(self, routes_to_delete: dict): + for route, utterances in routes_to_delete.items(): + remote_routes = self._get_routes_with_ids(route_name=route) + ids_to_delete = [ + r["id"] + for r in remote_routes + if (r["route"], r["utterance"]) + in zip([route] * len(utterances), utterances) ] - - vectors_to_upsert = [ - PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() - for vector, route, utterance in data_to_upsert - ] - - for i in range(0, len(vectors_to_upsert), batch_size): - batch = vectors_to_upsert[i : i + batch_size] - self._batch_upsert(batch) - - return layer_routes + if ids_to_delete and self.index: + self.index.delete(ids=ids_to_delete) def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index 0fff231495d236120bc1dd4d3756fbd6a9359e38..c1a5e28b58792091e4fb568248827954547e37a5 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,16 +160,15 @@ class QdrantIndex(BaseIndex): **self.config, ) - def _add_and_sync( - self, - embeddings: List[List[float]], - routes: List[str], - utterances: List[str], - batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, + def _remove_and_sync(self, routes_to_delete: dict): + if self.sync is not None: + logger.error("Sync remove is not implemented for QdrantIndex.") + + def _sync_index( + self, local_route_names: List[str], local_utterances: List[str], dimensions: int ): if self.sync is not None: - logger.warning("Sync add is not implemented for QdrantIndex") - self.add(embeddings, routes, utterances, batch_size) + logger.error("Sync remove is not implemented for QdrantIndex.") def add( self, diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 7f80945e4085f2e5f28263ff1e4cb8b5838e3fed..25e14b8f4343c03c13bc709cae7b5d2bf37ddb8c 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -222,14 +222,7 @@ class RouteLayer: if len(self.routes) > 0: self._add_and_sync_routes(routes=self.routes) else: - dummy_embedding = self.encoder(["dummy"]) - - layer_routes = self.index._add_and_sync( - embeddings=dummy_embedding, - routes=[], - utterances=[], - ) - self._set_layer_routes(layer_routes) + self._add_and_sync_routes(routes=[]) elif len(self.routes) > 0: self._add_routes(routes=self.routes) @@ -484,12 +477,9 @@ class RouteLayer: def _add_routes(self, routes: List[Route]): # create embeddings for all routes - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] + route_names, all_utterances = self._extract_routes_details(routes) embedded_utterances = self.encoder(all_utterances) # create route array - route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index self.index.add( embeddings=embedded_utterances, @@ -499,20 +489,43 @@ class RouteLayer: def _add_and_sync_routes(self, routes: List[Route]): # create embeddings for all routes and sync at startup with remote ones based on sync setting - all_utterances = [ - utterance for route in routes for utterance in route.utterances + local_route_names, local_utterances = self._extract_routes_details(routes) + routes_to_add, routes_to_delete, layer_routes_dict = self.index._sync_index( + local_route_names=local_route_names, + local_utterances=local_utterances, + dimensions=len(self.encoder(["dummy"])[0]), + ) + + layer_routes = [ + Route(name=route, utterances=layer_routes_dict[route]) + for route in layer_routes_dict.keys() ] - embedded_utterances = self.encoder(all_utterances) - # create route array - route_names = [route.name for route in routes for _ in route.utterances] - # add everything to the index - layer_routes = self.index._add_and_sync( - embeddings=embedded_utterances, - routes=route_names, - utterances=all_utterances, + + data_to_delete: dict = {} + for route, utterance in routes_to_delete: + data_to_delete.setdefault(route, []).append(utterance) + self.index._remove_and_sync(data_to_delete) + + all_utterances_to_add = [utt for _, utt in routes_to_add] + embedded_utterances_to_add = ( + self.encoder(all_utterances_to_add) if all_utterances_to_add else [] ) + + route_names_to_add = [route for route, _, in routes_to_add] + + self.index.add( + embeddings=embedded_utterances_to_add, + routes=route_names_to_add, + utterances=all_utterances_to_add, + ) + self._set_layer_routes(layer_routes) + def _extract_routes_details(self, routes: List[Route]) -> Tuple: + route_names = [route.name for route in routes for _ in route.utterances] + utterances = [utterance for route in routes for utterance in route.utterances] + return route_names, utterances + def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector