import importlib import json import os import random from typing import Any, Dict, List, Optional, Tuple, Union import numpy as np import yaml from tqdm.auto import tqdm from semantic_router.encoders import BaseEncoder, OpenAIEncoder from semantic_router.index.base import BaseIndex from semantic_router.index.local import LocalIndex from semantic_router.llms import BaseLLM, OpenAILLM from semantic_router.route import Route from semantic_router.schema import Encoder, EncoderType, RouteChoice from semantic_router.utils.defaults import EncoderDefault from semantic_router.utils.logger import logger def is_valid(layer_config: str) -> bool: """Make sure the given string is json format and contains the 3 keys: ["encoder_name", "encoder_type", "routes"]""" try: output_json = json.loads(layer_config) required_keys = ["encoder_name", "encoder_type", "routes"] if isinstance(output_json, list): for item in output_json: missing_keys = [key for key in required_keys if key not in item] if missing_keys: logger.warning( f"Missing keys in layer config: {', '.join(missing_keys)}" ) return False return True else: missing_keys = [key for key in required_keys if key not in output_json] if missing_keys: logger.warning( f"Missing keys in layer config: {', '.join(missing_keys)}" ) return False else: return True except json.JSONDecodeError as e: logger.error(e) return False class LayerConfig: """ Generates a LayerConfig object that can be used for initializing a RouteLayer. """ routes: List[Route] = [] def __init__( self, routes: List[Route] = [], encoder_type: str = "openai", encoder_name: Optional[str] = None, ): self.encoder_type = encoder_type if encoder_name is None: for encode_type in EncoderType: if encode_type.value == self.encoder_type: if self.encoder_type == EncoderType.HUGGINGFACE.value: raise NotImplementedError( "HuggingFace encoder not supported by LayerConfig yet." ) encoder_name = EncoderDefault[encode_type.name].value[ "embedding_model" ] break logger.info(f"Using default {encoder_type} encoder: {encoder_name}") self.encoder_name = encoder_name self.routes = routes @classmethod def from_file(cls, path: str) -> "LayerConfig": logger.info(f"Loading route config from {path}") _, ext = os.path.splitext(path) with open(path, "r") as f: if ext == ".json": layer = json.load(f) elif ext in [".yaml", ".yml"]: layer = yaml.safe_load(f) else: raise ValueError( "Unsupported file type. Only .json and .yaml are supported" ) if not is_valid(json.dumps(layer)): raise Exception("Invalid config JSON or YAML") encoder_type = layer["encoder_type"] encoder_name = layer["encoder_name"] routes = [] for route_data in layer["routes"]: # Handle the 'llm' field specially if it exists if "llm" in route_data and route_data["llm"] is not None: llm_data = route_data.pop( "llm" ) # Remove 'llm' from route_data and handle it separately # Use the module path directly from llm_data without modification llm_module_path = llm_data["module"] # Dynamically import the module and then the class from that module llm_module = importlib.import_module(llm_module_path) llm_class = getattr(llm_module, llm_data["class"]) # Instantiate the LLM class with the provided model name llm = llm_class(name=llm_data["model"]) route_data[ "llm" ] = llm # Reassign the instantiated llm object back to route_data # Dynamically create the Route object using the remaining route_data route = Route(**route_data) routes.append(route) return cls( encoder_type=encoder_type, encoder_name=encoder_name, routes=routes ) def to_dict(self) -> Dict[str, Any]: return { "encoder_type": self.encoder_type, "encoder_name": self.encoder_name, "routes": [route.to_dict() for route in self.routes], } def to_file(self, path: str): """Save the routes to a file in JSON or YAML format""" logger.info(f"Saving route config to {path}") _, ext = os.path.splitext(path) # Check file extension before creating directories or files if ext not in [".json", ".yaml", ".yml"]: raise ValueError( "Unsupported file type. Only .json and .yaml are supported" ) dir_name = os.path.dirname(path) # Create the directory if it doesn't exist and dir_name is not an empty string if dir_name and not os.path.exists(dir_name): os.makedirs(dir_name) with open(path, "w") as f: if ext == ".json": json.dump(self.to_dict(), f, indent=4) elif ext in [".yaml", ".yml"]: yaml.safe_dump(self.to_dict(), f) def add(self, route: Route): self.routes.append(route) logger.info(f"Added route `{route.name}`") def get(self, name: str) -> Optional[Route]: for route in self.routes: if route.name == name: return route logger.error(f"Route `{name}` not found") return None def remove(self, name: str): if name not in [route.name for route in self.routes]: logger.error(f"Route `{name}` not found") else: self.routes = [route for route in self.routes if route.name != name] logger.info(f"Removed route `{name}`") class RouteLayer: score_threshold: float encoder: BaseEncoder index: BaseIndex def __init__( self, encoder: Optional[BaseEncoder] = None, llm: Optional[BaseLLM] = None, routes: Optional[List[Route]] = None, index: Optional[BaseIndex] = None, # type: ignore ): logger.info("local") self.index: BaseIndex = index if index is not None else LocalIndex() if encoder is None: logger.warning( "No encoder provided. Using default OpenAIEncoder. Ensure " "that you have set OPENAI_API_KEY in your environment." ) self.encoder = OpenAIEncoder() else: self.encoder = encoder self.llm = llm self.routes: list[Route] = routes if routes is not None else [] self.score_threshold = self.encoder.score_threshold # set route score thresholds if not already set for route in self.routes: if route.score_threshold is None: route.score_threshold = self.score_threshold # if routes list has been passed, we initialize index now if len(self.routes) > 0: # initialize index now self._add_routes(routes=self.routes) def check_for_matching_routes(self, top_class: str) -> Optional[Route]: matching_routes = [route for route in self.routes if route.name == top_class] if not matching_routes: logger.error( f"No route found with name {top_class}. Check to see if any Routes " "have been defined." ) return None return matching_routes[0] def __call__( self, text: Optional[str] = None, vector: Optional[List[float]] = None, simulate_static: bool = False, ) -> RouteChoice: # if no vector provided, encode text to get vector if vector is None: if text is None: raise ValueError("Either text or vector must be provided") vector = self._encode(text=text) route, top_class_scores = self._retrieve_top_route(vector) passed = self._check_threshold(top_class_scores, route) if passed and route is not None and not simulate_static: if route.function_schema and text is None: raise ValueError( "Route has a function schema, but no text was provided." ) if route.function_schema and not isinstance(route.llm, BaseLLM): if not self.llm: logger.warning( "No LLM provided for dynamic route, will use OpenAI LLM " "default. Ensure API key is set in OPENAI_API_KEY environment " "variable." ) self.llm = OpenAILLM() route.llm = self.llm else: route.llm = self.llm return route(text) elif passed and route is not None and simulate_static: return RouteChoice( name=route.name, function_call=None, similarity_score=None, ) else: # if no route passes threshold, return empty route choice return RouteChoice() def _retrieve_top_route( self, vector: List[float] ) -> Tuple[Optional[Route], List[float]]: """ Retrieve the top matching route based on the given vector. Returns a tuple of the route (if any) and the scores of the top class. """ # get relevant results (scores and routes) results = self._retrieve(xq=np.array(vector)) # decide most relevant routes top_class, top_class_scores = self._semantic_classify(results) # TODO do we need this check? route = self.check_for_matching_routes(top_class) return route, top_class_scores def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: """ Check if the route's score passes the specified threshold. """ if route is None: return False threshold = ( route.score_threshold if route.score_threshold is not None else self.score_threshold ) return self._pass_threshold(scores, threshold) def __str__(self): return ( f"RouteLayer(encoder={self.encoder}, " f"score_threshold={self.score_threshold}, " f"routes={self.routes})" ) @classmethod def from_json(cls, file_path: str): config = LayerConfig.from_file(file_path) encoder = Encoder(type=config.encoder_type, name=config.encoder_name).model return cls(encoder=encoder, routes=config.routes) @classmethod def from_yaml(cls, file_path: str): config = LayerConfig.from_file(file_path) encoder = Encoder(type=config.encoder_type, name=config.encoder_name).model return cls(encoder=encoder, routes=config.routes) @classmethod def from_config(cls, config: LayerConfig): encoder = Encoder(type=config.encoder_type, name=config.encoder_name).model return cls(encoder=encoder, routes=config.routes) def add(self, route: Route): logger.info(f"Adding `{route.name}` route") # create embeddings embeds = self.encoder(route.utterances) # if route has no score_threshold, use default if route.score_threshold is None: route.score_threshold = self.score_threshold # add routes to the index self.index.add( embeddings=embeds, routes=[route.name] * len(route.utterances), utterances=route.utterances, ) self.routes.append(route) def list_route_names(self) -> List[str]: return [route.name for route in self.routes] def update(self, route_name: str, utterances: List[str]): raise NotImplementedError("This method has not yet been implemented.") def delete(self, route_name: str): """Deletes a route given a specific route name. :param route_name: the name of the route to be deleted :type str: """ if route_name not in [route.name for route in self.routes]: err_msg = f"Route `{route_name}` not found" logger.error(err_msg) raise ValueError(err_msg) else: self.routes = [route for route in self.routes if route.name != route_name] self.index.delete(route_name=route_name) def _refresh_routes(self): """Pulls out the latest routes from the index.""" raise NotImplementedError("This method has not yet been implemented.") route_mapping = {route.name: route for route in self.routes} index_routes = self.index.get_routes() new_routes_names = [] new_routes = [] for route_name, utterance in index_routes: if route_name in route_mapping: if route_name not in new_routes_names: existing_route = route_mapping[route_name] new_routes.append(existing_route) new_routes.append(Route(name=route_name, utterances=[utterance])) route = route_mapping[route_name] self.routes.append(route) def _add_routes(self, routes: List[Route]): # create embeddings for all routes all_utterances = [ utterance for route in routes for utterance in route.utterances ] embedded_utterances = self.encoder(all_utterances) # create route array route_names = [route.name for route in routes for _ in route.utterances] # add everything to the index self.index.add( embeddings=embedded_utterances, routes=route_names, utterances=all_utterances, ) def _encode(self, text: str) -> Any: """Given some text, encode it.""" # create query vector xq = np.array(self.encoder([text])) xq = np.squeeze(xq) # Reduce to 1d array. return xq def _retrieve(self, xq: Any, top_k: int = 5) -> List[dict]: """Given a query vector, retrieve the top_k most similar records.""" # get scores and routes scores, routes = self.index.query(vector=xq, top_k=top_k) return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] def _semantic_classify(self, query_results: List[dict]) -> Tuple[str, List[float]]: scores_by_class: Dict[str, List[float]] = {} for result in query_results: score = result["score"] route = result["route"] if route in scores_by_class: scores_by_class[route].append(score) else: scores_by_class[route] = [score] # Calculate total score for each class total_scores = {route: sum(scores) for route, scores in scores_by_class.items()} top_class = max(total_scores, key=lambda x: total_scores[x], default=None) # Return the top class and its associated scores if top_class is not None: return str(top_class), scores_by_class.get(top_class, []) else: logger.warning("No classification found for semantic classifier.") return "", [] def _pass_threshold(self, scores: List[float], threshold: float) -> bool: if scores: return max(scores) > threshold else: return False def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None): """ Update the score thresholds for each route. """ if score_thresholds: for route in self.routes: route.score_threshold = score_thresholds.get( route.name, self.score_threshold ) def to_config(self) -> LayerConfig: return LayerConfig( encoder_type=self.encoder.type, encoder_name=self.encoder.name, routes=self.routes, ) def to_json(self, file_path: str): config = self.to_config() config.to_file(file_path) def to_yaml(self, file_path: str): config = self.to_config() config.to_file(file_path) def get_thresholds(self) -> Dict[str, float]: # TODO: float() below is hacky fix for lint, fix this with new type? thresholds = { route.name: float(route.score_threshold or self.score_threshold) for route in self.routes } return thresholds def fit( self, X: List[str], y: List[str], batch_size: int = 500, max_iter: int = 500, ): # convert inputs into array Xq: List[List[float]] = [] for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): emb = np.array(self.encoder(X[i : i + batch_size])) Xq.extend(emb) # initial eval (we will iterate from here) best_acc = self._vec_evaluate(Xq=np.array(Xq), y=y) best_thresholds = self.get_thresholds() # begin fit for _ in (pbar := tqdm(range(max_iter), desc="Training")): pbar.set_postfix({"acc": round(best_acc, 2)}) # Find the best score threshold for each route thresholds = threshold_random_search( route_layer=self, search_range=0.8, ) # update current route layer self._update_thresholds(score_thresholds=thresholds) # evaluate acc = self._vec_evaluate(Xq=Xq, y=y) # update best if acc > best_acc: best_acc = acc best_thresholds = thresholds # update route layer to best thresholds self._update_thresholds(score_thresholds=best_thresholds) def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: """ Evaluate the accuracy of the route selection. """ Xq: List[List[float]] = [] for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): emb = np.array(self.encoder(X[i : i + batch_size])) Xq.extend(emb) accuracy = self._vec_evaluate(Xq=np.array(Xq), y=y) return accuracy def _vec_evaluate(self, Xq: Union[List[float], Any], y: List[str]) -> float: """ Evaluate the accuracy of the route selection. """ correct = 0 for xq, target_route in zip(Xq, y): # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. route_choice = self(vector=xq, simulate_static=True) if route_choice.name == target_route: correct += 1 accuracy = correct / len(Xq) return accuracy def _get_route_names(self) -> List[str]: return [route.name for route in self.routes] def threshold_random_search( route_layer: RouteLayer, search_range: Union[int, float], ) -> Dict[str, float]: """Performs a random search iteration given a route layer and a search range.""" # extract the route names routes = route_layer.get_thresholds() route_names = list(routes.keys()) route_thresholds = list(routes.values()) # generate search range for each score_threshold_values = [] for threshold in route_thresholds: score_threshold_values.append( np.linspace( start=max(threshold - search_range, 0.0), stop=min(threshold + search_range, 1.0), num=100, ) ) # Generate a random threshold for each route score_thresholds = { route: random.choice(score_threshold_values[i]) for i, route in enumerate(route_names) } return score_thresholds