diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 9eb99532eb0db9d6128260f77f7fce1027393e6d..76388d1dd7dfcdde1d5630324f82361a1341936b 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -32,10 +32,15 @@ class BaseIndex(BaseModel): This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") - - def _remove_and_sync(self, routes_to_delete: dict): + + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[Any], + ): """ - Remove embeddings in a routes syncing process from the index. + Add embeddings to the index and manage index syncing if necessary. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") @@ -86,7 +91,7 @@ class BaseIndex(BaseModel): """ raise NotImplementedError("This method should be implemented by subclasses.") - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _sync_index(self, local_routes: dict): """ Synchronize the local index with the remote index based on the specified mode. Modes: diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index dbc41f1a6af2ffc11ec05a4d6f6c208bb5a7e358..5426ec76d8efc856a2c036a6dcd8afc33d69d091 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -42,13 +42,15 @@ class LocalIndex(BaseIndex): self.routes = np.concatenate([self.routes, routes_arr]) self.utterances = np.concatenate([self.utterances, utterances_arr]) - def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.warning("Sync remove is not implemented for LocalIndex.") - - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + ): if self.sync is not None: - logger.error("Sync remove is not implemented for LocalIndex.") + logger.warning("Sync add is not implemented for LocalIndex.") + self.add(embeddings, routes, utterances) def get_routes(self) -> List[Tuple]: """ diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index e88fa1485e151720fe1a39767043b0367f159358..858d56c9013c4d0190240d3cbc9bb6ab65d90b6d 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -202,24 +202,17 @@ class PineconeIndex(BaseIndex): logger.warning("Index could not be initialized.") self.host = index_stats["host"] if index_stats else None - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): - if self.index is None: - self.dimensions = self.dimensions or dimensions - self.index = self._init_index(force_create=True) - + def _sync_index(self, local_routes: dict): remote_routes = self.get_routes() remote_dict: dict = {route: set() for route, _ in remote_routes} for route, utterance in remote_routes: remote_dict[route].add(utterance) - local_dict: dict = {route: set() for route in local_route_names} - for route, utterance in zip(local_route_names, local_utterances): + local_dict: dict = {route: set() for route in local_routes["routes"]} + for route, utterance in zip(local_routes["routes"], local_routes["utterances"]): local_dict[route].add(utterance) - logger.info(f"Local routes: {local_dict}") - logger.info(f"Remote routes: {remote_dict}") - all_routes = set(remote_dict.keys()).union(local_dict.keys()) routes_to_add = [] @@ -291,9 +284,17 @@ class PineconeIndex(BaseIndex): raise ValueError("Invalid sync mode specified") for utterance in utterances_to_include: - routes_to_add.append((route, utterance)) - - logger.info(f"Layer routes: {layer_routes}") + indices = [ + i + for i, x in enumerate(local_routes["utterances"]) + if x == utterance and local_routes["routes"][i] == route + ] + routes_to_add.extend( + [ + (local_routes["embeddings"][idx], route, utterance) + for idx in indices + ] + ) return routes_to_add, routes_to_delete, layer_routes @@ -324,18 +325,65 @@ class PineconeIndex(BaseIndex): for i in range(0, len(vectors_to_upsert), batch_size): batch = vectors_to_upsert[i : i + batch_size] self._batch_upsert(batch) - - def _remove_and_sync(self, routes_to_delete: dict): - for route, utterances in routes_to_delete.items(): - remote_routes = self._get_routes_with_ids(route_name=route) - ids_to_delete = [ - r["id"] - for r in remote_routes - if (r["route"], r["utterance"]) - in zip([route] * len(utterances), utterances) + + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = 100, + ) -> List[Route]: + """Add vectors to Pinecone in batches and return the overall updated list of Route objects.""" + if self.index is None: + self.dimensions = self.dimensions or len(embeddings[0]) + self.index = self._init_index(force_create=True) + + local_routes = { + "routes": routes, + "utterances": utterances, + "embeddings": embeddings, + } + if self.sync is not None: + data_to_upsert, data_to_delete, layer_routes_dict = self._sync_index( + local_routes=local_routes + ) + + layer_routes = [ + Route(name=route, utterances=layer_routes_dict[route]) + for route in layer_routes_dict.keys() ] - if ids_to_delete and self.index: - self.index.delete(ids=ids_to_delete) + + routes_to_delete: dict = {} + for route, utterance in data_to_delete: + routes_to_delete.setdefault(route, []).append(utterance) + + for route, utterances in routes_to_delete.items(): + remote_routes = self._get_routes_with_ids(route_name=route) + ids_to_delete = [ + r["id"] + for r in remote_routes + if (r["route"], r["utterance"]) + in zip([route] * len(utterances), utterances) + ] + if ids_to_delete and self.index: + self.index.delete(ids=ids_to_delete) + + else: + data_to_upsert = [ + (vector, route, utterance) + for vector, route, utterance in zip(embeddings, routes, utterances) + ] + + vectors_to_upsert = [ + PineconeRecord(values=vector, route=route, utterance=utterance).to_dict() + for vector, route, utterance in data_to_upsert + ] + + for i in range(0, len(vectors_to_upsert), batch_size): + batch = vectors_to_upsert[i : i + batch_size] + self._batch_upsert(batch) + + return layer_routes def _get_route_ids(self, route_name: str): clean_route = clean_route_name(route_name) diff --git a/semantic_router/index/qdrant.py b/semantic_router/index/qdrant.py index a77e6f888ba8023cd5ce6d3fc955bd6b1f0615ce..0fff231495d236120bc1dd4d3756fbd6a9359e38 100644 --- a/semantic_router/index/qdrant.py +++ b/semantic_router/index/qdrant.py @@ -160,13 +160,16 @@ class QdrantIndex(BaseIndex): **self.config, ) - def _remove_and_sync(self, routes_to_delete: dict): - if self.sync is not None: - logger.error("Sync remove is not implemented for LocalIndex.") - - def _sync_index(self, local_route_names: List[str], local_utterances: List[str], dimensions: int): + def _add_and_sync( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + batch_size: int = DEFAULT_UPLOAD_BATCH_SIZE, + ): if self.sync is not None: - logger.error("Sync remove is not implemented for QdrantIndex.") + logger.warning("Sync add is not implemented for QdrantIndex") + self.add(embeddings, routes, utterances, batch_size) def add( self, diff --git a/semantic_router/layer.py b/semantic_router/layer.py index bbe291ce088444471f5f18363576b76f0f54d1c5..61824033d7b0fe69919e9d577c309b3fa85cabe5 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -222,7 +222,14 @@ class RouteLayer: if len(self.routes) > 0: self._add_and_sync_routes(routes=self.routes) else: - self._add_and_sync_routes(routes=[]) + dummy_embedding = self.encoder(["dummy"]) + + layer_routes = self.index._add_and_sync( + embeddings=dummy_embedding, + routes=[], + utterances=[], + ) + self._set_layer_routes(layer_routes) elif len(self.routes) > 0: self._add_routes(routes=self.routes) @@ -472,9 +479,12 @@ class RouteLayer: def _add_routes(self, routes: List[Route]): # create embeddings for all routes - route_names, all_utterances = self._extract_routes_details(routes) + all_utterances = [ + utterance for route in routes for utterance in route.utterances + ] embedded_utterances = self.encoder(all_utterances) # create route array + route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index self.index.add( embeddings=embedded_utterances, @@ -482,47 +492,22 @@ class RouteLayer: utterances=all_utterances, ) - def _add_and_sync_routes(self, routes: List[Route]): # create embeddings for all routes and sync at startup with remote ones based on sync setting - local_route_names, local_utterances = self._extract_routes_details(routes) - routes_to_add, routes_to_delete, layer_routes_dict = self.index._sync_index( - local_route_names=local_route_names, - local_utterances=local_utterances, - dimensions=len(self.encoder(["dummy"])[0]) - ) - - logger.info(f"ROUTES TO ADD: {(routes_to_add)}") - logger.info(f"ROUTES TO DELETE: {(routes_to_delete)}") - - layer_routes = [ - Route(name=route, utterances=layer_routes_dict[route]) - for route in layer_routes_dict.keys() + all_utterances = [ + utterance for route in routes for utterance in route.utterances ] - - data_to_delete: dict = {} - for route, utterance in routes_to_delete: - data_to_delete.setdefault(route, []).append(utterance) - self.index._remove_and_sync(data_to_delete) - - all_utterances_to_add = [utt for _, utt in routes_to_add] - embedded_utterances_to_add = self.encoder(all_utterances_to_add) if all_utterances_to_add else [] - - route_names_to_add = [route for route, _, in routes_to_add] - - self.index.add( - embeddings=embedded_utterances_to_add, - routes=route_names_to_add, - utterances=all_utterances_to_add, + embedded_utterances = self.encoder(all_utterances) + # create route array + route_names = [route.name for route in routes for _ in route.utterances] + # add everything to the index + layer_routes = self.index._add_and_sync( + embeddings=embedded_utterances, + routes=route_names, + utterances=all_utterances, ) - self._set_layer_routes(layer_routes) - def _extract_routes_details(self, routes: List[Route]) -> Tuple[List[str], List[str]]: - route_names = [route.name for route in routes for _ in route.utterances] - utterances = [utterance for route in routes for utterance in route.utterances] - return route_names, utterances - def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector