diff --git a/semantic_router/routers/base.py b/semantic_router/routers/base.py index 087eb57c8fb8a5d2ab5613e2eb3b8c8d5d01c597..b092a297fa416ffe04f184575ae6d9d079532cda 100644 --- a/semantic_router/routers/base.py +++ b/semantic_router/routers/base.py @@ -250,39 +250,13 @@ class RouterConfig: return utterances def add(self, route: Route): - """Add a route to the local SemanticRouter and index. + """Add a route to the RouterConfig. :param route: The route to add. :type route: Route """ - current_local_hash = self._get_hash() - current_remote_hash = self.index._read_hash() - if current_remote_hash.value == "": - # if remote hash is empty, the index is to be initialized - current_remote_hash = current_local_hash - embedded_utterances = self.encoder(route.utterances) - self.index.add( - embeddings=embedded_utterances, - routes=[route.name] * len(route.utterances), - utterances=route.utterances, - function_schemas=( - route.function_schemas * len(route.utterances) - if route.function_schemas - else [{}] * len(route.utterances) - ), - metadata_list=[route.metadata if route.metadata else {}] - * len(route.utterances), - ) - self.routes.append(route) - if current_local_hash.value == current_remote_hash.value: - self._write_hash() # update current hash in index - else: - logger.warning( - "Local and remote route layers were not aligned. Remote hash " - "not updated. Use `SemanticRouter.get_utterance_diff()` to see " - "details." 
- ) + logger.info(f"Added route `{route.name}`") def get(self, name: str) -> Optional[Route]: for route in self.routes: @@ -307,7 +281,7 @@ class RouterConfig: class BaseRouter(BaseModel): - encoder: DenseEncoder + encoder: DenseEncoder = Field(default_factory=OpenAIEncoder) index: BaseIndex = Field(default_factory=BaseIndex) score_threshold: Optional[float] = Field(default=None) routes: List[Route] = [] @@ -339,18 +313,11 @@ class BaseRouter(BaseModel): aggregation=aggregation, auto_sync=auto_sync, ) - if encoder is None: - logger.warning( - "No encoder provided. Using default OpenAIEncoder. Ensure " - "that you have set OPENAI_API_KEY in your environment." - ) - self.encoder = OpenAIEncoder() - else: - self.encoder = encoder + self.encoder = self._get_encoder(encoder=encoder) self.llm = llm self.routes = routes.copy() if routes else [] # initialize index - self._set_index(index=index) + self.index =self._get_index(index=index) # set score threshold using default method self._set_score_threshold() self.top_k = top_k @@ -372,12 +339,21 @@ class BaseRouter(BaseModel): if self.auto_sync: self._init_index_state() - def _set_index(self, index: Optional[BaseIndex]): + def _get_index(self, index: Optional[BaseIndex]) -> BaseIndex: if index is None: logger.warning("No index provided. Using default LocalIndex.") - self.index = LocalIndex() + index = LocalIndex() + else: + index = index + return index + + def _get_encoder(self, encoder: Optional[DenseEncoder]) -> DenseEncoder: + if encoder is None: + logger.warning("No encoder provided. 
Using default OpenAIEncoder.") + encoder = OpenAIEncoder() else: - self.index = index + encoder = encoder + return encoder def _init_index_state(self): """Initializes an index (where required) and runs auto_sync if active.""" @@ -521,20 +497,57 @@ class BaseRouter(BaseModel): vector_arr = self._encode(text=text) else: vector_arr = np.array(vector) + print(f"{text=}") + print(f"{vector_arr}") # get relevant utterances results = self._retrieve(xq=vector_arr) - + print(f"{results=}") # decide most relevant routes categories_with_scores = self._semantic_classify_multiple_routes(results) + print(f"{categories_with_scores=}") + return [ + RouteChoice(name=category, similarity_score=score) for category, score in categories_with_scores + ] - route_choices = [] - for category, score in categories_with_scores: - route = self.check_for_matching_routes(category) - if route: - route_choice = RouteChoice(name=route.name, similarity_score=score) - route_choices.append(route_choice) + #route_choices = [] + # TODO JB: do we need this check? Maybe we should be returning directly + #for category, score in categories_with_scores: + # route = self.check_for_matching_routes(category) + # if route: + # route_choice = RouteChoice(name=route.name, similarity_score=score) + # route_choices.append(route_choice) - return route_choices + #return route_choices + + def _retrieve_top_route( + self, vector: List[float], route_filter: Optional[List[str]] = None + ) -> Tuple[Optional[Route], List[float]]: + """ + Retrieve the top matching route based on the given vector. + Returns a tuple of the route (if any) and the scores of the top class. + """ + # get relevant results (scores and routes) + results = self._retrieve( + xq=np.array(vector), top_k=self.top_k, route_filter=route_filter + ) + # decide most relevant routes + top_class, top_class_scores = self._semantic_classify(results) + # TODO do we need this check? 
+ route = self.check_for_matching_routes(top_class) + return route, top_class_scores + + async def _async_retrieve_top_route( + self, vector: List[float], route_filter: Optional[List[str]] = None + ) -> Tuple[Optional[Route], List[float]]: + # get relevant results (scores and routes) + results = await self._async_retrieve( + xq=np.array(vector), top_k=self.top_k, route_filter=route_filter + ) + # decide most relevant routes + top_class, top_class_scores = await self._async_semantic_classify(results) + # TODO do we need this check? + route = self.check_for_matching_routes(top_class) + return route, top_class_scores def sync(self, sync_mode: str, force: bool = False) -> List[str]: """Runs a sync of the local routes with the remote index. @@ -661,40 +674,11 @@ class BaseRouter(BaseModel): self.routes = new_routes - def _retrieve_top_route( - self, vector: List[float], route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - """ - Retrieve the top matching route based on the given vector. - Returns a tuple of the route (if any) and the scores of the top class. - """ - # get relevant results (scores and routes) - results = self._retrieve( - xq=np.array(vector), top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = self._semantic_classify(results) - # TODO do we need this check? - route = self.check_for_matching_routes(top_class) - return route, top_class_scores - - async def _async_retrieve_top_route( - self, vector: List[float], route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - # get relevant results (scores and routes) - results = await self._async_retrieve( - xq=np.array(vector), top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = await self._async_semantic_classify(results) - # TODO do we need this check? 
- route = self.check_for_matching_routes(top_class) - return route, top_class_scores - def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: """ Check if the route's score passes the specified threshold. """ + # TODO JB: do we need this? if route is None: return False threshold = ( @@ -1124,21 +1108,55 @@ class BaseRouter(BaseModel): scores_by_class[route] = [score] return scores_by_class - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: + def _pass_threshold(self, scores: List[float], threshold: float | None) -> bool: + """Test if the route score passes the minimum threshold. If a threshold of None is + set, then the route will always pass no matter how low it scores. + + :param scores: The scores to test. + :type scores: List[float] + :param threshold: The minimum threshold to pass. + :type threshold: float | None + :return: True if the route passes the threshold, False otherwise. + :rtype: bool + """ + if threshold is None: + return True if scores: return max(scores) > threshold else: return False - def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None): + def _update_thresholds(self, route_thresholds: Optional[Dict[str, float]] = None): + """Update the score thresholds for each route using a dictionary of + route names and thresholds. + + :param route_thresholds: A dictionary of route names and thresholds. + :type route_thresholds: Dict[str, float] | None """ - Update the score thresholds for each route. + if route_thresholds: + for route, threshold in route_thresholds.items(): + self.set_threshold( + threshold=threshold, + route_name=route, + ) + + def set_threshold(self, threshold: float, route_name: str | None = None): + """Set the score threshold for a specific route or all routes. + + :param threshold: The threshold to set. + :type threshold: float + :param route_name: The name of the route to set the threshold for. If None, the threshold will be set for all routes. 
+ :type route_name: str | None """ - if score_thresholds: + if route_name is None: for route in self.routes: - route.score_threshold = score_thresholds.get( - route.name, self.score_threshold - ) + route.score_threshold = threshold + else: + route = self.get(route_name) + if route is not None: + route.score_threshold = threshold + else: + logger.error(f"Route `{route_name}` not found") def to_config(self) -> RouterConfig: return RouterConfig( @@ -1207,7 +1225,7 @@ class BaseRouter(BaseModel): search_range=0.8, ) # update current route layer - self._update_thresholds(score_thresholds=thresholds) + self._update_thresholds(route_thresholds=thresholds) # evaluate acc = self._vec_evaluate(Xq=Xq, y=y) # update best @@ -1215,7 +1233,7 @@ class BaseRouter(BaseModel): best_acc = acc best_thresholds = thresholds # update route layer to best thresholds - self._update_thresholds(score_thresholds=best_thresholds) + self._update_thresholds(route_thresholds=best_thresholds) if local_execution: # Switch back to the original index diff --git a/semantic_router/routers/hybrid.py b/semantic_router/routers/hybrid.py index 0feb83794a820b808f5f552a64d72bbc62d191b8..64418f983d410a47b9f05538bb710918ce672a3e 100644 --- a/semantic_router/routers/hybrid.py +++ b/semantic_router/routers/hybrid.py @@ -10,7 +10,7 @@ from semantic_router.encoders import ( TfidfEncoder, ) from semantic_router.route import Route -from semantic_router.index.hybrid_local import HybridLocalIndex +from semantic_router.index import BaseIndex, HybridLocalIndex from semantic_router.schema import RouteChoice, SparseEmbedding from semantic_router.utils.logger import logger from semantic_router.routers.base import BaseRouter @@ -39,6 +39,7 @@ class HybridRouter(BaseRouter): if index is None: logger.warning("No index provided. 
Using default HybridLocalIndex.") index = HybridLocalIndex() + encoder = self._get_encoder(encoder=encoder) super().__init__( encoder=encoder, llm=llm, @@ -61,6 +62,14 @@ class HybridRouter(BaseRouter): if self.auto_sync: self._init_index_state() + def _get_index(self, index: Optional[BaseIndex]) -> BaseIndex: + if index is None: + logger.warning("No index provided. Using default HybridLocalIndex.") + index = HybridLocalIndex() + else: + index = index + return index + def _set_sparse_encoder(self, sparse_encoder: Optional[DenseEncoder]): if sparse_encoder is None: logger.warning("No sparse_encoder provided. Using default BM25Encoder.") @@ -146,43 +155,3 @@ class HybridRouter(BaseRouter): {k: v * (1 - self.alpha) for k, v in sparse_dict.items()} ) return scaled_dense, scaled_sparse - - def _set_aggregation_method(self, aggregation: str = "sum"): - if aggregation == "sum": - return lambda x: sum(x) - elif aggregation == "mean": - return lambda x: np.mean(x) - elif aggregation == "max": - return lambda x: max(x) - else: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." 
- ) - - def _semantic_classify(self, query_results: List[Tuple]) -> Tuple[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for score, route in query_results: - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: - if scores: - return max(scores) > threshold - else: - return False diff --git a/semantic_router/routers/semantic.py b/semantic_router/routers/semantic.py index 8a21fdf21dcafb918c76d37994700367dfbda359..59cd8de18319e3049a458da997022dc5cd407aee 100644 --- a/semantic_router/routers/semantic.py +++ b/semantic_router/routers/semantic.py @@ -1,52 +1,12 @@ -import json -import random -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, List, Optional import numpy as np -from tqdm.auto import tqdm -from semantic_router.encoders import AutoEncoder, DenseEncoder +from semantic_router.encoders import DenseEncoder from semantic_router.index.base import BaseIndex -from semantic_router.llms import BaseLLM, OpenAILLM +from semantic_router.llms import BaseLLM from semantic_router.route import Route -from semantic_router.routers.base import BaseRouter, RouterConfig -from semantic_router.schema import ( - ConfigParameter, - RouteChoice, - Utterance, - UtteranceDiff, -) -from semantic_router.utils.logger import logger - - -def is_valid(layer_config: str) -> bool: - """Make sure the given string is json format and contains the 3 keys: 
- ["encoder_name", "encoder_type", "routes"]""" - try: - output_json = json.loads(layer_config) - required_keys = ["encoder_name", "encoder_type", "routes"] - - if isinstance(output_json, list): - for item in output_json: - missing_keys = [key for key in required_keys if key not in item] - if missing_keys: - logger.warning( - f"Missing keys in layer config: {', '.join(missing_keys)}" - ) - return False - return True - else: - missing_keys = [key for key in required_keys if key not in output_json] - if missing_keys: - logger.warning( - f"Missing keys in layer config: {', '.join(missing_keys)}" - ) - return False - else: - return True - except json.JSONDecodeError as e: - logger.error(e) - return False +from semantic_router.routers.base import BaseRouter class SemanticRouter(BaseRouter): @@ -60,6 +20,8 @@ class SemanticRouter(BaseRouter): aggregation: str = "mean", auto_sync: Optional[str] = None, ): + index = self._get_index(index=index) + encoder = self._get_encoder(encoder=encoder) super().__init__( encoder=encoder, llm=llm, @@ -73,530 +35,6 @@ class SemanticRouter(BaseRouter): if self.auto_sync: self._init_index_state() - def check_for_matching_routes(self, top_class: str) -> Optional[Route]: - matching_route = next( - (route for route in self.routes if route.name == top_class), None - ) - if matching_route is None: - logger.error( - f"No route found with name {top_class}. Check to see if any Routes " - "have been defined." 
- ) - return None - return matching_route - - def __call__( - self, - text: Optional[str] = None, - vector: Optional[List[float]] = None, - simulate_static: bool = False, - route_filter: Optional[List[str]] = None, - ) -> RouteChoice: - # if no vector provided, encode text to get vector - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector = self._encode(text=text) - route, top_class_scores = self._retrieve_top_route(vector, route_filter) - passed = self._check_threshold(top_class_scores, route) - if passed and route is not None and not simulate_static: - if route.function_schemas and text is None: - raise ValueError( - "Route has a function schema, but no text was provided." - ) - if route.function_schemas and not isinstance(route.llm, BaseLLM): - if not self.llm: - logger.warning( - "No LLM provided for dynamic route, will use OpenAI LLM " - "default. Ensure API key is set in OPENAI_API_KEY environment " - "variable." - ) - - self.llm = OpenAILLM() - route.llm = self.llm - else: - route.llm = self.llm - return route(text) - elif passed and route is not None and simulate_static: - return RouteChoice( - name=route.name, - function_call=None, - similarity_score=None, - ) - else: - # if no route passes threshold, return empty route choice - return RouteChoice() - - async def acall( - self, - text: Optional[str] = None, - vector: Optional[List[float]] = None, - simulate_static: bool = False, - route_filter: Optional[List[str]] = None, - ) -> RouteChoice: - # if no vector provided, encode text to get vector - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector = await self._async_encode(text=text) - - route, top_class_scores = await self._async_retrieve_top_route( - vector, route_filter - ) - passed = self._check_threshold(top_class_scores, route) - if passed and route is not None and not simulate_static: - if route.function_schemas and text is None: - raise 
ValueError( - "Route has a function schema, but no text was provided." - ) - if route.function_schemas and not isinstance(route.llm, BaseLLM): - if not self.llm: - logger.warning( - "No LLM provided for dynamic route, will use OpenAI LLM default" - ) - self.llm = OpenAILLM() - route.llm = self.llm - else: - route.llm = self.llm - return await route.acall(text) - elif passed and route is not None and simulate_static: - return RouteChoice( - name=route.name, - function_call=None, - similarity_score=None, - ) - else: - # if no route passes threshold, return empty route choice - return RouteChoice() - - def retrieve_multiple_routes( - self, - text: Optional[str] = None, - vector: Optional[List[float]] = None, - ) -> List[RouteChoice]: - if vector is None: - if text is None: - raise ValueError("Either text or vector must be provided") - vector_arr = self._encode(text=text) - else: - vector_arr = np.array(vector) - # get relevant utterances - results = self._retrieve(xq=vector_arr) - - # decide most relevant routes - categories_with_scores = self._semantic_classify_multiple_routes(results) - - route_choices = [] - for category, score in categories_with_scores: - route = self.check_for_matching_routes(category) - if route: - route_choice = RouteChoice(name=route.name, similarity_score=score) - route_choices.append(route_choice) - - return route_choices - - def sync(self, sync_mode: str, force: bool = False) -> List[str]: - """Runs a sync of the local routes with the remote index. - - :param sync_mode: The mode to sync the routes with the remote index. - :type sync_mode: str - :param force: Whether to force the sync even if the local and remote - hashes already match. Defaults to False. - :type force: bool, optional - :return: A list of diffs describing the addressed differences between - the local and remote route layers. 
- :rtype: List[str] - """ - if not force and self.is_synced(): - logger.warning("Local and remote route layers are already synchronized.") - # create utterance diff to return, but just using local instance - # for speed - local_utterances = self.to_config().to_utterances() - diff = UtteranceDiff.from_utterances( - local_utterances=local_utterances, - remote_utterances=local_utterances, - ) - return diff.to_utterance_str() - # otherwise we continue with the sync, first creating a diff - local_utterances = self.to_config().to_utterances() - remote_utterances = self.index.get_utterances() - diff = UtteranceDiff.from_utterances( - local_utterances=local_utterances, - remote_utterances=remote_utterances, - ) - # generate sync strategy - sync_strategy = diff.get_sync_strategy(sync_mode=sync_mode) - # and execute - self._execute_sync_strategy(sync_strategy) - return diff.to_utterance_str() - - def _execute_sync_strategy(self, strategy: Dict[str, Dict[str, List[Utterance]]]): - """Executes the provided sync strategy, either deleting or upserting - routes from the local and remote instances as defined in the strategy. - - :param strategy: The sync strategy to execute. - :type strategy: Dict[str, Dict[str, List[Utterance]]] - """ - if strategy["remote"]["delete"]: - data_to_delete = {} # type: ignore - for utt_obj in strategy["remote"]["delete"]: - data_to_delete.setdefault(utt_obj.route, []).append(utt_obj.utterance) - # TODO: switch to remove without sync?? 
- self.index._remove_and_sync(data_to_delete) - if strategy["remote"]["upsert"]: - utterances_text = [utt.utterance for utt in strategy["remote"]["upsert"]] - self.index.add( - embeddings=self.encoder(utterances_text), - routes=[utt.route for utt in strategy["remote"]["upsert"]], - utterances=utterances_text, - function_schemas=[ - utt.function_schemas for utt in strategy["remote"]["upsert"] # type: ignore - ], - metadata_list=[utt.metadata for utt in strategy["remote"]["upsert"]], - ) - if strategy["local"]["delete"]: - self._local_delete(utterances=strategy["local"]["delete"]) - if strategy["local"]["upsert"]: - self._local_upsert(utterances=strategy["local"]["upsert"]) - # update hash - self._write_hash() - - def _local_upsert(self, utterances: List[Utterance]): - """Adds new routes to the SemanticRouter. - - :param utterances: The utterances to add to the local SemanticRouter. - :type utterances: List[Utterance] - """ - new_routes = {route.name: route for route in self.routes} - for utt_obj in utterances: - if utt_obj.route not in new_routes.keys(): - new_routes[utt_obj.route] = Route( - name=utt_obj.route, - utterances=[utt_obj.utterance], - function_schemas=utt_obj.function_schemas, - metadata=utt_obj.metadata, - ) - else: - if utt_obj.utterance not in new_routes[utt_obj.route].utterances: - new_routes[utt_obj.route].utterances.append(utt_obj.utterance) - new_routes[utt_obj.route].function_schemas = utt_obj.function_schemas - new_routes[utt_obj.route].metadata = utt_obj.metadata - self.routes = list(new_routes.values()) - - def _local_delete(self, utterances: List[Utterance]): - """Deletes routes from the local SemanticRouter. - - :param utterances: The utterances to delete from the local SemanticRouter. 
- :type utterances: List[Utterance] - """ - # create dictionary of route names to utterances - route_dict: dict[str, List[str]] = {} - for utt in utterances: - route_dict.setdefault(utt.route, []).append(utt.utterance) - # iterate over current routes and delete specific utterance if found - new_routes = [] - for route in self.routes: - if route.name in route_dict.keys(): - # drop utterances that are in route_dict deletion list - new_utterances = list( - set(route.utterances) - set(route_dict[route.name]) - ) - if len(new_utterances) == 0: - # the route is now empty, so we skip it - continue - else: - new_routes.append( - Route( - name=route.name, - utterances=new_utterances, - # use existing function schemas and metadata - function_schemas=route.function_schemas, - metadata=route.metadata, - ) - ) - else: - # the route is not in the route_dict, so we keep it as is - new_routes.append(route) - - self.routes = new_routes - - def _retrieve_top_route( - self, vector: List[float], route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - """ - Retrieve the top matching route based on the given vector. - Returns a tuple of the route (if any) and the scores of the top class. - """ - # get relevant results (scores and routes) - results = self._retrieve( - xq=np.array(vector), top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = self._semantic_classify(results) - # TODO do we need this check? 
- route = self.check_for_matching_routes(top_class) - return route, top_class_scores - - async def _async_retrieve_top_route( - self, vector: List[float], route_filter: Optional[List[str]] = None - ) -> Tuple[Optional[Route], List[float]]: - # get relevant results (scores and routes) - results = await self._async_retrieve( - xq=np.array(vector), top_k=self.top_k, route_filter=route_filter - ) - # decide most relevant routes - top_class, top_class_scores = await self._async_semantic_classify(results) - # TODO do we need this check? - route = self.check_for_matching_routes(top_class) - return route, top_class_scores - - def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: - """ - Check if the route's score passes the specified threshold. - """ - if route is None: - return False - threshold = ( - route.score_threshold - if route.score_threshold is not None - else self.score_threshold - ) - return self._pass_threshold(scores, threshold) - - def __str__(self): - return ( - f"SemanticRouter(encoder={self.encoder}, " - f"score_threshold={self.score_threshold}, " - f"routes={self.routes})" - ) - - @classmethod - def from_json(cls, file_path: str): - config = RouterConfig.from_file(file_path) - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes) - - @classmethod - def from_yaml(cls, file_path: str): - config = RouterConfig.from_file(file_path) - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes) - - @classmethod - def from_config(cls, config: RouterConfig, index: Optional[BaseIndex] = None): - encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model - return cls(encoder=encoder, routes=config.routes, index=index) - - def list_route_names(self) -> List[str]: - return [route.name for route in self.routes] - - def update( - self, - name: str, - threshold: 
Optional[float] = None, - utterances: Optional[List[str]] = None, - ): - """Updates the route specified in name. Allows the update of - threshold and/or utterances. If no values are provided via the - threshold or utterances parameters, those fields are not updated. - If neither field is provided raises a ValueError. - - The name must exist within the local SemanticRouter, if not a - KeyError will be raised. - """ - current_local_hash = self._get_hash() - current_remote_hash = self.index._read_hash() - if current_remote_hash.value == "": - # if remote hash is empty, the index is to be initialized - current_remote_hash = current_local_hash - - if threshold is None and utterances is None: - raise ValueError( - "At least one of 'threshold' or 'utterances' must be provided." - ) - if utterances: - raise NotImplementedError( - "The update method cannot be used for updating utterances yet." - ) - - route = self.get(name) - if route: - if threshold: - old_threshold = route.score_threshold - route.score_threshold = threshold - logger.info( - f"Updated threshold for route '{route.name}' from {old_threshold} to {threshold}" - ) - else: - raise ValueError(f"Route '{name}' not found. Nothing updated.") - - if current_local_hash.value == current_remote_hash.value: - self._write_hash() # update current hash in index - else: - logger.warning( - "Local and remote route layers were not aligned. Remote hash " - "not updated. Use `SemanticRouter.get_utterance_diff()` to see " - "details." - ) - - def delete(self, route_name: str): - """Deletes a route given a specific route name. 
- - :param route_name: the name of the route to be deleted - :type str: - """ - current_local_hash = self._get_hash() - current_remote_hash = self.index._read_hash() - if current_remote_hash.value == "": - # if remote hash is empty, the index is to be initialized - current_remote_hash = current_local_hash - - if route_name not in [route.name for route in self.routes]: - err_msg = f"Route `{route_name}` not found in SemanticRouter" - logger.warning(err_msg) - try: - self.index.delete(route_name=route_name) - except Exception as e: - logger.error(f"Failed to delete route from the index: {e}") - else: - self.routes = [route for route in self.routes if route.name != route_name] - self.index.delete(route_name=route_name) - - if current_local_hash.value == current_remote_hash.value: - self._write_hash() # update current hash in index - else: - logger.warning( - "Local and remote route layers were not aligned. Remote hash " - "not updated. Use `SemanticRouter.get_utterance_diff()` to see " - "details." 
- ) - - def _refresh_routes(self): - """Pulls out the latest routes from the index.""" - raise NotImplementedError("This method has not yet been implemented.") - route_mapping = {route.name: route for route in self.routes} - index_routes = self.index.get_utterances() - new_routes_names = [] - new_routes = [] - for route_name, utterance in index_routes: - if route_name in route_mapping: - if route_name not in new_routes_names: - existing_route = route_mapping[route_name] - new_routes.append(existing_route) - - new_routes.append(Route(name=route_name, utterances=[utterance])) - route = route_mapping[route_name] - self.routes.append(route) - - def _add_routes(self, routes: List[Route]): - current_local_hash = self._get_hash() - current_remote_hash = self.index._read_hash() - if current_remote_hash.value == "": - # if remote hash is empty, the index is to be initialized - current_remote_hash = current_local_hash - - if not routes: - logger.warning("No routes provided to add.") - return - # create embeddings for all routes - route_names, all_utterances, all_function_schemas, all_metadata = ( - self._extract_routes_details(routes, include_metadata=True) - ) - embedded_utterances = self.encoder(all_utterances) - try: - # Batch insertion into the index - self.index.add( - embeddings=embedded_utterances, - routes=route_names, - utterances=all_utterances, - function_schemas=all_function_schemas, - metadata_list=all_metadata, - ) - except Exception as e: - logger.error(f"Failed to add routes to the index: {e}") - raise Exception("Indexing error occurred") from e - - if current_local_hash.value == current_remote_hash.value: - self._write_hash() # update current hash in index - else: - logger.warning( - "Local and remote route layers were not aligned. Remote hash " - "not updated. Use `SemanticRouter.get_utterance_diff()` to see " - "details." 
- ) - - def _get_hash(self) -> ConfigParameter: - config = self.to_config() - return config.get_hash() - - def _write_hash(self) -> ConfigParameter: - config = self.to_config() - hash_config = config.get_hash() - self.index._write_config(config=hash_config) - return hash_config - - def is_synced(self) -> bool: - """Check if the local and remote route layer instances are - synchronized. - - :return: True if the local and remote route layers are synchronized, - False otherwise. - :rtype: bool - """ - # first check hash - local_hash = self._get_hash() - remote_hash = self.index._read_hash() - if local_hash.value == remote_hash.value: - return True - else: - return False - - def get_utterance_diff(self, include_metadata: bool = False) -> List[str]: - """Get the difference between the local and remote utterances. Returns - a list of strings showing what is different in the remote when compared - to the local. For example: - - [" route1: utterance1", - " route1: utterance2", - "- route2: utterance3", - "- route2: utterance4"] - - Tells us that the remote is missing "route2: utterance3" and "route2: - utterance4", which do exist locally. If we see: - - [" route1: utterance1", - " route1: utterance2", - "+ route2: utterance3", - "+ route2: utterance4"] - - This diff tells us that the remote has "route2: utterance3" and - "route2: utterance4", which do not exist locally. 
- """ - # first we get remote and local utterances - remote_utterances = self.index.get_utterances() - local_utterances = self.to_config().to_utterances() - - diff_obj = UtteranceDiff.from_utterances( - local_utterances=local_utterances, remote_utterances=remote_utterances - ) - return diff_obj.to_utterance_str(include_metadata=include_metadata) - - def _extract_routes_details( - self, routes: List[Route], include_metadata: bool = False - ) -> Tuple: - route_names = [route.name for route in routes for _ in route.utterances] - utterances = [utterance for route in routes for utterance in route.utterances] - function_schemas = [ - route.function_schemas[0] if route.function_schemas is not None else {} - for route in routes - for _ in route.utterances - ] - - if include_metadata: - metadata = [route.metadata for route in routes for _ in route.utterances] - return route_names, utterances, function_schemas, metadata - return route_names, utterances, function_schemas - def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector @@ -610,279 +48,3 @@ class SemanticRouter(BaseRouter): xq = np.array(await self.encoder.acall(docs=[text])) xq = np.squeeze(xq) # Reduce to 1d array. 
return xq - - def _retrieve( - self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None - ) -> List[Dict]: - """Given a query vector, retrieve the top_k most similar records.""" - # get scores and routes - scores, routes = self.index.query( - vector=xq, top_k=top_k, route_filter=route_filter - ) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - - async def _async_retrieve( - self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None - ) -> List[Dict]: - """Given a query vector, retrieve the top_k most similar records.""" - # get scores and routes - scores, routes = await self.index.aquery( - vector=xq, top_k=top_k, route_filter=route_filter - ) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - - def _set_aggregation_method(self, aggregation: str = "sum"): - if aggregation == "sum": - return lambda x: sum(x) - elif aggregation == "mean": - return lambda x: np.mean(x) - elif aggregation == "max": - return lambda x: max(x) - else: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." 
- ) - - def _semantic_classify(self, query_results: List[Dict]) -> Tuple[str, List[float]]: - scores_by_class = self.group_scores_by_class(query_results) - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - async def _async_semantic_classify( - self, query_results: List[Dict] - ) -> Tuple[str, List[float]]: - scores_by_class = await self.async_group_scores_by_class(query_results) - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - def get(self, name: str) -> Optional[Route]: - for route in self.routes: - if route.name == name: - return route - logger.error(f"Route `{name}` not found") - return None - - def _semantic_classify_multiple_routes( - self, query_results: List[Dict] - ) -> List[Tuple[str, float]]: - scores_by_class = self.group_scores_by_class(query_results) - - # Filter classes based on threshold and find max score for each - classes_above_threshold = [] - for route_name, scores in scores_by_class.items(): - # Use the get method to find the Route object by its name - route_obj = self.get(route_name) - if route_obj is not None: - # Use the Route object's threshold if it exists, otherwise use the provided threshold - _threshold = ( - 
route_obj.score_threshold - if route_obj.score_threshold is not None - else self.score_threshold - ) - if self._pass_threshold(scores, _threshold): - max_score = max(scores) - classes_above_threshold.append((route_name, max_score)) - - return classes_above_threshold - - def group_scores_by_class( - self, query_results: List[Dict] - ) -> Dict[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for result in query_results: - score = result["score"] - route = result["route"] - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - return scores_by_class - - async def async_group_scores_by_class( - self, query_results: List[Dict] - ) -> Dict[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for result in query_results: - score = result["score"] - route = result["route"] - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - return scores_by_class - - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: - if scores: - return max(scores) > threshold - else: - return False - - def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None): - """ - Update the score thresholds for each route. - """ - if score_thresholds: - for route in self.routes: - route.score_threshold = score_thresholds.get( - route.name, self.score_threshold - ) - - def to_config(self) -> RouterConfig: - return RouterConfig( - encoder_type=self.encoder.type, - encoder_name=self.encoder.name, - routes=self.routes, - ) - - def to_json(self, file_path: str): - config = self.to_config() - config.to_file(file_path) - - def to_yaml(self, file_path: str): - config = self.to_config() - config.to_file(file_path) - - def get_thresholds(self) -> Dict[str, float]: - # TODO: float() below is hacky fix for lint, fix this with new type? 
- thresholds = { - route.name: float(route.score_threshold or self.score_threshold) - for route in self.routes - } - return thresholds - - def fit( - self, - X: List[str], - y: List[str], - batch_size: int = 500, - max_iter: int = 500, - local_execution: bool = False, - ): - original_index = self.index - if local_execution: - # Switch to a local index for fitting - from semantic_router.index.local import LocalIndex - - remote_routes = self.index.get_utterances() - # TODO Enhance by retrieving directly the vectors instead of embedding all utterances again - routes, utterances, function_schemas, metadata = map( - list, zip(*remote_routes) - ) - embeddings = self.encoder(utterances) - self.index = LocalIndex() - self.index.add( - embeddings=embeddings, - routes=routes, - utterances=utterances, - metadata_list=metadata, - ) - - # convert inputs into array - Xq: List[List[float]] = [] - for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): - emb = np.array(self.encoder(X[i : i + batch_size])) - Xq.extend(emb) - # initial eval (we will iterate from here) - best_acc = self._vec_evaluate(Xq=np.array(Xq), y=y) - best_thresholds = self.get_thresholds() - # begin fit - for _ in (pbar := tqdm(range(max_iter), desc="Training")): - pbar.set_postfix({"acc": round(best_acc, 2)}) - # Find the best score threshold for each route - thresholds = threshold_random_search( - route_layer=self, - search_range=0.8, - ) - # update current route layer - self._update_thresholds(score_thresholds=thresholds) - # evaluate - acc = self._vec_evaluate(Xq=Xq, y=y) - # update best - if acc > best_acc: - best_acc = acc - best_thresholds = thresholds - # update route layer to best thresholds - self._update_thresholds(score_thresholds=best_thresholds) - - if local_execution: - # Switch back to the original index - self.index = original_index - - def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: - """ - Evaluate the accuracy of the route selection. 
- """ - Xq: List[List[float]] = [] - for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): - emb = np.array(self.encoder(X[i : i + batch_size])) - Xq.extend(emb) - - accuracy = self._vec_evaluate(Xq=np.array(Xq), y=y) - return accuracy - - def _vec_evaluate(self, Xq: Union[List[float], Any], y: List[str]) -> float: - """ - Evaluate the accuracy of the route selection. - """ - correct = 0 - for xq, target_route in zip(Xq, y): - # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. - route_choice = self(vector=xq, simulate_static=True) - if route_choice.name == target_route: - correct += 1 - accuracy = correct / len(Xq) - return accuracy - - def _get_route_names(self) -> List[str]: - return [route.name for route in self.routes] - - -def threshold_random_search( - route_layer: SemanticRouter, - search_range: Union[int, float], -) -> Dict[str, float]: - """Performs a random search iteration given a route layer and a search range.""" - # extract the route names - routes = route_layer.get_thresholds() - route_names = list(routes.keys()) - route_thresholds = list(routes.values()) - # generate search range for each - score_threshold_values = [] - for threshold in route_thresholds: - score_threshold_values.append( - np.linspace( - start=max(threshold - search_range, 0.0), - stop=min(threshold + search_range, 1.0), - num=100, - ) - ) - # Generate a random threshold for each route - score_thresholds = { - route: random.choice(score_threshold_values[i]) - for i, route in enumerate(route_names) - } - return score_thresholds diff --git a/semantic_router/schema.py b/semantic_router/schema.py index 616416e701e432e4ce1220a00697d5256d2c5bcf..b54a452f5f86e58a4a8212d1db95f7bf6605ce6d 100644 --- a/semantic_router/schema.py +++ b/semantic_router/schema.py @@ -428,9 +428,12 @@ class SparseEmbedding(BaseModel): return cls(embedding=array) @classmethod - def from_array(cls, array: 
np.ndarray): - """Consumes a single sparse vector which contains zero-values. + def from_vector(cls, vector: np.ndarray): + """Consumes a single 1D sparse vector which contains zero-values. """ + if vector.ndim != 1: + raise ValueError(f"Expected a 1D array, got a {vector.ndim}D array.") + return cls.from_compact_array(np.array([np.arange(len(vector)), vector]).T) @classmethod def from_aurelio(cls, embedding: BM25Embedding): diff --git a/tests/unit/test_router.py b/tests/unit/test_router.py index 8d47ee4b1e76d3f8526e2bbd14a588d789e8ad0d..62b49fcd6b1888d720c309ae96291fb1b3781f6a 100644 --- a/tests/unit/test_router.py +++ b/tests/unit/test_router.py @@ -644,7 +644,7 @@ class TestSemanticRouter: file.write(invalid_config_json) # Patch the is_valid function to return False for this test - with patch("semantic_router.layer.is_valid", return_value=False): + with patch("semantic_router.routers.base.is_valid", return_value=False): # Attempt to load the RouterConfig from the temporary file # and assert that it raises an exception due to invalid configuration with pytest.raises(Exception) as excinfo: @@ -720,8 +720,6 @@ class TestSemanticRouter: {"route": "Route 2", "score": 0.7}, {"route": "Route 1", "score": 0.8}, ] - # Override _pass_threshold to always return True for this test - route_layer._pass_threshold = lambda scores, threshold: True expected = [("Route 1", 0.8), ("Route 2", 0.7)] results = route_layer._semantic_classify_multiple_routes(query_results) assert sorted(results) == sorted( @@ -731,9 +729,8 @@ class TestSemanticRouter: def test_with_no_routes_passing_threshold(self, openai_encoder, routes, index_cls): index = init_index(index_cls) route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index) - route_layer.score_threshold = 0.5 - # Override _pass_threshold to always return False for this test - route_layer._pass_threshold = lambda scores, threshold: False + # set threshold to 1.0 so that no routes pass + route_layer.score_threshold = 1.0 
query_results = [ {"route": "Route 1", "score": 0.3}, {"route": "Route 2", "score": 0.2}, @@ -815,11 +812,13 @@ class TestSemanticRouter: auto_sync="local", ) text = "Asparagus" + if index_cls is PineconeIndex: + time.sleep(PINECONE_SLEEP) results = route_layer.retrieve_multiple_routes(text=text) assert len(results) == 0, f"Expected no results, but got {len(results)}" def test_retrieve_one_match(self, openai_encoder, routes_3, index_cls): - index = init_index(index_cls) + index = init_index(index_cls, dimensions=3) route_layer = SemanticRouter( encoder=openai_encoder, routes=routes_3, @@ -827,6 +826,8 @@ class TestSemanticRouter: auto_sync="local", ) text = "Hello" + # set low threshold + route_layer.set_threshold(threshold=0.1, route_name="Route 1") if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) results = route_layer.retrieve_multiple_routes(text=text) @@ -845,6 +846,7 @@ class TestSemanticRouter: auto_sync="local", ) text = "Hello" + route_layer.set_threshold(threshold=0.01, route_name=None) if index_cls is PineconeIndex: time.sleep(PINECONE_SLEEP) results = route_layer.retrieve_multiple_routes(text=text)