Skip to content
Snippets Groups Projects
Unverified Commit e26d120a authored by James Briggs's avatar James Briggs
Browse files

refactor index code

parent 8a38ccdb
No related branches found
No related tags found
No related merge requests found
from pydantic.v1 import BaseModel from pydantic.v1 import BaseModel
from typing import Any, List, Tuple, Optional from typing import Any, List, Tuple, Optional, Union
import numpy as np import numpy as np
...@@ -13,16 +13,19 @@ class BaseIndex(BaseModel): ...@@ -13,16 +13,19 @@ class BaseIndex(BaseModel):
# You can define common attributes here if there are any. # You can define common attributes here if there are any.
# For example, a placeholder for the index attribute: # For example, a placeholder for the index attribute:
index: Optional[Any] = None index: Optional[Any] = None
type: str = "" routes: Optional[List[str]] = None
utterances: Optional[List[str]] = None
dimensions: Union[int, None] = None
type: str = "base"
def add(self, embeds: List[Any]): def add(self, embeddings: List[float], routes: List[str], utterances: List[str]):
""" """
Add embeddings to the index. Add embeddings to the index.
This method should be implemented by subclasses. This method should be implemented by subclasses.
""" """
raise NotImplementedError("This method should be implemented by subclasses.") raise NotImplementedError("This method should be implemented by subclasses.")
def remove(self, indices_to_remove: List[int]): def delete(self, indices_to_remove: List[int]):
""" """
Remove items from the index by their indices. Remove items from the index by their indices.
This method should be implemented by subclasses. This method should be implemented by subclasses.
......
import numpy as np import numpy as np
from typing import List, Any, Tuple, Optional from typing import List, Any, Tuple, Optional
from semantic_router.linear import similarity_matrix, top_scores from semantic_router.linear import similarity_matrix, top_scores
from semantic_router.indices.base import BaseIndex from semantic_router.index.base import BaseIndex
class LocalIndex(BaseIndex): class LocalIndex(BaseIndex):
def __init__(self, **data): def __init__(self):
super().__init__(**data) super().__init__()
self.type = "local" self.type = "local"
class Config: # Stop pydantic from complaining about Optional[np.ndarray] type hints. class Config: # Stop pydantic from complaining about Optional[np.ndarray] type hints.
arbitrary_types_allowed = True arbitrary_types_allowed = True
def add(self, embeds: List[Any]): def add(self, embeddings: List[List[float]], routes: List[str], utterances: List[str]):
embeds = np.array(embeds) # type: ignore embeds = np.array(embeddings) # type: ignore
if self.index is None: if self.index is None:
self.index = embeds # type: ignore self.index = embeds # type: ignore
else: else:
self.index = np.concatenate([self.index, embeds]) self.index = np.concatenate([self.index, embeds])
def remove(self, indices_to_remove: List[int]): def delete(self, indices_to_remove: List[int]):
""" """
Remove all items of a specific category from the index. Remove all items of a specific category from the index.
""" """
if self.index is not None: if self.index is not None:
self.index = np.delete(self.index, indices_to_remove, axis=0) self.index = np.delete(self.index, indices_to_remove, axis=0)
def is_index_populated(self): def describe(self):
return self.index is not None and len(self.index) > 0 return {
"type": self.type,
"dimensions": self.index.shape[1] if self.index is not None else 0,
"vectors": self.index.shape[0] if self.index is not None else 0
}
def query(self, query_vector: Any, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]: def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]:
""" """
Search the index for the query and return top_k results. Search the index for the query and return top_k results.
""" """
......
from pydantic.v1 import BaseModel, Field
import time
import os
from typing import Any, List, Tuple, Optional, Union
from semantic_router.index.base import BaseIndex
import numpy as np
import uuid
class PineconeRecord(BaseModel):
id: str = Field(default_factory=lambda: f"utt_{uuid.uuid4().hex}")
values: List[float]
route: str
utterance: str
def to_dict(self):
return {
"id": self.id,
"values": self.values,
"metadata": {
"sr_route": self.route,
"sr_utterance": self.utterance
}
}
class PineconeIndex(BaseIndex):
index_prefix: str = "semantic-router--"
index_name: str = "index"
dimensions: Union[int, None] = None
metric: str = "cosine"
cloud: str = "aws"
region: str = "us-west-2"
client: Any = Field(default=None, exclude=True)
index: Optional[Any] = Field(default=None, exclude=True)
def __init__(self, **data):
super().__init__(**data)
self._initialize_client()
self.type = "pinecone"
self.client = self._initialize_client()
if not self.index_name.startswith(self.index_prefix):
self.index_name = f"{self.index_prefix}{self.index_name}"
# Create or connect to an existing Pinecone index
self.index = self._init_index()
def _initialize_client(self, api_key: Optional[str] = None):
try:
from pinecone import Pinecone, ServerlessSpec
self.ServerlessSpec = ServerlessSpec
except ImportError:
raise ImportError(
"Please install pinecone-client to use PineconeIndex. "
"You can install it with: "
"`pip install 'semantic-router[pinecone]'`"
)
api_key = api_key or os.getenv("PINECONE_API_KEY")
if api_key is None:
raise ValueError("Pinecone API key is required.")
return Pinecone(api_key=api_key)
def _init_index(self, force_create: bool = False) -> Union[Any, None]:
index_exists = self.index_name in self.client.list_indexes().names()
dimensions_given = self.dimensions is not None
if dimensions_given and not index_exists:
# if the index doesn't exist and we have dimension value
# we create the index
self.client.create_index(
name=self.index_name,
dimension=self.dimensions,
metric=self.metric,
spec=self.ServerlessSpec(
cloud=self.cloud,
region=self.region
)
)
# wait for index to be created
while not self.client.describe_index(self.index_name).status["ready"]:
time.sleep(1)
index = self.client.Index(self.index_name)
time.sleep(0.5)
elif index_exists:
# if the index exists we just return it
index = self.client.Index(self.index_name)
# grab the dimensions from the index
self.dimensions = index.describe_index_stats()["dimension"]
elif force_create and not dimensions_given:
raise ValueError("Cannot create an index without specifying the dimensions.")
else:
# if the index doesn't exist and we don't have the dimensions
# we return None
index = None
return index
def add(self, embeddings: List[List[float]], routes: List[str], utterances: List[str]):
if self.index is None:
self.dimensions = self.dimensions or len(embeddings[0])
# we set force_create to True as we MUST have an index to add data
self.index = self._init_index(force_create=True)
vectors_to_upsert = []
for vector, route, utterance in zip(embeddings, routes, utterances):
record = PineconeRecord(values=vector, route=route, utterance=utterance)
vectors_to_upsert.append(record.to_dict())
self.index.upsert(vectors=vectors_to_upsert)
def delete(self, ids_to_remove: List[str]):
self.index.delete(ids=ids_to_remove)
def delete_all(self):
self.index.delete(delete_all=True)
def describe(self) -> bool:
stats = self.index.describe_index_stats()
return {
"type": self.type,
"dimensions": stats["dimension"],
"vectors": stats["total_vector_count"]
}
def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]:
query_vector_list = query_vector.tolist()
results = self.index.query(
vector=[query_vector_list],
top_k=top_k,
include_metadata=True
)
scores = [result["score"] for result in results["matches"]]
route_names = [result["metadata"]["sr_route"] for result in results["matches"]]
return np.array(scores), route_names
def delete_index(self):
self.client.delete_index(self.index_name)
\ No newline at end of file
from pydantic import BaseModel, Field
import os
import pinecone
from typing import Any, List, Tuple
from semantic_router.indices.base import BaseIndex
import numpy as np
import uuid
class PineconeIndex(BaseIndex):
index_name: str
dimension: int = 768
metric: str = "cosine"
cloud: str = "aws"
region: str = "us-west-2"
pinecone: Any = Field(default=None, exclude=True)
vector_id_counter: int = -1
def __init__(self, **data):
super().__init__(**data)
self.type = "pinecone"
self.pinecone = pinecone.Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
# Create or connect to an existing Pinecone index
if self.index_name not in self.pinecone.list_indexes().names():
print(f"Creating new Pinecone index: {self.index_name}")
self.pinecone.create_index(
name=self.index_name,
dimension=self.dimension,
metric=self.metric,
spec=pinecone.ServerlessSpec(
cloud=self.cloud,
region=self.region
)
)
self.index = self.pinecone.Index(self.index_name)
def add(self, embeds_with_route_names: List[Tuple[List[float], str]]):
vectors_to_upsert = []
for vector, route_name in embeds_with_route_names:
vector_id = str(uuid.uuid4())
vectors_to_upsert.append({
"id": vector_id,
"values": vector,
"metadata": {"route_name": route_name}
})
self.index.upsert(vectors=vectors_to_upsert)
def remove(self, ids_to_remove: List[str]):
self.index.delete(ids=ids_to_remove)
def remove_all(self):
self.index.delete(delete_all=True)
def is_index_populated(self) -> bool:
stats = self.index.describe_index_stats()
return stats["dimension"] > 0 and stats["total_vector_count"] > 0
def query(self, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, List[str]]:
query_vector_list = query_vector.tolist()
results = self.index.query(
vector=[query_vector_list],
top_k=top_k,
include_metadata=True)
scores = [result["score"] for result in results["matches"]]
route_names = [result["metadata"]["route_name"] for result in results["matches"]]
return np.array(scores), route_names
def delete_index(self):
pinecone.delete_index(self.index_name)
\ No newline at end of file
...@@ -12,8 +12,8 @@ from semantic_router.llms import BaseLLM, OpenAILLM ...@@ -12,8 +12,8 @@ from semantic_router.llms import BaseLLM, OpenAILLM
from semantic_router.route import Route from semantic_router.route import Route
from semantic_router.schema import Encoder, EncoderType, RouteChoice from semantic_router.schema import Encoder, EncoderType, RouteChoice
from semantic_router.utils.logger import logger from semantic_router.utils.logger import logger
from semantic_router.indices.base import BaseIndex from semantic_router.index.base import BaseIndex
from semantic_router.indices.local_index import LocalIndex from semantic_router.index.local import LocalIndex
def is_valid(layer_config: str) -> bool: def is_valid(layer_config: str) -> bool:
...@@ -276,23 +276,12 @@ class RouteLayer: ...@@ -276,23 +276,12 @@ class RouteLayer:
if route.score_threshold is None: if route.score_threshold is None:
route.score_threshold = self.score_threshold route.score_threshold = self.score_threshold
# Embed route arrays with method that depends on index type. # add routes to the index
if self.index.type == "local": self.index.add(
# create route array embeddings=embeds,
if self.categories is None: routes=[route.name] * len(route.utterances),
self.categories = np.array([route.name] * len(embeds)) utterances=route.utterances,
else: )
str_arr = np.array([route.name] * len(embeds))
self.categories = np.concatenate([self.categories, str_arr])
self.index.add(embeds)
elif self.index.type == "pinecone":
vectors_to_upsert = []
for _, embed in enumerate(embeds):
vectors_to_upsert.append((embed, route.name))
self.index.add(vectors_to_upsert)
# add route to routes list
self.routes.append(route)
def list_route_names(self) -> List[str]: def list_route_names(self) -> List[str]:
return [route.name for route in self.routes] return [route.name for route in self.routes]
...@@ -319,17 +308,14 @@ class RouteLayer: ...@@ -319,17 +308,14 @@ class RouteLayer:
# create embeddings for all routes # create embeddings for all routes
all_utterances = [utterance for route in routes for utterance in route.utterances] all_utterances = [utterance for route in routes for utterance in route.utterances]
embedded_utterances = self.encoder(all_utterances) embedded_utterances = self.encoder(all_utterances)
# create route array # create route array
route_names = [route.name for route in routes for _ in route.utterances] route_names = [route.name for route in routes for _ in route.utterances]
# add everything to the index
if self.index.type == "local": self.index.add(
# For local index, just add the embeddings directly embeddings=embedded_utterances,
self.index.add(embedded_utterances) routes=route_names,
elif self.index.type == "pinecone": utterances=all_utterances
# For Pinecone, prepare a list of 2-tuples with embeddings and route names )
vectors_to_upsert = list(zip(embedded_utterances, route_names))
self.index.add(vectors_to_upsert)
def _encode(self, text: str) -> Any: def _encode(self, text: str) -> Any:
"""Given some text, encode it.""" """Given some text, encode it."""
...@@ -340,18 +326,14 @@ class RouteLayer: ...@@ -340,18 +326,14 @@ class RouteLayer:
def _retrieve(self, xq: Any, top_k: int = 5) -> List[dict]: def _retrieve(self, xq: Any, top_k: int = 5) -> List[dict]:
"""Given a query vector, retrieve the top_k most similar records.""" """Given a query vector, retrieve the top_k most similar records."""
if self.index.is_index_populated(): # calculate similarity matrix
# calculate similarity matrix if self.index.type == "local":
if self.index.type == "local": scores, idx = self.index.query(xq, top_k)
scores, idx = self.index.query(xq, top_k) # get the utterance categories (route names)
# get the utterance categories (route names) routes = self.categories[idx] if self.categories is not None else []
routes = self.categories[idx] if self.categories is not None else [] elif self.index.type == "pinecone":
elif self.index.type == "pinecone": scores, routes = self.index.query(xq, top_k)
scores, routes = self.index.query(xq, top_k) return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)]
return [{"route": d, "score": s.item()} for d, s in zip(routes, scores)]
else:
logger.warning("No index found for route layer.")
return []
def _semantic_classify(self, query_results: List[dict]) -> Tuple[str, List[float]]: def _semantic_classify(self, query_results: List[dict]) -> Tuple[str, List[float]]:
scores_by_class: Dict[str, List[float]] = {} scores_by_class: Dict[str, List[float]] = {}
......
...@@ -11,9 +11,9 @@ from semantic_router.encoders import ( ...@@ -11,9 +11,9 @@ from semantic_router.encoders import (
OpenAIEncoder, OpenAIEncoder,
) )
from semantic_router.indices.local_index import LocalIndex from semantic_router.index.local import LocalIndex
from semantic_router.indices.pinecone import PineconeIndex from semantic_router.index.pinecone import PineconeIndex
from semantic_router.indices.base import BaseIndex from semantic_router.index.base import BaseIndex
class EncoderType(Enum): class EncoderType(Enum):
HUGGINGFACE = "huggingface" HUGGINGFACE = "huggingface"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment