Newer
Older
import json
from typing import Any, Dict, List, Optional, Tuple, Union
from aurelio_sdk.schema import SparseEmbedding as BM25SparseEmbedding
from pydantic import BaseModel, Field
from semantic_router.utils.logger import logger
OPENAI = "openai"
BM25 = "bm25"
TFIDF = "tfidf"
FASTEMBED = "fastembed"
HUGGINGFACE = "huggingface"
GOOGLE = "google"
class RouteChoice(BaseModel):
function_call: Optional[List[Dict]] = None
"""A message in a conversation, includes the role and content fields."""
if self.role.lower() not in ["user", "assistant", "system", "tool"]:
raise ValueError(
"Role must be either 'user', 'assistant', 'system' or 'tool'"
)
"""Convert the message to a LlamaCPP-compatible format."""
return {"role": self.role, "content": self.content}
"""A configuration parameter for a route. Used for remote router metadata such as
router hashes, sync locks, etc.
"""
default_factory=lambda: datetime.now(timezone.utc).isoformat()
"""Convert the configuration parameter to a Pinecone-compatible format. Should
be used when upserting configuration parameters to a separate config namespace
within your Pinecone index.
:param dimensions: The dimensions of the Pinecone index.
:type dimensions: int
:return: A Pinecone-compatible configuration parameter.
:rtype: dict
"""
"values": [0.1] * dimensions,
"metadata": {
"value": self.value,
"created_at": self.created_at,
class Utterance(BaseModel):
"""An utterance in a conversation, includes the route, utterance, function
schemas, metadata, and diff tag.
"""
function_schemas: Optional[List[Dict]] = None
diff_tag: str = " "
@classmethod
def from_tuple(cls, tuple_obj: Tuple):
"""Create an Utterance object from a tuple. The tuple must contain
route and utterance as the first two elements. Then optionally
function schemas and metadata as the third and fourth elements
respectively. If this order is not followed an invalid Utterance
object will be returned.
:param tuple_obj: A tuple containing route, utterance, function schemas and metadata.
:type tuple_obj: Tuple
:return: An Utterance object.
:rtype: Utterance
"""
route, utterance = tuple_obj[0], tuple_obj[1]
function_schemas = tuple_obj[2] if len(tuple_obj) > 2 else None
if isinstance(function_schemas, dict):
function_schemas = [function_schemas]
metadata = tuple_obj[3] if len(tuple_obj) > 3 else {}
return cls(
route=route,
utterance=utterance,
function_schemas=function_schemas,
)
def to_tuple(self):
"""Convert an Utterance object to a tuple.
:return: A tuple containing (route, utterance, function schemas, metadata).
:rtype: Tuple
"""
return (
self.route,
self.utterance,
self.function_schemas,
self.metadata,
)
def to_str(self, include_metadata: bool = False):
"""Convert an Utterance object to a string. Used for comparisons during sync
check operations.
:param include_metadata: Whether to include metadata in the string.
:type include_metadata: bool
:return: A string representation of the Utterance object.
:rtype: str
"""
# we sort the dicts to ensure consistent order as we need this to compare
# stringified function schemas accurately
json.dumps(schema, sort_keys=True)
for schema in self.function_schemas
]
# we must do the same for metadata
metadata_sorted = json.dumps(self.metadata, sort_keys=True)
return f"{self.route}: {self.utterance} | {function_schemas_sorted} | {metadata_sorted}"
return f"{self.route}: {self.utterance}"
def to_diff_str(self, include_metadata: bool = False):
return f"{self.diff_tag} {self.to_str(include_metadata=include_metadata)}"
class SyncMode(Enum):
"""Synchronization modes for local (route layer) and remote (index) instances."""
ERROR = "error"
REMOTE = "remote"
LOCAL = "local"
MERGE_FORCE_REMOTE = "merge-force-remote"
MERGE_FORCE_LOCAL = "merge-force-local"
MERGE = "merge"
class UtteranceDiff(BaseModel):
"""A list of Utterance objects that represent the differences between local and
remote utterances.
"""
diff: List[Utterance]
@classmethod
def from_utterances(
cls, local_utterances: List[Utterance], remote_utterances: List[Utterance]
"""Create a UtteranceDiff object from two lists of Utterance objects.
:param local_utterances: A list of Utterance objects.
:type local_utterances: List[Utterance]
:param remote_utterances: A list of Utterance objects.
:type remote_utterances: List[Utterance]
"""
local_utterances_map = {
x.to_str(include_metadata=True): x for x in local_utterances
}
remote_utterances_map = {
x.to_str(include_metadata=True): x for x in remote_utterances
}
# sort local and remote utterances
local_utterances_str = list(local_utterances_map.keys())
local_utterances_str.sort()
remote_utterances_str = list(remote_utterances_map.keys())
remote_utterances_str.sort()
# get diff
differ = Differ()
diff_obj = list(differ.compare(local_utterances_str, remote_utterances_str))
# create UtteranceDiff list
utterance_diffs = []
for line in diff_obj:
utterance_str = line[2:]
utterance_diff_tag = line[0]
if utterance_diff_tag == "?":
# this is a new line from diff string, we can ignore
continue
utterance = (
remote_utterances_map[utterance_str]
if utterance_diff_tag == "+"
else local_utterances_map[utterance_str]
)
utterance.diff_tag = utterance_diff_tag
utterance_diffs.append(utterance)
return UtteranceDiff(diff=utterance_diffs)
def to_utterance_str(self, include_metadata: bool = False) -> List[str]:
"""Outputs the utterance diff as a list of diff strings. Returns a list
of strings showing what is different in the remote when compared to the
local. For example:
[" route1: utterance1",
" route1: utterance2",
"- route2: utterance3",
"- route2: utterance4"]
Tells us that the remote is missing "route2: utterance3" and "route2:
utterance4", which do exist locally. If we see:
[" route1: utterance1",
" route1: utterance2",
"+ route2: utterance3",
"+ route2: utterance4"]
This diff tells us that the remote has "route2: utterance3" and
"route2: utterance4", which do not exist locally.
:param include_metadata: Whether to include metadata in the string.
:type include_metadata: bool
:return: A list of diff strings.
:rtype: List[str]
return [x.to_diff_str(include_metadata=include_metadata) for x in self.diff]
def get_tag(self, diff_tag: str) -> List[Utterance]:
"""Get all utterances with a given diff tag.
:param diff_tag: The diff tag to filter by. Must be one of "+", "-", or " ".
:type diff_tag: str
:return: A list of Utterance objects.
:rtype: List[Utterance]
"""
if diff_tag not in ["+", "-", " "]:
raise ValueError("diff_tag must be one of '+', '-', or ' '")
return [x for x in self.diff if x.diff_tag == diff_tag]
def get_sync_strategy(self, sync_mode: str) -> dict:
"""Generates the optimal synchronization plan for local and remote instances.
:param sync_mode: The mode to sync the routes with the remote index.
:type sync_mode: str
:return: A dictionary describing the synchronization strategy.
:rtype: dict
"""
if sync_mode not in SYNC_MODES:
raise ValueError(f"sync_mode must be one of {SYNC_MODES}")
local_only = self.get_tag("-")
local_only_mapper = {
utt.route: (utt.function_schemas, utt.metadata) for utt in local_only
}
remote_only = self.get_tag("+")
remote_only_mapper = {
utt.route: (utt.function_schemas, utt.metadata) for utt in remote_only
}
local_and_remote = self.get_tag(" ")
if sync_mode == "error":
if len(local_only) > 0 or len(remote_only) > 0:
raise ValueError(
"There are utterances that exist in the local or remote "
"instance that do not exist in the other instance. Please "
"sync the routes before running this command."
)
else:
return {
"remote": {"upsert": [], "delete": []},
"local": {"upsert": [], "delete": []},
}
elif sync_mode == "local":
return {
"remote": {
"upsert": local_only, # + remote_updates,
"delete": remote_only,
}
elif sync_mode == "remote":
return {
"remote": {"upsert": [], "delete": []},
"local": {"upsert": remote_only, "delete": local_only},
elif sync_mode == "merge-force-local": # merge-to-local merge-join-local
# get set of route names that exist in local (we keep these if
# they are in remote)
local_route_names = set([utt.route for utt in local_only])
# if we see route: utterance exists in local, we do not pull it in
local_route_utt_strs = set([utt.to_str() for utt in local_only])
remote_to_keep = [
utt
for utt in remote_only
if (
utt.route in local_route_names
and utt.to_str() not in local_route_utt_strs
)
]
# overwrite remote routes with local metadata and function schemas
logger.info(f"local_only_mapper: {local_only_mapper}")
remote_to_update = [
Utterance(
route=utt.route,
utterance=utt.utterance,
metadata=local_only_mapper[utt.route][1],
function_schemas=local_only_mapper[utt.route][0],
)
for utt in remote_only
if (
utt.route in local_only_mapper
and (
utt.metadata != local_only_mapper[utt.route][1]
or utt.function_schemas != local_only_mapper[utt.route][0]
)
)
]
remote_to_keep = [
Utterance(
route=utt.route,
utterance=utt.utterance,
metadata=local_only_mapper[utt.route][1],
function_schemas=local_only_mapper[utt.route][0],
)
for utt in remote_to_keep
if utt.to_str() not in [x.to_str() for x in remote_to_update]
]
# get remote utterances that are NOT in local
remote_to_delete = [
utt for utt in remote_only if utt.route not in local_route_names
elif sync_mode == "merge-force-remote": # merge-to-remote merge-join-remote
# get set of route names that exist in remote (we keep these if
# they are in local)
remote_route_names = set([utt.route for utt in remote_only])
# if we see route: utterance exists in remote, we do not pull it in
remote_route_utt_strs = set([utt.to_str() for utt in remote_only])
# get local utterances that are in remote
local_to_keep = [
utt
for utt in local_only
if (
utt.route in remote_route_names
and utt.to_str() not in remote_route_utt_strs
)
]
# overwrite remote routes with local metadata and function schemas
local_to_keep = [
Utterance(
route=utt.route,
utterance=utt.utterance,
metadata=remote_only_mapper[utt.route][1],
function_schemas=remote_only_mapper[utt.route][0],
)
for utt in local_to_keep
local_to_delete = [
utt for utt in local_only if utt.route not in remote_route_names
]
"remote": {"upsert": local_to_keep, "delete": []},
"local": {"upsert": remote_only, "delete": local_to_delete},
}
elif sync_mode == "merge":
# overwrite remote routes with local metadata and function schemas
remote_only_updated = [
(
Utterance(
route=utt.route,
utterance=utt.utterance,
metadata=local_only_mapper[utt.route][1],
function_schemas=local_only_mapper[utt.route][0],
)
if utt.route in local_only_mapper
else utt
)
for utt in remote_only
]
# propogate same to shared routes
shared_updated = [
Utterance(
route=utt.route,
utterance=utt.utterance,
metadata=local_only_mapper[utt.route][1],
function_schemas=local_only_mapper[utt.route][0],
)
for utt in local_and_remote
if (
utt.route in local_only_mapper
and (
utt.metadata != local_only_mapper[utt.route][1]
or utt.function_schemas != local_only_mapper[utt.route][0]
"upsert": local_only + shared_updated + remote_only_updated,
"local": {"upsert": remote_only_updated + shared_updated, "delete": []},
else:
raise ValueError(f"sync_mode must be one of {SYNC_MODES}")
"""The metric to use in vector-based similarity search indexes."""
COSINE = "cosine"
DOTPRODUCT = "dotproduct"
EUCLIDEAN = "euclidean"
MANHATTAN = "manhattan"
class SparseEmbedding(BaseModel):
"""Sparse embedding interface. Primarily uses numpy operations for faster
operations.
"""
class Config:
arbitrary_types_allowed = True
@classmethod
def from_compact_array(cls, array: np.ndarray):
"""Create a SparseEmbedding object from a compact array.
:param array: A compact array.
:type array: np.ndarray
:return: A SparseEmbedding object.
:rtype: SparseEmbedding
"""
if array.ndim != 2 or array.shape[1] != 2:
raise ValueError(
f"Expected a 2D array with 2 columns, got a {array.ndim}D array with {array.shape[1]} columns. "
"Column 0 should contain index positions, and column 1 should contain respective values."
)
return cls(embedding=array)
"""Consumes an array of sparse vectors containing zero-values.
:param vector: A sparse vector.
:type vector: np.ndarray
:return: A SparseEmbedding object.
:rtype: SparseEmbedding
"""
if vector.ndim != 1:
raise ValueError(f"Expected a 1D array, got a {vector.ndim}D array.")
return cls.from_compact_array(np.array([np.arange(len(vector)), vector]).T)
def from_aurelio(cls, embedding: BM25SparseEmbedding):
"""Create a SparseEmbedding object from an AurelioSparseEmbedding object.
:param embedding: An AurelioSparseEmbedding object.
:type embedding: BM25SparseEmbedding
:return: A SparseEmbedding object.
:rtype: SparseEmbedding
"""
arr = np.array([embedding.indices, embedding.values]).T
@classmethod
def from_dict(cls, sparse_dict: dict):
"""Create a SparseEmbedding object from a dictionary.
:param sparse_dict: A dictionary of sparse values.
:type sparse_dict: dict
:return: A SparseEmbedding object.
:rtype: SparseEmbedding
"""
arr = np.array([list(sparse_dict.keys()), list(sparse_dict.values())]).T
@classmethod
def from_pinecone_dict(cls, sparse_dict: dict):
"""Create a SparseEmbedding object from a Pinecone dictionary.
:param sparse_dict: A Pinecone dictionary.
:type sparse_dict: dict
:return: A SparseEmbedding object.
:rtype: SparseEmbedding
"""
arr = np.array([sparse_dict["indices"], sparse_dict["values"]]).T
return cls.from_compact_array(arr)
"""Convert a SparseEmbedding object to a dictionary.
:return: A dictionary of sparse values.
:rtype: dict
"""
i: v for i, v in zip(self.embedding[:, 0].astype(int), self.embedding[:, 1])
"""Convert a SparseEmbedding object to a Pinecone dictionary.
:return: A Pinecone dictionary.
:rtype: dict
"""
return {
"indices": self.embedding[:, 0].astype(int).tolist(),
"values": self.embedding[:, 1].tolist(),
}
# dictionary interface
def items(self):
"""Return a list of (index, value) tuples from the SparseEmbedding object.
:return: A list of (index, value) tuples.
:rtype: list
"""
(i, v)
for i, v in zip(self.embedding[:, 0].astype(int), self.embedding[:, 1])