From e26d120aa20285843ae8eb721bf7b80a40e80b4a Mon Sep 17 00:00:00 2001 From: James Briggs <35938317+jamescalam@users.noreply.github.com> Date: Sat, 10 Feb 2024 13:51:58 +0100 Subject: [PATCH] refactor index code --- .../{indices => index}/__init__.py | 0 semantic_router/{indices => index}/base.py | 11 +- .../local_index.py => index/local.py} | 22 +-- semantic_router/index/pinecone.py | 134 ++++++++++++++++++ semantic_router/indices/pinecone.py | 70 --------- semantic_router/layer.py | 62 +++----- semantic_router/schema.py | 6 +- 7 files changed, 179 insertions(+), 126 deletions(-) rename semantic_router/{indices => index}/__init__.py (100%) rename semantic_router/{indices => index}/base.py (83%) rename semantic_router/{indices/local_index.py => index/local.py} (63%) create mode 100644 semantic_router/index/pinecone.py delete mode 100644 semantic_router/indices/pinecone.py diff --git a/semantic_router/indices/__init__.py b/semantic_router/index/__init__.py similarity index 100% rename from semantic_router/indices/__init__.py rename to semantic_router/index/__init__.py diff --git a/semantic_router/indices/base.py b/semantic_router/index/base.py similarity index 83% rename from semantic_router/indices/base.py rename to semantic_router/index/base.py index a2c43db4..4a7764e3 100644 --- a/semantic_router/indices/base.py +++ b/semantic_router/index/base.py @@ -1,5 +1,5 @@ from pydantic.v1 import BaseModel -from typing import Any, List, Tuple, Optional +from typing import Any, List, Tuple, Optional, Union import numpy as np @@ -13,16 +13,19 @@ class BaseIndex(BaseModel): # You can define common attributes here if there are any. 
# For example, a placeholder for the index attribute: index: Optional[Any] = None - type: str = "" + routes: Optional[List[str]] = None + utterances: Optional[List[str]] = None + dimensions: Union[int, None] = None + type: str = "base" - def add(self, embeds: List[Any]): + def add(self, embeddings: List[float], routes: List[str], utterances: List[str]): """ Add embeddings to the index. This method should be implemented by subclasses. """ raise NotImplementedError("This method should be implemented by subclasses.") - def remove(self, indices_to_remove: List[int]): + def delete(self, indices_to_remove: List[int]): """ Remove items from the index by their indices. This method should be implemented by subclasses. diff --git a/semantic_router/indices/local_index.py b/semantic_router/index/local.py similarity index 63% rename from semantic_router/indices/local_index.py rename to semantic_router/index/local.py index 8433c004..27e39704 100644 --- a/semantic_router/indices/local_index.py +++ b/semantic_router/index/local.py @@ -1,36 +1,40 @@ import numpy as np from typing import List, Any, Tuple, Optional from semantic_router.linear import similarity_matrix, top_scores -from semantic_router.indices.base import BaseIndex +from semantic_router.index.base import BaseIndex class LocalIndex(BaseIndex): - def __init__(self, **data): - super().__init__(**data) + def __init__(self): + super().__init__() self.type = "local" class Config: # Stop pydantic from complaining about Optional[np.ndarray] type hints. 
arbitrary_types_allowed = True - def add(self, embeds: List[Any]): - embeds = np.array(embeds) # type: ignore + def add(self, embeddings: List[List[float]], routes: List[str], utterances: List[str]): + embeds = np.array(embeddings) # type: ignore if self.index is None: self.index = embeds # type: ignore else: self.index = np.concatenate([self.index, embeds]) - def remove(self, indices_to_remove: List[int]): + def delete(self, indices_to_remove: List[int]): """ Remove all items of a specific category from the index. """ if self.index is not None: self.index = np.delete(self.index, indices_to_remove, axis=0) - def is_index_populated(self): - return self.index is not None and len(self.index) > 0 + def describe(self): + return { + "type": self.type, + "dimensions": self.index.shape[1] if self.index is not None else 0, + "vectors": self.index.shape[0] if self.index is not None else 0 + } - def query(self, query_vector: Any, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]: + def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]: """ Search the index for the query and return top_k results. 
""" diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py new file mode 100644 index 00000000..792381e2 --- /dev/null +++ b/semantic_router/index/pinecone.py @@ -0,0 +1,134 @@ +from pydantic.v1 import BaseModel, Field +import time +import os +from typing import Any, List, Tuple, Optional, Union +from semantic_router.index.base import BaseIndex +import numpy as np +import uuid + + + +class PineconeRecord(BaseModel): + id: str = Field(default_factory=lambda: f"utt_{uuid.uuid4().hex}") + values: List[float] + route: str + utterance: str + + def to_dict(self): + return { + "id": self.id, + "values": self.values, + "metadata": { + "sr_route": self.route, + "sr_utterance": self.utterance + } + } + + +class PineconeIndex(BaseIndex): + index_prefix: str = "semantic-router--" + index_name: str = "index" + dimensions: Union[int, None] = None + metric: str = "cosine" + cloud: str = "aws" + region: str = "us-west-2" + client: Any = Field(default=None, exclude=True) + index: Optional[Any] = Field(default=None, exclude=True) + + def __init__(self, **data): + super().__init__(**data) + self._initialize_client() + + self.type = "pinecone" + self.client = self._initialize_client() + if not self.index_name.startswith(self.index_prefix): + self.index_name = f"{self.index_prefix}{self.index_name}" + # Create or connect to an existing Pinecone index + self.index = self._init_index() + + def _initialize_client(self, api_key: Optional[str] = None): + try: + from pinecone import Pinecone, ServerlessSpec + self.ServerlessSpec = ServerlessSpec + except ImportError: + raise ImportError( + "Please install pinecone-client to use PineconeIndex. 
" + "You can install it with: " + "`pip install 'semantic-router[pinecone]'`" + ) + api_key = api_key or os.getenv("PINECONE_API_KEY") + if api_key is None: + raise ValueError("Pinecone API key is required.") + return Pinecone(api_key=api_key) + + def _init_index(self, force_create: bool = False) -> Union[Any, None]: + index_exists = self.index_name in self.client.list_indexes().names() + dimensions_given = self.dimensions is not None + if dimensions_given and not index_exists: + # if the index doesn't exist and we have dimension value + # we create the index + self.client.create_index( + name=self.index_name, + dimension=self.dimensions, + metric=self.metric, + spec=self.ServerlessSpec( + cloud=self.cloud, + region=self.region + ) + ) + # wait for index to be created + while not self.client.describe_index(self.index_name).status["ready"]: + time.sleep(1) + index = self.client.Index(self.index_name) + time.sleep(0.5) + elif index_exists: + # if the index exists we just return it + index = self.client.Index(self.index_name) + # grab the dimensions from the index + self.dimensions = index.describe_index_stats()["dimension"] + elif force_create and not dimensions_given: + raise ValueError("Cannot create an index without specifying the dimensions.") + else: + # if the index doesn't exist and we don't have the dimensions + # we return None + index = None + return index + + def add(self, embeddings: List[List[float]], routes: List[str], utterances: List[str]): + if self.index is None: + self.dimensions = self.dimensions or len(embeddings[0]) + # we set force_create to True as we MUST have an index to add data + self.index = self._init_index(force_create=True) + vectors_to_upsert = [] + for vector, route, utterance in zip(embeddings, routes, utterances): + record = PineconeRecord(values=vector, route=route, utterance=utterance) + vectors_to_upsert.append(record.to_dict()) + self.index.upsert(vectors=vectors_to_upsert) + + def delete(self, ids_to_remove: List[str]): + 
self.index.delete(ids=ids_to_remove) + + def delete_all(self): + self.index.delete(delete_all=True) + + def describe(self) -> dict: + stats = self.index.describe_index_stats() + return { + "type": self.type, + "dimensions": stats["dimension"], + "vectors": stats["total_vector_count"] + } + + def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]: + query_vector_list = query_vector.tolist() + results = self.index.query( + vector=[query_vector_list], + top_k=top_k, + include_metadata=True + ) + scores = [result["score"] for result in results["matches"]] + route_names = [result["metadata"]["sr_route"] for result in results["matches"]] + return np.array(scores), route_names + + def delete_index(self): + self.client.delete_index(self.index_name) \ No newline at end of file diff --git a/semantic_router/indices/pinecone.py b/semantic_router/indices/pinecone.py deleted file mode 100644 index e95d5761..00000000 --- a/semantic_router/indices/pinecone.py +++ /dev/null @@ -1,70 +0,0 @@ -from pydantic import BaseModel, Field -import os -import pinecone -from typing import Any, List, Tuple -from semantic_router.indices.base import BaseIndex -import numpy as np -import uuid - -class PineconeIndex(BaseIndex): - index_name: str - dimension: int = 768 - metric: str = "cosine" - cloud: str = "aws" - region: str = "us-west-2" - pinecone: Any = Field(default=None, exclude=True) - vector_id_counter: int = -1 - - def __init__(self, **data): - super().__init__(**data) - - self.type = "pinecone" - self.pinecone = pinecone.Pinecone(api_key=os.getenv("PINECONE_API_KEY")) - - # Create or connect to an existing Pinecone index - if self.index_name not in self.pinecone.list_indexes().names(): - print(f"Creating new Pinecone index: {self.index_name}") - self.pinecone.create_index( - name=self.index_name, - dimension=self.dimension, - metric=self.metric, - spec=pinecone.ServerlessSpec( - cloud=self.cloud, - region=self.region ) ) - self.index = 
self.pinecone.Index(self.index_name) - - def add(self, embeds_with_route_names: List[Tuple[List[float], str]]): - vectors_to_upsert = [] - for vector, route_name in embeds_with_route_names: - vector_id = str(uuid.uuid4()) - vectors_to_upsert.append({ - "id": vector_id, - "values": vector, - "metadata": {"route_name": route_name} - }) - self.index.upsert(vectors=vectors_to_upsert) - - def remove(self, ids_to_remove: List[str]): - self.index.delete(ids=ids_to_remove) - - def remove_all(self): - self.index.delete(delete_all=True) - - def is_index_populated(self) -> bool: - stats = self.index.describe_index_stats() - return stats["dimension"] > 0 and stats["total_vector_count"] > 0 - - def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]: - query_vector_list = query_vector.tolist() - results = self.index.query( - vector=[query_vector_list], - top_k=top_k, - include_metadata=True) - scores = [result["score"] for result in results["matches"]] - route_names = [result["metadata"]["route_name"] for result in results["matches"]] - return np.array(scores), route_names - - def delete_index(self): - pinecone.delete_index(self.index_name) \ No newline at end of file diff --git a/semantic_router/layer.py b/semantic_router/layer.py index 8634af07..89d084e8 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -12,8 +12,8 @@ from semantic_router.llms import BaseLLM, OpenAILLM from semantic_router.route import Route from semantic_router.schema import Encoder, EncoderType, RouteChoice from semantic_router.utils.logger import logger -from semantic_router.indices.base import BaseIndex -from semantic_router.indices.local_index import LocalIndex +from semantic_router.index.base import BaseIndex +from semantic_router.index.local import LocalIndex def is_valid(layer_config: str) -> bool: @@ -276,23 +276,12 @@ class RouteLayer: if route.score_threshold is None: route.score_threshold = self.score_threshold - # Embed route arrays with 
method that depends on index type. - if self.index.type == "local": - # create route array - if self.categories is None: - self.categories = np.array([route.name] * len(embeds)) - else: - str_arr = np.array([route.name] * len(embeds)) - self.categories = np.concatenate([self.categories, str_arr]) - self.index.add(embeds) - elif self.index.type == "pinecone": - vectors_to_upsert = [] - for _, embed in enumerate(embeds): - vectors_to_upsert.append((embed, route.name)) - self.index.add(vectors_to_upsert) - - # add route to routes list - self.routes.append(route) + # add routes to the index + self.index.add( + embeddings=embeds, + routes=[route.name] * len(route.utterances), + utterances=route.utterances) + self.routes.append(route) def list_route_names(self) -> List[str]: return [route.name for route in self.routes] @@ -319,17 +308,14 @@ # create embeddings for all routes all_utterances = [utterance for route in routes for utterance in route.utterances] embedded_utterances = self.encoder(all_utterances) - # create route array route_names = [route.name for route in routes for _ in route.utterances] - - if self.index.type == "local": - # For local index, just add the embeddings directly - self.index.add(embedded_utterances) - elif self.index.type == "pinecone": - # For Pinecone, prepare a list of 2-tuples with embeddings and route names - vectors_to_upsert = list(zip(embedded_utterances, route_names)) - self.index.add(vectors_to_upsert) + # add everything to the index + self.index.add( + embeddings=embedded_utterances, + routes=route_names, + utterances=all_utterances + ) def _encode(self, text: str) -> Any: """Given some text, encode it.""" @@ -340,18 +326,14 @@ def _retrieve(self, xq: Any, top_k: int = 5) -> List[dict]: """Given a query vector, retrieve the top_k most similar records.""" - if self.index.is_index_populated(): - # calculate similarity matrix - if self.index.type == "local": - scores, idx = self.index.query(xq, top_k) - # get the utterance 
categories (route names) - routes = self.categories[idx] if self.categories is not None else [] - elif self.index.type == "pinecone": - scores, routes = self.index.query(xq, top_k) - return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] - else: - logger.warning("No index found for route layer.") - return [] + scores, routes = [], [] # defaults guard against NameError for unknown index types + if self.index.type == "local": + scores, idx = self.index.query(xq, top_k) + # get the utterance categories (route names) + routes = self.categories[idx] if self.categories is not None else [] + elif self.index.type == "pinecone": + scores, routes = self.index.query(xq, top_k) + return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)] def _semantic_classify(self, query_results: List[dict]) -> Tuple[str, List[float]]: scores_by_class: Dict[str, List[float]] = {} diff --git a/semantic_router/schema.py b/semantic_router/schema.py index 61f9b0b6..2ac7280a 100644 --- a/semantic_router/schema.py +++ b/semantic_router/schema.py @@ -11,9 +11,9 @@ from semantic_router.encoders import ( OpenAIEncoder, ) -from semantic_router.indices.local_index import LocalIndex -from semantic_router.indices.pinecone import PineconeIndex -from semantic_router.indices.base import BaseIndex +from semantic_router.index.local import LocalIndex +from semantic_router.index.pinecone import PineconeIndex +from semantic_router.index.base import BaseIndex class EncoderType(Enum): HUGGINGFACE = "huggingface" -- GitLab