diff --git a/docs/source/conf.py b/docs/source/conf.py index 2dae787f1cb09b53476cd424cb45108ea15386d0..0340e029dfeb8e4c547313698748e475d1c173d0 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -15,7 +15,7 @@ sys.path.insert(0, os.path.abspath("../..")) # Source code dir relative to this project = "Semantic Router" copyright = "2024, Aurelio AI" author = "Aurelio AI" -release = "0.1.0.dev1" +release = "0.1.0.dev2" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/pyproject.toml b/pyproject.toml index 68881c73da1889d6316580a45261065234881e7c..98fa4ec70b5c4f82a3ed839229a3b438f28e7eff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "semantic-router" -version = "0.1.0.dev1" +version = "0.1.0.dev2" description = "Super fast semantic router for AI decision making" authors = ["Aurelio AI <hello@aurelio.ai>"] readme = "README.md" diff --git a/semantic_router/__init__.py b/semantic_router/__init__.py index a2ed0d0718f3ed59947678c574c44805c979afee..80f33f231ccb296fa8c9b81f1340d9bd37903a59 100644 --- a/semantic_router/__init__.py +++ b/semantic_router/__init__.py @@ -1,7 +1,6 @@ -from semantic_router.hybrid_layer import HybridRouteLayer -from semantic_router.layer import LayerConfig, RouteLayer +from semantic_router.routers import LayerConfig, RouteLayer, HybridRouteLayer from semantic_router.route import Route __all__ = ["RouteLayer", "HybridRouteLayer", "Route", "LayerConfig"] -__version__ = "0.1.0.dev1" +__version__ = "0.1.0.dev2" diff --git a/semantic_router/hybrid_layer.py b/semantic_router/hybrid_layer.py deleted file mode 100644 index 2aaf6f17b72b05d6c2d02958c8e098b312d37bf4..0000000000000000000000000000000000000000 --- a/semantic_router/hybrid_layer.py +++ /dev/null @@ -1,220 +0,0 @@ -from typing import Dict, List, Optional, Tuple - -import numpy as np -from numpy.linalg import norm - -from semantic_router.encoders import ( - BaseEncoder, - BM25Encoder, - TfidfEncoder, -) -from semantic_router.route import Route -from semantic_router.utils.logger import logger - - -class HybridRouteLayer: - index = None - sparse_index = None - categories = None - score_threshold: float - - def __init__( - self, - encoder: BaseEncoder, - sparse_encoder: Optional[BM25Encoder] = None, - routes: List[Route] = [], - alpha: float = 0.3, - top_k: int = 5, - aggregation: str = "sum", - ): - self.encoder = encoder - if self.encoder.score_threshold is None: - raise ValueError( - "No score threshold provided for encoder. Please set the score threshold " - "in the encoder config." - ) - self.score_threshold = self.encoder.score_threshold - - if sparse_encoder is None: - logger.warning("No sparse_encoder provided. Using default BM25Encoder.") - self.sparse_encoder = BM25Encoder() - else: - self.sparse_encoder = sparse_encoder - - self.alpha = alpha - self.top_k = top_k - if self.top_k < 1: - raise ValueError(f"top_k needs to be >= 1, but was: {self.top_k}.") - self.aggregation = aggregation - if self.aggregation not in ["sum", "mean", "max"]: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." - ) - self.aggregation_method = self._set_aggregation_method(self.aggregation) - self.routes = routes - if isinstance(self.sparse_encoder, TfidfEncoder) and hasattr( - self.sparse_encoder, "fit" - ): - self.sparse_encoder.fit(routes) - # if routes list has been passed, we initialize index now - if routes: - # initialize index now - # for route in tqdm(routes): - # self._add_route(route=route) - self._add_routes(routes) - - def __call__(self, text: str) -> Optional[str]: - results = self._query(text, self.top_k) - top_class, top_class_scores = self._semantic_classify(results) - passed = self._pass_threshold(top_class_scores, self.score_threshold) - if passed: - return top_class - else: - return None - - def add(self, route: Route): - self._add_route(route=route) - - def _add_route(self, route: Route): - self.routes += [route] - - self.update_dense_embeddings_index(route.utterances) - - if isinstance(self.sparse_encoder, TfidfEncoder) and hasattr( - self.sparse_encoder, "fit" - ): - self.sparse_encoder.fit(self.routes) - # re-build index - self.sparse_index = None - all_utterances = [ - utterance for route in self.routes for utterance in route.utterances - ] - self.update_sparse_embeddings_index(all_utterances) - else: - self.update_sparse_embeddings_index(route.utterances) - - # create route array - if self.categories is None: - self.categories = np.array([route.name] * len(route.utterances)) - else: - str_arr = np.array([route.name] * len(route.utterances)) - self.categories = np.concatenate([self.categories, str_arr]) - self.routes.append(route) - - def _add_routes(self, routes: List[Route]): - # create embeddings for all routes - logger.info("Creating embeddings for all routes...") - all_utterances = [ - utterance for route in routes for utterance in route.utterances - ] - self.update_dense_embeddings_index(all_utterances) - self.update_sparse_embeddings_index(all_utterances) - - # create route array - route_names = [route.name for route in routes for _ in route.utterances] - route_array = np.array(route_names) - self.categories = ( - np.concatenate([self.categories, route_array]) - if self.categories is not None - else route_array - ) - - def update_dense_embeddings_index(self, utterances: list): - dense_embeds = np.array(self.encoder(utterances)) - # create utterance array (the dense index) - self.index = ( - np.concatenate([self.index, dense_embeds]) - if self.index is not None - else dense_embeds - ) - - def update_sparse_embeddings_index(self, utterances: list): - sparse_embeds = np.array(self.sparse_encoder(utterances)) - # create sparse utterance array - self.sparse_index = ( - np.concatenate([self.sparse_index, sparse_embeds]) - if self.sparse_index is not None - else sparse_embeds - ) - - def _query(self, text: str, top_k: int = 5): - """Given some text, encodes and searches the index vector space to - retrieve the top_k most similar records. - """ - # create dense query vector - xq_d = np.array(self.encoder([text])) - xq_d = np.squeeze(xq_d) # Reduce to 1d array. - # create sparse query vector - xq_s = np.array(self.sparse_encoder([text])) - xq_s = np.squeeze(xq_s) - # convex scaling - xq_d, xq_s = self._convex_scaling(xq_d, xq_s) - - if self.index is not None and self.sparse_index is not None: - # calculate dense vec similarity - index_norm = norm(self.index, axis=1) - xq_d_norm = norm(xq_d.T) - sim_d = np.dot(self.index, xq_d.T) / (index_norm * xq_d_norm) - # calculate sparse vec similarity - sparse_norm = norm(self.sparse_index, axis=1) - xq_s_norm = norm(xq_s.T) - sim_s = np.dot(self.sparse_index, xq_s.T) / (sparse_norm * xq_s_norm) - total_sim = sim_d + sim_s - # get indices of top_k records - top_k = min(top_k, total_sim.shape[0]) - idx = np.argpartition(total_sim, -top_k)[-top_k:] - scores = total_sim[idx] - # get the utterance categories (route names) - routes = self.categories[idx] if self.categories is not None else [] - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - else: - logger.warning("No index found. Please add routes to the layer.") - return [] - - def _convex_scaling(self, dense: np.ndarray, sparse: np.ndarray): - # scale sparse and dense vecs - dense = np.array(dense) * self.alpha - sparse = np.array(sparse) * (1 - self.alpha) - return dense, sparse - - def _set_aggregation_method(self, aggregation: str = "sum"): - if aggregation == "sum": - return lambda x: sum(x) - elif aggregation == "mean": - return lambda x: np.mean(x) - elif aggregation == "max": - return lambda x: max(x) - else: - raise ValueError( - f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." - ) - - def _semantic_classify(self, query_results: List[Dict]) -> Tuple[str, List[float]]: - scores_by_class: Dict[str, List[float]] = {} - for result in query_results: - score = result["score"] - route = result["route"] - if route in scores_by_class: - scores_by_class[route].append(score) - else: - scores_by_class[route] = [score] - - # Calculate total score for each class - total_scores = { - route: self.aggregation_method(scores) - for route, scores in scores_by_class.items() - } - top_class = max(total_scores, key=lambda x: total_scores[x], default=None) - - # Return the top class and its associated scores - if top_class is not None: - return str(top_class), scores_by_class.get(top_class, []) - else: - logger.warning("No classification found for semantic classifier.") - return "", [] - - def _pass_threshold(self, scores: List[float], threshold: float) -> bool: - if scores: - return max(scores) > threshold - else: - return False diff --git a/semantic_router/index/__init__.py b/semantic_router/index/__init__.py index 9a01b996985414a5b60bd1c4f1280a4488999275..3a43abe936a929cf9811d0b8d6a47b6cbf3af34d 100644 --- a/semantic_router/index/__init__.py +++ b/semantic_router/index/__init__.py @@ -1,10 +1,12 @@ from semantic_router.index.base import BaseIndex +from semantic_router.index.hybrid_local import HybridLocalIndex from semantic_router.index.local import LocalIndex from semantic_router.index.pinecone import PineconeIndex from semantic_router.index.qdrant import QdrantIndex __all__ = [ "BaseIndex", + "HybridLocalIndex", "LocalIndex", "QdrantIndex", "PineconeIndex", diff --git a/semantic_router/index/base.py b/semantic_router/index/base.py index 4893987a78335f2dda3039c9929b2c8503d108d7..65f2cf1e0e3e1ec3a31be519ab61784cfd4c1ba8 100644 --- a/semantic_router/index/base.py +++ b/semantic_router/index/base.py @@ -18,12 +18,12 @@ class BaseIndex(BaseModel): # You can define common attributes here if there are any. # For example, a placeholder for the index attribute: - index: Optional[Any] = None routes: Optional[np.ndarray] = None utterances: Optional[np.ndarray] = None dimensions: Union[int, None] = None type: str = "base" init_async_index: bool = False + index: Optional[Any] = None def add( self, diff --git a/semantic_router/index/hybrid_local.py b/semantic_router/index/hybrid_local.py new file mode 100644 index 0000000000000000000000000000000000000000..9316487e33d193494fec0cfd7b1664951413d8be --- /dev/null +++ b/semantic_router/index/hybrid_local.py @@ -0,0 +1,190 @@ +from typing import List, Optional, Tuple, Dict + +import numpy as np +from numpy.linalg import norm + +from semantic_router.schema import ConfigParameter, Utterance +from semantic_router.index.local import LocalIndex +from semantic_router.linear import similarity_matrix, top_scores +from semantic_router.utils.logger import logger +from typing import Any + + +class HybridLocalIndex(LocalIndex): + type: str = "hybrid_local" + sparse_index: Optional[np.ndarray] = None + route_names: Optional[np.ndarray] = None + + class Config: + # Stop pydantic from complaining about Optional[np.ndarray]type hints. + arbitrary_types_allowed = True + + def add( + self, + embeddings: List[List[float]], + routes: List[str], + utterances: List[str], + function_schemas: Optional[List[Dict[str, Any]]] = None, + metadata_list: List[Dict[str, Any]] = [], + sparse_embeddings: Optional[List[List[float]]] = None, + ): + if sparse_embeddings is None: + raise ValueError("Sparse embeddings are required for HybridLocalIndex.") + if function_schemas is not None: + raise ValueError("Function schemas are not supported for HybridLocalIndex.") + if metadata_list: + raise ValueError("Metadata is not supported for HybridLocalIndex.") + embeds = np.array(embeddings) + sparse_embeds = np.array(sparse_embeddings) + routes_arr = np.array(routes) + if isinstance(utterances[0], str): + utterances_arr = np.array(utterances) + else: + utterances_arr = np.array(utterances, dtype=object) + if self.index is None or self.sparse_index is None: + self.index = embeds + self.sparse_index = sparse_embeds + self.routes = routes_arr + self.utterances = utterances_arr + else: + # TODO: we should probably switch to an `upsert` method and standardize elsewhere + self.index = np.concatenate([self.index, embeds]) + self.sparse_index = np.concatenate([self.sparse_index, sparse_embeds]) + self.routes = np.concatenate([self.routes, routes_arr]) + self.utterances = np.concatenate([self.utterances, utterances_arr]) + + def get_utterances(self) -> List[Utterance]: + """Gets a list of route and utterance objects currently stored in the index. + + Returns: + List[Tuple]: A list of (route_name, utterance) objects. + """ + if self.routes is None or self.utterances is None: + return [] + return [Utterance.from_tuple(x) for x in zip(self.routes, self.utterances)] + + def describe(self) -> Dict: + return { + "type": self.type, + "dimensions": self.index.shape[1] if self.index is not None else 0, + "vectors": self.index.shape[0] if self.index is not None else 0, + } + + def query( + self, + vector: np.ndarray, + top_k: int = 5, + route_filter: Optional[List[str]] = None, + sparse_vector: Optional[np.ndarray] = None, + ) -> Tuple[np.ndarray, List[str]]: + """Search the index for the query and return top_k results. + + :param vector: The query vector to search for. + :type vector: np.ndarray + :param top_k: The number of top results to return, defaults to 5. + :type top_k: int, optional + :param route_filter: A list of route names to filter the search results, defaults to None. + :type route_filter: Optional[List[str]], optional + :param sparse_vector: The sparse vector to search for, must be provided. + :type sparse_vector: np.ndarray + """ + if route_filter: + raise ValueError("Route filter is not supported for HybridLocalIndex.") + + xq_d = vector.copy() + if sparse_vector is None: + raise ValueError("Sparse vector is required for HybridLocalIndex.") + xq_s = sparse_vector.copy() + + if self.index is not None and self.sparse_index is not None: + # calculate dense vec similarity + index_norm = norm(self.index, axis=1) + xq_d_norm = norm(xq_d) # TODO: this used to be xq_d.T, should work without + sim_d = np.squeeze(np.dot(self.index, xq_d.T)) / (index_norm * xq_d_norm) + # calculate sparse vec similarity + sparse_norm = norm(self.sparse_index, axis=1) + xq_s_norm = norm(xq_s) # TODO: this used to be xq_s.T, should work without + sim_s = np.squeeze(np.dot(self.sparse_index, xq_s.T)) / (sparse_norm * xq_s_norm) + total_sim = sim_d + sim_s + # get indices of top_k records + top_k = min(top_k, total_sim.shape[0]) + idx = np.argpartition(total_sim, -top_k)[-top_k:] + scores = total_sim[idx] + # get the utterance categories (route names) + route_names = self.routes[idx] if self.routes is not None else [] + return scores, route_names + else: + raise ValueError("Index or sparse index is not populated.") + + async def aquery( + self, + vector: np.ndarray, + top_k: int = 5, + route_filter: Optional[List[str]] = None, + sparse_vector: Optional[np.ndarray] = None, + ) -> Tuple[np.ndarray, List[str]]: + """Search the index for the query and return top_k results. This method calls the + sync `query` method as everything uses numpy computations which is CPU-bound + and so no benefit can be gained from making this async. + + :param vector: The query vector to search for. + :type vector: np.ndarray + :param top_k: The number of top results to return, defaults to 5. + :type top_k: int, optional + :param route_filter: A list of route names to filter the search results, defaults to None. + :type route_filter: Optional[List[str]], optional + :param sparse_vector: The sparse vector to search for, must be provided. + :type sparse_vector: np.ndarray + """ + return self.query( + vector=vector, + top_k=top_k, + route_filter=route_filter, + sparse_vector=sparse_vector, + ) + + def aget_routes(self): + logger.error("Sync remove is not implemented for LocalIndex.") + + def _write_config(self, config: ConfigParameter): + logger.warning("No config is written for LocalIndex.") + + def delete(self, route_name: str): + """ + Delete all records of a specific route from the index. + """ + if ( + self.index is not None + and self.routes is not None + and self.utterances is not None + ): + delete_idx = self._get_indices_for_route(route_name=route_name) + self.index = np.delete(self.index, delete_idx, axis=0) + self.routes = np.delete(self.routes, delete_idx, axis=0) + self.utterances = np.delete(self.utterances, delete_idx, axis=0) + else: + raise ValueError( + "Attempted to delete route records but either index, routes or " + "utterances is None." + ) + + def delete_index(self): + """ + Deletes the index, effectively clearing it and setting it to None. + """ + self.index = None + self.routes = None + self.utterances = None + + def _get_indices_for_route(self, route_name: str): + """Gets an array of indices for a specific route.""" + if self.routes is None: + raise ValueError("Routes are not populated.") + idx = [i for i, route in enumerate(self.routes) if route == route_name] + return idx + + def __len__(self): + if self.index is not None: + return self.index.shape[0] + else: + return 0 diff --git a/semantic_router/index/local.py b/semantic_router/index/local.py index 509e0fdfded4b3021b328b14d0bf7ffdd55ccb31..420ad30f6ec29c0f1bfffbc432582b85e2f6d6fa 100644 --- a/semantic_router/index/local.py +++ b/semantic_router/index/local.py @@ -10,14 +10,10 @@ from typing import Any class LocalIndex(BaseIndex): - def __init__( - self, - index: Optional[np.ndarray] = None, - routes: Optional[np.ndarray] = None, - utterances: Optional[np.ndarray] = None, - ): - super().__init__(index=index, routes=routes, utterances=utterances) - self.type = "local" + type: str = "local" + + def __init__(self): + super().__init__() class Config: # Stop pydantic from complaining about Optional[np.ndarray]type hints. @@ -47,7 +43,9 @@ class LocalIndex(BaseIndex): self.utterances = np.concatenate([self.utterances, utterances_arr]) def _remove_and_sync(self, routes_to_delete: dict): - logger.warning("Sync remove is not implemented for LocalIndex.") + logger.warning( + f"Sync remove is not implemented for {self.__class__.__name__}." + ) def get_utterances(self) -> List[Utterance]: """ diff --git a/semantic_router/routers/__init__.py b/semantic_router/routers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4c8031ab50bd7512f23225c91deed5c1e0908c15 --- /dev/null +++ b/semantic_router/routers/__init__.py @@ -0,0 +1,10 @@ +from semantic_router.routers.base import BaseRouteLayer, LayerConfig +from semantic_router.routers.semantic import RouteLayer +from semantic_router.routers.hybrid import HybridRouteLayer + +__all__ = [ + "BaseRouteLayer", + "LayerConfig", + "RouteLayer", + "HybridRouteLayer", +] diff --git a/semantic_router/routers/base.py b/semantic_router/routers/base.py new file mode 100644 index 0000000000000000000000000000000000000000..100663da12f369b729fcca6823b82c9c900cab30 --- /dev/null +++ b/semantic_router/routers/base.py @@ -0,0 +1,1240 @@ +import importlib +import json +import os +import random +import hashlib +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from pydantic.v1 import BaseModel, Field, validator + +import numpy as np +import yaml # type: ignore +from tqdm.auto import tqdm + +from semantic_router.encoders import AutoEncoder, BaseEncoder, OpenAIEncoder +from semantic_router.index.base import BaseIndex +from semantic_router.index.local import LocalIndex +from semantic_router.index.pinecone import PineconeIndex +from semantic_router.llms import BaseLLM, OpenAILLM +from semantic_router.route import Route +from semantic_router.schema import ( + ConfigParameter, + EncoderType, + RouteChoice, + Utterance, + UtteranceDiff, +) +from semantic_router.utils.defaults import EncoderDefault +from semantic_router.utils.logger import logger + + +def is_valid(layer_config: str) -> bool: + """Make sure the given string is json format and contains the 3 keys: + ["encoder_name", "encoder_type", "routes"]""" + try: + output_json = json.loads(layer_config) + required_keys = ["encoder_name", "encoder_type", "routes"] + + if isinstance(output_json, list): + for item in output_json: + missing_keys = [key for key in required_keys if key not in item] + if missing_keys: + logger.warning( + f"Missing keys in layer config: {', '.join(missing_keys)}" + ) + return False + return True + else: + missing_keys = [key for key in required_keys if key not in output_json] + if missing_keys: + logger.warning( + f"Missing keys in layer config: {', '.join(missing_keys)}" + ) + return False + else: + return True + except json.JSONDecodeError as e: + logger.error(e) + return False + + +class LayerConfig: + """ + Generates a LayerConfig object that can be used for initializing a + RouteLayer. + """ + + routes: List[Route] = [] + + class Config: + arbitrary_types_allowed = True + + def __init__( + self, + routes: List[Route] = [], + encoder_type: str = "openai", + encoder_name: Optional[str] = None, + ): + self.encoder_type = encoder_type + if encoder_name is None: + for encode_type in EncoderType: + if encode_type.value == self.encoder_type: + if self.encoder_type == EncoderType.HUGGINGFACE.value: + raise NotImplementedError( + "HuggingFace encoder not supported by LayerConfig yet." + ) + encoder_name = EncoderDefault[encode_type.name].value[ + "embedding_model" + ] + break + logger.info(f"Using default {encoder_type} encoder: {encoder_name}") + self.encoder_name = encoder_name + self.routes = routes + + @classmethod + def from_file(cls, path: str) -> "LayerConfig": + logger.info(f"Loading route config from {path}") + _, ext = os.path.splitext(path) + with open(path, "r") as f: + if ext == ".json": + layer = json.load(f) + elif ext in [".yaml", ".yml"]: + layer = yaml.safe_load(f) + else: + raise ValueError( + "Unsupported file type. Only .json and .yaml are supported" + ) + + if not is_valid(json.dumps(layer)): + raise Exception("Invalid config JSON or YAML") + + encoder_type = layer["encoder_type"] + encoder_name = layer["encoder_name"] + routes = [] + for route_data in layer["routes"]: + # Handle the 'llm' field specially if it exists + if "llm" in route_data and route_data["llm"] is not None: + llm_data = route_data.pop( + "llm" + ) # Remove 'llm' from route_data and handle it separately + # Use the module path directly from llm_data without modification + llm_module_path = llm_data["module"] + # Dynamically import the module and then the class from that module + llm_module = importlib.import_module(llm_module_path) + llm_class = getattr(llm_module, llm_data["class"]) + # Instantiate the LLM class with the provided model name + llm = llm_class(name=llm_data["model"]) + # Reassign the instantiated llm object back to route_data + route_data["llm"] = llm + + # Dynamically create the Route object using the remaining route_data + route = Route(**route_data) + routes.append(route) + + return cls( + encoder_type=encoder_type, encoder_name=encoder_name, routes=routes + ) + + @classmethod + def from_tuples( + cls, + route_tuples: List[ + Tuple[str, str, Optional[List[Dict[str, Any]]], Dict[str, Any]] + ], + encoder_type: str = "openai", + encoder_name: Optional[str] = None, + ): + """Initialize a LayerConfig from a list of tuples of routes and + utterances. + + :param route_tuples: A list of tuples, each containing a route name and an + associated utterance. + :type route_tuples: List[Tuple[str, str]] + :param encoder_type: The type of encoder to use, defaults to "openai". + :type encoder_type: str, optional + :param encoder_name: The name of the encoder to use, defaults to None. + :type encoder_name: Optional[str], optional + """ + routes_dict: Dict[str, Route] = {} + # first create a dictionary of route names to Route objects + # TODO: duplicated code with BaseIndex.get_routes() + for route_name, utterance, function_schema, metadata in route_tuples: + # if the route is not in the dictionary, add it + if route_name not in routes_dict: + routes_dict[route_name] = Route( + name=route_name, + utterances=[utterance], + function_schemas=function_schema, + metadata=metadata, + ) + else: + # otherwise, add the utterance to the route + routes_dict[route_name].utterances.append(utterance) + # then create a list of routes from the dictionary + routes: List[Route] = [] + for route_name, route in routes_dict.items(): + routes.append(route) + return cls(routes=routes, encoder_type=encoder_type, encoder_name=encoder_name) + + @classmethod + def from_index( + cls, + index: BaseIndex, + encoder_type: str = "openai", + encoder_name: Optional[str] = None, + ): + """Initialize a LayerConfig from a BaseIndex object. + + :param index: The index to initialize the LayerConfig from. + :type index: BaseIndex + :param encoder_type: The type of encoder to use, defaults to "openai". + :type encoder_type: str, optional + :param encoder_name: The name of the encoder to use, defaults to None. + :type encoder_name: Optional[str], optional + """ + remote_routes = index.get_utterances() + return cls.from_tuples( + route_tuples=[utt.to_tuple() for utt in remote_routes], + encoder_type=encoder_type, + encoder_name=encoder_name, + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "encoder_type": self.encoder_type, + "encoder_name": self.encoder_name, + "routes": [route.to_dict() for route in self.routes], + } + + def to_file(self, path: str): + """Save the routes to a file in JSON or YAML format""" + logger.info(f"Saving route config to {path}") + _, ext = os.path.splitext(path) + + # Check file extension before creating directories or files + if ext not in [".json", ".yaml", ".yml"]: + raise ValueError( + "Unsupported file type. Only .json and .yaml are supported" + ) + + dir_name = os.path.dirname(path) + + # Create the directory if it doesn't exist and dir_name is not an empty string + if dir_name and not os.path.exists(dir_name): + os.makedirs(dir_name) + + with open(path, "w") as f: + if ext == ".json": + json.dump(self.to_dict(), f, indent=4) + elif ext in [".yaml", ".yml"]: + yaml.safe_dump(self.to_dict(), f) + + def to_utterances(self) -> List[Utterance]: + """Convert the routes to a list of Utterance objects. + + :return: A list of Utterance objects. + :rtype: List[Utterance] + """ + utterances = [] + for route in self.routes: + utterances.extend( + [ + Utterance( + route=route.name, + utterance=x, + function_schemas=route.function_schemas, + metadata=route.metadata or {}, + ) + for x in route.utterances + ] + ) + return utterances + + def add(self, route: Route): + self.routes.append(route) + logger.info(f"Added route `{route.name}`") + + def get(self, name: str) -> Optional[Route]: + for route in self.routes: + if route.name == name: + return route + logger.error(f"Route `{name}` not found") + return None + + def remove(self, name: str): + if name not in [route.name for route in self.routes]: + logger.error(f"Route `{name}` not found") + else: + self.routes = [route for route in self.routes if route.name != name] + logger.info(f"Removed route `{name}`") + + def get_hash(self) -> ConfigParameter: + layer = self.to_dict() + return ConfigParameter( + field="sr_hash", + value=hashlib.sha256(json.dumps(layer).encode()).hexdigest(), + ) + + +class BaseRouteLayer(BaseModel): + encoder: BaseEncoder + index: BaseIndex = Field(default_factory=BaseIndex) + score_threshold: Optional[float] = None + routes: List[Route] = [] + llm: Optional[BaseLLM] = None + top_k: int = 5 + aggregation: str = "mean" + aggregation_method: Optional[Callable] = None + auto_sync: Optional[str] = None + + class Config: + arbitrary_types_allowed = True + + @validator("score_threshold", pre=True, always=True) + def set_score_threshold(cls, v): + return float(v) if v is not None else None + + @validator("index", pre=True, always=True) + def set_index(cls, v): + return v if v is not None else LocalIndex() + + def __init__( + self, + encoder: Optional[BaseEncoder] = None, + llm: Optional[BaseLLM] = None, + routes: List[Route] = [], + index: Optional[BaseIndex] = None, # type: ignore + top_k: int = 5, + aggregation: str = "mean", + auto_sync: Optional[str] = None, + ): + super().__init__( + encoder=encoder, + llm=llm, + routes=routes, + index=index, + top_k=top_k, + aggregation=aggregation, + auto_sync=auto_sync, + ) + if encoder is None: + logger.warning( + "No encoder provided. Using default OpenAIEncoder. Ensure " + "that you have set OPENAI_API_KEY in your environment." + ) + self.encoder = OpenAIEncoder() + else: + self.encoder = encoder + self.llm = llm + self.routes = routes if routes else [] + if self.encoder.score_threshold is None: + raise ValueError( + "No score threshold provided for encoder. Please set the score threshold " + "in the encoder config." + ) + self.score_threshold = self.encoder.score_threshold + self.top_k = top_k + if self.top_k < 1: + raise ValueError(f"top_k needs to be >= 1, but was: {self.top_k}.") + self.aggregation = aggregation + if self.aggregation not in ["sum", "mean", "max"]: + raise ValueError( + f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." + ) + self.aggregation_method = self._set_aggregation_method(self.aggregation) + self.auto_sync = auto_sync + + # set route score thresholds if not already set + for route in self.routes: + if route.score_threshold is None: + route.score_threshold = self.score_threshold + # if routes list has been passed, we initialize index now + if self.auto_sync: + # initialize index now, check if we need dimensions + if self.index.dimensions is None: + dims = len(self.encoder(["test"])[0]) + self.index.dimensions = dims + # now init index + if isinstance(self.index, PineconeIndex): + self.index.index = self.index._init_index(force_create=True) + local_utterances = self.to_config().to_utterances() + remote_utterances = self.index.get_utterances() + diff = UtteranceDiff.from_utterances( + local_utterances=local_utterances, + remote_utterances=remote_utterances, + ) + sync_strategy = diff.get_sync_strategy(self.auto_sync) + self._execute_sync_strategy(sync_strategy) + + def check_for_matching_routes(self, top_class: str) -> Optional[Route]: + matching_route = next( + (route for route in self.routes if route.name == top_class), None + ) + if matching_route is None: + logger.error( + f"No route found with name {top_class}. Check to see if any Routes " + "have been defined." + ) + return None + return matching_route + + def __call__( + self, + text: Optional[str] = None, + vector: Optional[List[float]] = None, + simulate_static: bool = False, + route_filter: Optional[List[str]] = None, + ) -> RouteChoice: + # if no vector provided, encode text to get vector + if vector is None: + if text is None: + raise ValueError("Either text or vector must be provided") + vector = self._encode(text=text) + route, top_class_scores = self._retrieve_top_route(vector, route_filter) + passed = self._check_threshold(top_class_scores, route) + if passed and route is not None and not simulate_static: + if route.function_schemas and text is None: + raise ValueError( + "Route has a function schema, but no text was provided." + ) + if route.function_schemas and not isinstance(route.llm, BaseLLM): + if not self.llm: + logger.warning( + "No LLM provided for dynamic route, will use OpenAI LLM " + "default. Ensure API key is set in OPENAI_API_KEY environment " + "variable." + ) + + self.llm = OpenAILLM() + route.llm = self.llm + else: + route.llm = self.llm + return route(text) + elif passed and route is not None and simulate_static: + return RouteChoice( + name=route.name, + function_call=None, + similarity_score=None, + ) + else: + # if no route passes threshold, return empty route choice + return RouteChoice() + + async def acall( + self, + text: Optional[str] = None, + vector: Optional[List[float]] = None, + simulate_static: bool = False, + route_filter: Optional[List[str]] = None, + ) -> RouteChoice: + # if no vector provided, encode text to get vector + if vector is None: + if text is None: + raise ValueError("Either text or vector must be provided") + vector = await self._async_encode(text=text) + + route, top_class_scores = await self._async_retrieve_top_route( + vector, route_filter + ) + passed = self._check_threshold(top_class_scores, route) + if passed and route is not None and not simulate_static: + if route.function_schemas and text is None: + raise ValueError( + "Route has a function schema, but no text was provided." + ) + if route.function_schemas and not isinstance(route.llm, BaseLLM): + if not self.llm: + logger.warning( + "No LLM provided for dynamic route, will use OpenAI LLM default" + ) + self.llm = OpenAILLM() + route.llm = self.llm + else: + route.llm = self.llm + return await route.acall(text) + elif passed and route is not None and simulate_static: + return RouteChoice( + name=route.name, + function_call=None, + similarity_score=None, + ) + else: + # if no route passes threshold, return empty route choice + return RouteChoice() + + def retrieve_multiple_routes( + self, + text: Optional[str] = None, + vector: Optional[List[float]] = None, + ) -> List[RouteChoice]: + if vector is None: + if text is None: + raise ValueError("Either text or vector must be provided") + vector_arr = self._encode(text=text) + else: + vector_arr = np.array(vector) + # get relevant utterances + results = self._retrieve(xq=vector_arr) + + # decide most relevant routes + categories_with_scores = self._semantic_classify_multiple_routes(results) + + route_choices = [] + for category, score in categories_with_scores: + route = self.check_for_matching_routes(category) + if route: + route_choice = RouteChoice(name=route.name, similarity_score=score) + route_choices.append(route_choice) + + return route_choices + + def sync(self, sync_mode: str, force: bool = False) -> List[str]: + """Runs a sync of the local routes with the remote index. + + :param sync_mode: The mode to sync the routes with the remote index. + :type sync_mode: str + :param force: Whether to force the sync even if the local and remote + hashes already match. Defaults to False. + :type force: bool, optional + :return: A list of diffs describing the addressed differences between + the local and remote route layers. + :rtype: List[str] + """ + if not force and self.is_synced(): + logger.warning("Local and remote route layers are already synchronized.") + # create utterance diff to return, but just using local instance + # for speed + local_utterances = self.to_config().to_utterances() + diff = UtteranceDiff.from_utterances( + local_utterances=local_utterances, + remote_utterances=local_utterances, + ) + return diff.to_utterance_str() + # otherwise we continue with the sync, first creating a diff + local_utterances = self.to_config().to_utterances() + remote_utterances = self.index.get_utterances() + diff = UtteranceDiff.from_utterances( + local_utterances=local_utterances, + remote_utterances=remote_utterances, + ) + # generate sync strategy + sync_strategy = diff.get_sync_strategy(sync_mode=sync_mode) + # and execute + self._execute_sync_strategy(sync_strategy) + return diff.to_utterance_str() + + def _execute_sync_strategy(self, strategy: Dict[str, Dict[str, List[Utterance]]]): + """Executes the provided sync strategy, either deleting or upserting + routes from the local and remote instances as defined in the strategy. + + :param strategy: The sync strategy to execute. + :type strategy: Dict[str, Dict[str, List[Utterance]]] + """ + if strategy["remote"]["delete"]: + data_to_delete = {} # type: ignore + for utt_obj in strategy["remote"]["delete"]: + data_to_delete.setdefault(utt_obj.route, []).append(utt_obj.utterance) + # TODO: switch to remove without sync?? + self.index._remove_and_sync(data_to_delete) + if strategy["remote"]["upsert"]: + utterances_text = [utt.utterance for utt in strategy["remote"]["upsert"]] + self.index.add( + embeddings=self.encoder(utterances_text), + routes=[utt.route for utt in strategy["remote"]["upsert"]], + utterances=utterances_text, + function_schemas=[ + utt.function_schemas for utt in strategy["remote"]["upsert"] # type: ignore + ], + metadata_list=[utt.metadata for utt in strategy["remote"]["upsert"]], + ) + if strategy["local"]["delete"]: + self._local_delete(utterances=strategy["local"]["delete"]) + if strategy["local"]["upsert"]: + self._local_upsert(utterances=strategy["local"]["upsert"]) + # update hash + self._write_hash() + + def _local_upsert(self, utterances: List[Utterance]): + """Adds new routes to the RouteLayer. + + :param utterances: The utterances to add to the local RouteLayer. + :type utterances: List[Utterance] + """ + new_routes = {route.name: route for route in self.routes} + for utt_obj in utterances: + if utt_obj.route not in new_routes.keys(): + new_routes[utt_obj.route] = Route( + name=utt_obj.route, + utterances=[utt_obj.utterance], + function_schemas=utt_obj.function_schemas, + metadata=utt_obj.metadata, + ) + else: + if utt_obj.utterance not in new_routes[utt_obj.route].utterances: + new_routes[utt_obj.route].utterances.append(utt_obj.utterance) + new_routes[utt_obj.route].function_schemas = utt_obj.function_schemas + new_routes[utt_obj.route].metadata = utt_obj.metadata + temp = "\n".join([f"{name}: {r.utterances}" for name, r in new_routes.items()]) + logger.warning("TEMP | _local_upsert:\n" + temp) + self.routes = list(new_routes.values()) + + def _local_delete(self, utterances: List[Utterance]): + """Deletes routes from the local RouteLayer. + + :param utterances: The utterances to delete from the local RouteLayer. + :type utterances: List[Utterance] + """ + # create dictionary of route names to utterances + route_dict: dict[str, List[str]] = {} + for utt in utterances: + route_dict.setdefault(utt.route, []).append(utt.utterance) + temp = "\n".join([f"{r}: {u}" for r, u in route_dict.items()]) + logger.warning("TEMP | _local_delete:\n" + temp) + # iterate over current routes and delete specific utterance if found + new_routes = [] + for route in self.routes: + if route.name in route_dict.keys(): + # drop utterances that are in route_dict deletion list + new_utterances = list( + set(route.utterances) - set(route_dict[route.name]) + ) + if len(new_utterances) == 0: + # the route is now empty, so we skip it + continue + else: + new_routes.append( + Route( + name=route.name, + utterances=new_utterances, + # use existing function schemas and metadata + function_schemas=route.function_schemas, + metadata=route.metadata, + ) + ) + logger.warning( + f"TEMP | _local_delete OLD | {route.name}: {route.utterances}" + ) + logger.warning( + f"TEMP | _local_delete NEW | {route.name}: {new_routes[-1].utterances}" + ) + else: + # the route is not in the route_dict, so we keep it as is + new_routes.append(route) + temp = "\n".join([f"{r}: {u}" for r, u in route_dict.items()]) + logger.warning("TEMP | _local_delete:\n" + temp) + + self.routes = new_routes + + def _retrieve_top_route( + self, vector: List[float], route_filter: Optional[List[str]] = None + ) -> Tuple[Optional[Route], List[float]]: + """ + Retrieve the top matching route based on the given vector. + Returns a tuple of the route (if any) and the scores of the top class. + """ + # get relevant results (scores and routes) + results = self._retrieve( + xq=np.array(vector), top_k=self.top_k, route_filter=route_filter + ) + # decide most relevant routes + top_class, top_class_scores = self._semantic_classify(results) + # TODO do we need this check? + route = self.check_for_matching_routes(top_class) + return route, top_class_scores + + async def _async_retrieve_top_route( + self, vector: List[float], route_filter: Optional[List[str]] = None + ) -> Tuple[Optional[Route], List[float]]: + # get relevant results (scores and routes) + results = await self._async_retrieve( + xq=np.array(vector), top_k=self.top_k, route_filter=route_filter + ) + # decide most relevant routes + top_class, top_class_scores = await self._async_semantic_classify(results) + # TODO do we need this check? + route = self.check_for_matching_routes(top_class) + return route, top_class_scores + + def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool: + """ + Check if the route's score passes the specified threshold. + """ + if route is None: + return False + threshold = ( + route.score_threshold + if route.score_threshold is not None + else self.score_threshold + ) + return self._pass_threshold(scores, threshold) + + def __str__(self): + return ( + f"{self.__class__.__name__}(encoder={self.encoder}, " + f"score_threshold={self.score_threshold}, " + f"routes={self.routes})" + ) + + @classmethod + def from_json(cls, file_path: str): + config = LayerConfig.from_file(file_path) + encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model + return cls(encoder=encoder, routes=config.routes) + + @classmethod + def from_yaml(cls, file_path: str): + config = LayerConfig.from_file(file_path) + encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model + return cls(encoder=encoder, routes=config.routes) + + @classmethod + def from_config(cls, config: LayerConfig, index: Optional[BaseIndex] = None): + encoder = AutoEncoder(type=config.encoder_type, name=config.encoder_name).model + return cls(encoder=encoder, routes=config.routes, index=index) + + def add(self, route: Route): + """Add a route to the local RouteLayer and index. + + :param route: The route to add. + :type route: Route + """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + embedded_utterances = self.encoder(route.utterances) + self.index.add( + embeddings=embedded_utterances, + routes=[route.name] * len(route.utterances), + utterances=route.utterances, + function_schemas=( + route.function_schemas * len(route.utterances) + if route.function_schemas + else [{}] * len(route.utterances) + ), + metadata_list=[route.metadata if route.metadata else {}] + * len(route.utterances), + ) + + self.routes.append(route) + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) + + def list_route_names(self) -> List[str]: + return [route.name for route in self.routes] + + def update( + self, + name: str, + threshold: Optional[float] = None, + utterances: Optional[List[str]] = None, + ): + """Updates the route specified in name. Allows the update of + threshold and/or utterances. If no values are provided via the + threshold or utterances parameters, those fields are not updated. + If neither field is provided raises a ValueError. + + The name must exist within the local RouteLayer, if not a + KeyError will be raised. + """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + + if threshold is None and utterances is None: + raise ValueError( + "At least one of 'threshold' or 'utterances' must be provided." + ) + if utterances: + raise NotImplementedError( + "The update method cannot be used for updating utterances yet." + ) + + route = self.get(name) + if route: + if threshold: + old_threshold = route.score_threshold + route.score_threshold = threshold + logger.info( + f"Updated threshold for route '{route.name}' from {old_threshold} to {threshold}" + ) + else: + raise ValueError(f"Route '{name}' not found. Nothing updated.") + + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + f"not updated. Use `{self.__class__.__name__}.get_utterance_diff()` " + "to see details." + ) + + def delete(self, route_name: str): + """Deletes a route given a specific route name. + + :param route_name: the name of the route to be deleted + :type str: + """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + + if route_name not in [route.name for route in self.routes]: + err_msg = f"Route `{route_name}` not found in {self.__class__.__name__}" + logger.warning(err_msg) + try: + self.index.delete(route_name=route_name) + except Exception as e: + logger.error(f"Failed to delete route from the index: {e}") + else: + self.routes = [route for route in self.routes if route.name != route_name] + self.index.delete(route_name=route_name) + + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + f"not updated. Use `{self.__class__.__name__}.get_utterance_diff()` " + "to see details." + ) + + def _refresh_routes(self): + """Pulls out the latest routes from the index.""" + raise NotImplementedError("This method has not yet been implemented.") + route_mapping = {route.name: route for route in self.routes} + index_routes = self.index.get_utterances() + new_routes_names = [] + new_routes = [] + for route_name, utterance in index_routes: + if route_name in route_mapping: + if route_name not in new_routes_names: + existing_route = route_mapping[route_name] + new_routes.append(existing_route) + + new_routes.append(Route(name=route_name, utterances=[utterance])) + route = route_mapping[route_name] + self.routes.append(route) + + def _add_routes(self, routes: List[Route]): + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + + if not routes: + logger.warning("No routes provided to add.") + return + # create embeddings for all routes + route_names, all_utterances, all_function_schemas, all_metadata = ( + self._extract_routes_details(routes, include_metadata=True) + ) + embedded_utterances = self.encoder(all_utterances) + try: + # Batch insertion into the index + self.index.add( + embeddings=embedded_utterances, + routes=route_names, + utterances=all_utterances, + function_schemas=all_function_schemas, + metadata_list=all_metadata, + ) + except Exception as e: + logger.error(f"Failed to add routes to the index: {e}") + raise Exception("Indexing error occurred") from e + + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + f"not updated. Use `{self.__class__.__name__}.get_utterance_diff()` " + "to see details." + ) + + def _get_hash(self) -> ConfigParameter: + config = self.to_config() + return config.get_hash() + + def _write_hash(self) -> ConfigParameter: + config = self.to_config() + hash_config = config.get_hash() + self.index._write_config(config=hash_config) + return hash_config + + def is_synced(self) -> bool: + """Check if the local and remote route layer instances are + synchronized. + + :return: True if the local and remote route layers are synchronized, + False otherwise. + :rtype: bool + """ + # first check hash + local_hash = self._get_hash() + remote_hash = self.index._read_hash() + if local_hash.value == remote_hash.value: + return True + else: + return False + + def get_utterance_diff(self, include_metadata: bool = False) -> List[str]: + """Get the difference between the local and remote utterances. Returns + a list of strings showing what is different in the remote when compared + to the local. For example: + + [" route1: utterance1", + " route1: utterance2", + "- route2: utterance3", + "- route2: utterance4"] + + Tells us that the remote is missing "route2: utterance3" and "route2: + utterance4", which do exist locally. If we see: + + [" route1: utterance1", + " route1: utterance2", + "+ route2: utterance3", + "+ route2: utterance4"] + + This diff tells us that the remote has "route2: utterance3" and + "route2: utterance4", which do not exist locally. + """ + # first we get remote and local utterances + remote_utterances = self.index.get_utterances() + local_utterances = self.to_config().to_utterances() + + diff_obj = UtteranceDiff.from_utterances( + local_utterances=local_utterances, remote_utterances=remote_utterances + ) + return diff_obj.to_utterance_str(include_metadata=include_metadata) + + def _extract_routes_details( + self, routes: List[Route], include_metadata: bool = False + ) -> Tuple: + route_names = [route.name for route in routes for _ in route.utterances] + utterances = [utterance for route in routes for utterance in route.utterances] + function_schemas = [ + route.function_schemas[0] if route.function_schemas is not None else {} + for route in routes + for _ in route.utterances + ] + + if include_metadata: + metadata = [route.metadata for route in routes for _ in route.utterances] + return route_names, utterances, function_schemas, metadata + return route_names, utterances, function_schemas + + def _encode(self, text: str) -> Any: + """Generates embeddings for a given text. + + Must be implemented by a subclass. + + :param text: The text to encode. + :type text: str + :return: The embeddings of the text. + :rtype: Any + """ + # TODO: should encode "content" rather than text + raise NotImplementedError("This method should be implemented by subclasses.") + + async def _async_encode(self, text: str) -> Any: + """Asynchronously generates embeddings for a given text. + + Must be implemented by a subclass. + + :param text: The text to encode. + :type text: str + :return: The embeddings of the text. + :rtype: Any + """ + # TODO: should encode "content" rather than text + raise NotImplementedError("This method should be implemented by subclasses.") + + def _retrieve( + self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None + ) -> List[Dict]: + """Given a query vector, retrieve the top_k most similar records.""" + # get scores and routes + scores, routes = self.index.query( + vector=xq, top_k=top_k, route_filter=route_filter + ) + return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] + + async def _async_retrieve( + self, xq: Any, top_k: int = 5, route_filter: Optional[List[str]] = None + ) -> List[Dict]: + """Given a query vector, retrieve the top_k most similar records.""" + # get scores and routes + scores, routes = await self.index.aquery( + vector=xq, top_k=top_k, route_filter=route_filter + ) + return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] + + def _set_aggregation_method(self, aggregation: str = "sum"): + # TODO is this really needed? + if aggregation == "sum": + return lambda x: sum(x) + elif aggregation == "mean": + return lambda x: np.mean(x) + elif aggregation == "max": + return lambda x: max(x) + else: + raise ValueError( + f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." + ) + + def _semantic_classify(self, query_results: List[Dict]) -> Tuple[str, List[float]]: + scores_by_class = self.group_scores_by_class(query_results) + + # Calculate total score for each class + total_scores = { + route: self.aggregation_method(scores) + for route, scores in scores_by_class.items() + } + top_class = max(total_scores, key=lambda x: total_scores[x], default=None) + + # Return the top class and its associated scores + if top_class is not None: + return str(top_class), scores_by_class.get(top_class, []) + else: + logger.warning("No classification found for semantic classifier.") + return "", [] + + async def _async_semantic_classify( + self, query_results: List[Dict] + ) -> Tuple[str, List[float]]: + scores_by_class = await self.async_group_scores_by_class(query_results) + + # Calculate total score for each class + total_scores = { + route: self.aggregation_method(scores) + for route, scores in scores_by_class.items() + } + top_class = max(total_scores, key=lambda x: total_scores[x], default=None) + + # Return the top class and its associated scores + if top_class is not None: + return str(top_class), scores_by_class.get(top_class, []) + else: + logger.warning("No classification found for semantic classifier.") + return "", [] + + def get(self, name: str) -> Optional[Route]: + for route in self.routes: + if route.name == name: + return route + logger.error(f"Route `{name}` not found") + return None + + def _semantic_classify_multiple_routes( + self, query_results: List[Dict] + ) -> List[Tuple[str, float]]: + scores_by_class = self.group_scores_by_class(query_results) + + # Filter classes based on threshold and find max score for each + classes_above_threshold = [] + for route_name, scores in scores_by_class.items(): + # Use the get method to find the Route object by its name + route_obj = self.get(route_name) + if route_obj is not None: + # Use the Route object's threshold if it exists, otherwise use the provided threshold + _threshold = ( + route_obj.score_threshold + if route_obj.score_threshold is not None + else self.score_threshold + ) + if self._pass_threshold(scores, _threshold): + max_score = max(scores) + classes_above_threshold.append((route_name, max_score)) + + return classes_above_threshold + + def group_scores_by_class( + self, query_results: List[Dict] + ) -> Dict[str, List[float]]: + scores_by_class: Dict[str, List[float]] = {} + for result in query_results: + score = result["score"] + route = result["route"] + if route in scores_by_class: + scores_by_class[route].append(score) + else: + scores_by_class[route] = [score] + return scores_by_class + + async def async_group_scores_by_class( + self, query_results: List[Dict] + ) -> Dict[str, List[float]]: + scores_by_class: Dict[str, List[float]] = {} + for result in query_results: + score = result["score"] + route = result["route"] + if route in scores_by_class: + scores_by_class[route].append(score) + else: + scores_by_class[route] = [score] + return scores_by_class + + def _pass_threshold(self, scores: List[float], threshold: float) -> bool: + if scores: + return max(scores) > threshold + else: + return False + + def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None): + """ + Update the score thresholds for each route. + """ + if score_thresholds: + for route in self.routes: + route.score_threshold = score_thresholds.get( + route.name, self.score_threshold + ) + + def to_config(self) -> LayerConfig: + return LayerConfig( + encoder_type=self.encoder.type, + encoder_name=self.encoder.name, + routes=self.routes, + ) + + def to_json(self, file_path: str): + config = self.to_config() + config.to_file(file_path) + + def to_yaml(self, file_path: str): + config = self.to_config() + config.to_file(file_path) + + def get_thresholds(self) -> Dict[str, float]: + # TODO: float() below is hacky fix for lint, fix this with new type? + thresholds = { + route.name: float(route.score_threshold or self.score_threshold) + for route in self.routes + } + return thresholds + + def fit( + self, + X: List[str], + y: List[str], + batch_size: int = 500, + max_iter: int = 500, + local_execution: bool = False, + ): + original_index = self.index + if local_execution: + # Switch to a local index for fitting + from semantic_router.index.local import LocalIndex + + remote_routes = self.index.get_utterances() + # TODO Enhance by retrieving directly the vectors instead of embedding all utterances again + routes, utterances, function_schemas, metadata = map( + list, zip(*remote_routes) + ) + embeddings = self.encoder(utterances) + self.index = LocalIndex() + self.index.add( + embeddings=embeddings, + routes=routes, + utterances=utterances, + metadata_list=metadata, + ) + + # convert inputs into array + Xq: List[List[float]] = [] + for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): + emb = np.array(self.encoder(X[i : i + batch_size])) + Xq.extend(emb) + # initial eval (we will iterate from here) + best_acc = self._vec_evaluate(Xq=np.array(Xq), y=y) + best_thresholds = self.get_thresholds() + # begin fit + for _ in (pbar := tqdm(range(max_iter), desc="Training")): + pbar.set_postfix({"acc": round(best_acc, 2)}) + # Find the best score threshold for each route + thresholds = threshold_random_search( + route_layer=self, + search_range=0.8, + ) + # update current route layer + self._update_thresholds(score_thresholds=thresholds) + # evaluate + acc = self._vec_evaluate(Xq=Xq, y=y) + # update best + if acc > best_acc: + best_acc = acc + best_thresholds = thresholds + # update route layer to best thresholds + self._update_thresholds(score_thresholds=best_thresholds) + + if local_execution: + # Switch back to the original index + self.index = original_index + + def evaluate(self, X: List[str], y: List[str], batch_size: int = 500) -> float: + """ + Evaluate the accuracy of the route selection. + """ + Xq: List[List[float]] = [] + for i in tqdm(range(0, len(X), batch_size), desc="Generating embeddings"): + emb = np.array(self.encoder(X[i : i + batch_size])) + Xq.extend(emb) + + accuracy = self._vec_evaluate(Xq=np.array(Xq), y=y) + return accuracy + + def _vec_evaluate(self, Xq: Union[List[float], Any], y: List[str]) -> float: + """ + Evaluate the accuracy of the route selection. + """ + correct = 0 + for xq, target_route in zip(Xq, y): + # We treate dynamic routes as static here, because when evaluating we use only vectors, and dynamic routes expect strings by default. + route_choice = self(vector=xq, simulate_static=True) + if route_choice.name == target_route: + correct += 1 + accuracy = correct / len(Xq) + return accuracy + + def _get_route_names(self) -> List[str]: + return [route.name for route in self.routes] + + +def threshold_random_search( + route_layer: BaseRouteLayer, + search_range: Union[int, float], +) -> Dict[str, float]: + """Performs a random search iteration given a route layer and a search range.""" + # extract the route names + routes = route_layer.get_thresholds() + route_names = list(routes.keys()) + route_thresholds = list(routes.values()) + # generate search range for each + score_threshold_values = [] + for threshold in route_thresholds: + score_threshold_values.append( + np.linspace( + start=max(threshold - search_range, 0.0), + stop=min(threshold + search_range, 1.0), + num=100, + ) + ) + # Generate a random threshold for each route + score_thresholds = { + route: random.choice(score_threshold_values[i]) + for i, route in enumerate(route_names) + } + return score_thresholds diff --git a/semantic_router/routers/hybrid.py b/semantic_router/routers/hybrid.py new file mode 100644 index 0000000000000000000000000000000000000000..b65607e74add2c642b478373e83d033d5cffa736 --- /dev/null +++ b/semantic_router/routers/hybrid.py @@ -0,0 +1,212 @@ +from typing import Any, Dict, List, Optional, Tuple +import asyncio +from pydantic.v1 import validator, Field + +import numpy as np +from numpy.linalg import norm + +from semantic_router.encoders import ( + BaseEncoder, + BM25Encoder, + TfidfEncoder, +) +from semantic_router.route import Route +from semantic_router.index.hybrid_local import HybridLocalIndex +from semantic_router.schema import RouteChoice +from semantic_router.utils.logger import logger +from semantic_router.routers.base import BaseRouteLayer +from semantic_router.llms import BaseLLM + + +class HybridRouteLayer(BaseRouteLayer): + """A hybrid layer that uses both dense and sparse embeddings to classify routes. + """ + # there are a few additional attributes for hybrid + sparse_encoder: BM25Encoder = Field(default_factory=BM25Encoder) + alpha: float = 0.3 + index: HybridLocalIndex = Field(default_factory=HybridLocalIndex) + + def __init__( + self, + encoder: BaseEncoder, + sparse_encoder: Optional[BM25Encoder] = None, + llm: Optional[BaseLLM] = None, + routes: List[Route] = [], + index: Optional[HybridLocalIndex] = None, + top_k: int = 5, + aggregation: str = "mean", + auto_sync: Optional[str] = None, + alpha: float = 0.3, + ): + super().__init__( + encoder=encoder, + llm=llm, + routes=routes.copy(), + index=index, + top_k=top_k, + aggregation=aggregation, + auto_sync=auto_sync, + ) + # initialize sparse encoder + if sparse_encoder is None: + logger.warning("No sparse_encoder provided. Using default BM25Encoder.") + self.sparse_encoder = BM25Encoder() + else: + self.sparse_encoder = sparse_encoder + # set alpha + self.alpha = alpha + # fit sparse encoder if needed + if isinstance(self.sparse_encoder, TfidfEncoder) and hasattr( + self.sparse_encoder, "fit" + ): + self.sparse_encoder.fit(routes) + # initialize index if not provided + # TODO: add check for hybrid compatible index + if self.index is None: + logger.warning("No index provided. Using default HybridLocalIndex.") + self.index = HybridLocalIndex() + # add routes if we have them + if routes: + for route in routes: + self.add(route) + + @validator("sparse_encoder", pre=True, always=True) + def set_sparse_encoder(cls, v): + return v if v is not None else BM25Encoder() + + @validator("index", pre=True, always=True) + def set_index(cls, v): + return v if v is not None else HybridLocalIndex() + + def _encode(self, text: List[str]) -> Any: + """Given some text, generates dense and sparse embeddings, then scales them + using the chosen alpha value. + """ + # TODO: should encode "content" rather than text + # TODO: add alpha as a parameter + # create dense query vector + xq_d = np.array(self.encoder(text)) + #xq_d = np.squeeze(xq_d) # Reduce to 1d array. + # create sparse query vector + xq_s = np.array(self.sparse_encoder(text)) + #xq_s = np.squeeze(xq_s) + # convex scaling + xq_d, xq_s = self._convex_scaling(xq_d, xq_s) + return xq_d, xq_s + + async def _async_encode(self, text: List[str]) -> Any: + """Given some text, generates dense and sparse embeddings, then scales them + using the chosen alpha value. + """ + # TODO: should encode "content" rather than text + # TODO: add alpha as a parameter + # async encode both dense and sparse + dense_coro = self.encoder.acall(text) + sparse_coro = self.sparse_encoder.acall(text) + dense_vec, sparse_vec = await asyncio.gather(dense_coro, sparse_coro) + # create dense query vector + xq_d = np.array(dense_vec) + #xq_d = np.squeeze(xq_d) # reduce to 1d array + # create sparse query vector + xq_s = np.array(sparse_vec) + #xq_s = np.squeeze(xq_s) + # convex scaling + xq_d, xq_s = self._convex_scaling(xq_d, xq_s) + return xq_d, xq_s + + def __call__( + self, + text: Optional[str] = None, + vector: Optional[List[float]] = None, + simulate_static: bool = False, + route_filter: Optional[List[str]] = None, + sparse_vector: Optional[List[float]] = None, + ) -> RouteChoice: + # if no vector provided, encode text to get vector + if vector is None: + if text is None: + raise ValueError("Either text or vector must be provided") + vector, potential_sparse_vector = self._encode(text=[text]) + if sparse_vector is None: + if text is None: + raise ValueError("Either text or sparse_vector must be provided") + sparse_vector = potential_sparse_vector + # TODO: add alpha as a parameter + scores, route_names = self.index.query( + vector=np.array(vector) if isinstance(vector, list) else vector, + top_k=self.top_k, + route_filter=route_filter, + sparse_vector=np.array(sparse_vector) if isinstance(sparse_vector, list) else sparse_vector, + ) + top_class, top_class_scores = self._semantic_classify(list(zip(scores, route_names))) + passed = self._pass_threshold(top_class_scores, self.score_threshold) + if passed: + return RouteChoice( + name=top_class, + similarity_score=max(top_class_scores) + ) + else: + return RouteChoice() + + def add(self, route: Route): + self.routes += [route] + + route_names = [route.name] * len(route.utterances) + + # create embeddings for all routes + logger.info(f"Encoding route {route.name}") + dense_embeds, sparse_embeds = self._encode(route.utterances) + self.index.add( + embeddings=dense_embeds, + sparse_embeddings=sparse_embeds, + routes=route_names, # TODO: aligning names of routes v route_names + utterances=route.utterances, + ) + # TODO: in some places we say vector, sparse_vector and in others + # TODO: we say embeddings, sparse_embeddings + + def _convex_scaling(self, dense: np.ndarray, sparse: np.ndarray): + # scale sparse and dense vecs + dense = np.array(dense) * self.alpha + sparse = np.array(sparse) * (1 - self.alpha) + return dense, sparse + + def _set_aggregation_method(self, aggregation: str = "sum"): + if aggregation == "sum": + return lambda x: sum(x) + elif aggregation == "mean": + return lambda x: np.mean(x) + elif aggregation == "max": + return lambda x: max(x) + else: + raise ValueError( + f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'." + ) + + def _semantic_classify(self, query_results: List[Tuple]) -> Tuple[str, List[float]]: + scores_by_class: Dict[str, List[float]] = {} + for score, route in query_results: + if route in scores_by_class: + scores_by_class[route].append(score) + else: + scores_by_class[route] = [score] + + # Calculate total score for each class + total_scores = { + route: self.aggregation_method(scores) + for route, scores in scores_by_class.items() + } + top_class = max(total_scores, key=lambda x: total_scores[x], default=None) + + # Return the top class and its associated scores + if top_class is not None: + return str(top_class), scores_by_class.get(top_class, []) + else: + logger.warning("No classification found for semantic classifier.") + return "", [] + + def _pass_threshold(self, scores: List[float], threshold: float) -> bool: + if scores: + return max(scores) > threshold + else: + return False diff --git a/semantic_router/layer.py b/semantic_router/routers/semantic.py similarity index 98% rename from semantic_router/layer.py rename to semantic_router/routers/semantic.py index ec051633364696ebf3c0361bd229e4a445a1f392..499bdd73722df88b9e02d3abd5d0afaee4e73cdd 100644 --- a/semantic_router/layer.py +++ b/semantic_router/routers/semantic.py @@ -4,6 +4,7 @@ import os import random import hashlib from typing import Any, Dict, List, Optional, Tuple, Union +from pydantic.v1 import BaseModel import numpy as np import yaml # type: ignore @@ -271,10 +272,14 @@ class LayerConfig: ) -class RouteLayer: +class RouteLayer(BaseModel): score_threshold: float encoder: BaseEncoder index: BaseIndex + llm: Optional[BaseLLM] = None + top_k: int = 5 + aggregation: str = "mean" + auto_sync: Optional[str] = None def __init__( self, @@ -283,7 +288,7 @@ class RouteLayer: routes: Optional[List[Route]] = None, index: Optional[BaseIndex] = None, # type: ignore top_k: int = 5, - aggregation: str = "sum", + aggregation: str = "mean", auto_sync: Optional[str] = None, ): self.index: BaseIndex = index if index is not None else LocalIndex() @@ -891,20 +896,6 @@ class RouteLayer: ) return diff_obj.to_utterance_str(include_metadata=include_metadata) - def _add_and_sync_routes(self, routes: List[Route]): - self.routes.extend(routes) - # first we get remote and local utterances - remote_utterances = self.index.get_utterances() - local_utterances = self.to_config().to_utterances() - - diff_obj = UtteranceDiff.from_utterances( - local_utterances=local_utterances, remote_utterances=remote_utterances - ) - sync_strategy = diff_obj.get_sync_strategy(sync_mode=self.auto_sync) - self._execute_sync_strategy(strategy=sync_strategy) - # update remote hash - self._write_hash() - def _extract_routes_details( self, routes: List[Route], include_metadata: bool = False ) -> Tuple: diff --git a/tests/unit/test_hybrid_layer.py b/tests/unit/test_hybrid_layer.py index fbf14566e3c9051331a3825fe7d952213ec38985..0859fc8394fd71c0b23071c95e08b66d1049c945 100644 --- a/tests/unit/test_hybrid_layer.py +++ b/tests/unit/test_hybrid_layer.py @@ -8,7 +8,7 @@ from semantic_router.encoders import ( OpenAIEncoder, TfidfEncoder, ) -from semantic_router.hybrid_layer import HybridRouteLayer +from semantic_router.OLD_hybrid_layer import HybridRouteLayer from semantic_router.route import Route diff --git a/tests/unit/test_layer.py b/tests/unit/test_router.py similarity index 99% rename from tests/unit/test_layer.py rename to tests/unit/test_router.py index ead3f6d39204d9873785791a4b05fb9562441f1b..2b39410d2f400017865308b02f1766ac1b618b21 100644 --- a/tests/unit/test_layer.py +++ b/tests/unit/test_router.py @@ -10,7 +10,7 @@ from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder from semantic_router.index.local import LocalIndex from semantic_router.index.pinecone import PineconeIndex from semantic_router.index.qdrant import QdrantIndex -from semantic_router.layer import LayerConfig, RouteLayer +from semantic_router.routers import LayerConfig, RouteLayer from semantic_router.llms.base import BaseLLM from semantic_router.route import Route from platform import python_version diff --git a/tests/unit/test_sync.py b/tests/unit/test_sync.py index 6235088fcbf4dca05afafccdd4e0a35a397cbc41..f38635c2a164b90698877105c8882bb5e17723a4 100644 --- a/tests/unit/test_sync.py +++ b/tests/unit/test_sync.py @@ -7,7 +7,7 @@ from typing import Optional from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder from semantic_router.index.pinecone import PineconeIndex from semantic_router.schema import Utterance -from semantic_router.layer import RouteLayer +from semantic_router.routers.base import RouteLayer from semantic_router.route import Route from platform import python_version