Skip to content
Snippets Groups Projects
Commit aaae6f2a authored by jamescalam's avatar jamescalam
Browse files

fix: router bug fixes

parent 6bbafa4c
No related branches found
No related tags found
No related merge requests found
......@@ -250,39 +250,13 @@ class RouterConfig:
return utterances
def add(self, route: Route):
"""Add a route to the local SemanticRouter and index.
"""Add a route to the RouterConfig.
:param route: The route to add.
:type route: Route
"""
current_local_hash = self._get_hash()
current_remote_hash = self.index._read_hash()
if current_remote_hash.value == "":
# if remote hash is empty, the index is to be initialized
current_remote_hash = current_local_hash
embedded_utterances = self.encoder(route.utterances)
self.index.add(
embeddings=embedded_utterances,
routes=[route.name] * len(route.utterances),
utterances=route.utterances,
function_schemas=(
route.function_schemas * len(route.utterances)
if route.function_schemas
else [{}] * len(route.utterances)
),
metadata_list=[route.metadata if route.metadata else {}]
* len(route.utterances),
)
self.routes.append(route)
if current_local_hash.value == current_remote_hash.value:
self._write_hash() # update current hash in index
else:
logger.warning(
"Local and remote route layers were not aligned. Remote hash "
"not updated. Use `SemanticRouter.get_utterance_diff()` to see "
"details."
)
logger.info(f"Added route `{route.name}`")
def get(self, name: str) -> Optional[Route]:
for route in self.routes:
......@@ -307,7 +281,7 @@ class RouterConfig:
class BaseRouter(BaseModel):
encoder: DenseEncoder
encoder: DenseEncoder = Field(default_factory=OpenAIEncoder)
index: BaseIndex = Field(default_factory=BaseIndex)
score_threshold: Optional[float] = Field(default=None)
routes: List[Route] = []
......@@ -339,18 +313,11 @@ class BaseRouter(BaseModel):
aggregation=aggregation,
auto_sync=auto_sync,
)
if encoder is None:
logger.warning(
"No encoder provided. Using default OpenAIEncoder. Ensure "
"that you have set OPENAI_API_KEY in your environment."
)
self.encoder = OpenAIEncoder()
else:
self.encoder = encoder
self.encoder = self._get_encoder(encoder=encoder)
self.llm = llm
self.routes = routes.copy() if routes else []
# initialize index
self._set_index(index=index)
self.index =self._get_index(index=index)
# set score threshold using default method
self._set_score_threshold()
self.top_k = top_k
......@@ -372,12 +339,21 @@ class BaseRouter(BaseModel):
if self.auto_sync:
self._init_index_state()
def _set_index(self, index: Optional[BaseIndex]):
def _get_index(self, index: Optional[BaseIndex]) -> BaseIndex:
if index is None:
logger.warning("No index provided. Using default LocalIndex.")
self.index = LocalIndex()
index = LocalIndex()
else:
index = index
return index
def _get_encoder(self, encoder: Optional[DenseEncoder]) -> DenseEncoder:
if encoder is None:
logger.warning("No encoder provided. Using default OpenAIEncoder.")
encoder = OpenAIEncoder()
else:
self.index = index
encoder = encoder
return encoder
def _init_index_state(self):
"""Initializes an index (where required) and runs auto_sync if active."""
......@@ -521,20 +497,57 @@ class BaseRouter(BaseModel):
vector_arr = self._encode(text=text)
else:
vector_arr = np.array(vector)
print(f"{text=}")
print(f"{vector_arr}")
# get relevant utterances
results = self._retrieve(xq=vector_arr)
print(f"{results=}")
# decide most relevant routes
categories_with_scores = self._semantic_classify_multiple_routes(results)
print(f"{categories_with_scores=}")
return [
RouteChoice(name=category, similarity_score=score) for category, score in categories_with_scores
]
route_choices = []
for category, score in categories_with_scores:
route = self.check_for_matching_routes(category)
if route:
route_choice = RouteChoice(name=route.name, similarity_score=score)
route_choices.append(route_choice)
#route_choices = []
# TODO JB: do we need this check? Maybe we should be returning directly
#for category, score in categories_with_scores:
# route = self.check_for_matching_routes(category)
# if route:
# route_choice = RouteChoice(name=route.name, similarity_score=score)
# route_choices.append(route_choice)
return route_choices
#return route_choices
def _retrieve_top_route(
    self, vector: List[float], route_filter: Optional[List[str]] = None
) -> Tuple[Optional[Route], List[float]]:
    """Find the top-scoring route class for an already-encoded query.

    :param vector: The query embedding to search with.
    :type vector: List[float]
    :param route_filter: Optional list of route names, forwarded unchanged
        to ``_retrieve``.
    :type route_filter: Optional[List[str]]
    :return: Whatever ``check_for_matching_routes`` returns for the top class
        name (may be None per the return annotation), together with the
        scores of that top class.
    :rtype: Tuple[Optional[Route], List[float]]
    """
    query = np.array(vector)
    # Fetch the raw matches (scores and routes) from the index.
    matches = self._retrieve(xq=query, top_k=self.top_k, route_filter=route_filter)
    # Reduce the matches to the winning class and the scores behind it.
    best_class, best_scores = self._semantic_classify(matches)
    # TODO do we need this check?
    return self.check_for_matching_routes(best_class), best_scores
async def _async_retrieve_top_route(
    self, vector: List[float], route_filter: Optional[List[str]] = None
) -> Tuple[Optional[Route], List[float]]:
    """Asynchronously find the top-scoring route class for an encoded query.

    :param vector: The query embedding to search with.
    :type vector: List[float]
    :param route_filter: Optional list of route names, forwarded unchanged
        to ``_async_retrieve``.
    :type route_filter: Optional[List[str]]
    :return: Whatever ``check_for_matching_routes`` returns for the top class
        name (may be None per the return annotation), together with the
        scores of that top class.
    :rtype: Tuple[Optional[Route], List[float]]
    """
    query = np.array(vector)
    # Fetch the raw matches (scores and routes) from the index.
    matches = await self._async_retrieve(
        xq=query, top_k=self.top_k, route_filter=route_filter
    )
    # Reduce the matches to the winning class and the scores behind it.
    best_class, best_scores = await self._async_semantic_classify(matches)
    # TODO do we need this check?
    return self.check_for_matching_routes(best_class), best_scores
def sync(self, sync_mode: str, force: bool = False) -> List[str]:
"""Runs a sync of the local routes with the remote index.
......@@ -661,40 +674,11 @@ class BaseRouter(BaseModel):
self.routes = new_routes
def _retrieve_top_route(
self, vector: List[float], route_filter: Optional[List[str]] = None
) -> Tuple[Optional[Route], List[float]]:
"""
Retrieve the top matching route for the given query vector.

The vector is converted to a numpy array and passed to ``_retrieve``
together with ``self.top_k`` and ``route_filter``; the results are reduced
by ``_semantic_classify`` and the winning class name is resolved through
``check_for_matching_routes``.

Returns a tuple of the route (if any) and the scores of the top class.
"""
# get relevant results (scores and routes)
results = self._retrieve(
xq=np.array(vector), top_k=self.top_k, route_filter=route_filter
)
# decide most relevant routes
top_class, top_class_scores = self._semantic_classify(results)
# TODO do we need this check?
# resolve the winning class name to a route via check_for_matching_routes
route = self.check_for_matching_routes(top_class)
return route, top_class_scores
async def _async_retrieve_top_route(
self, vector: List[float], route_filter: Optional[List[str]] = None
) -> Tuple[Optional[Route], List[float]]:
"""
Asynchronously retrieve the top matching route for the given query vector.

Awaits ``_async_retrieve`` and ``_async_semantic_classify``, then resolves
the winning class name through ``check_for_matching_routes``.

Returns a tuple of the route (if any) and the scores of the top class.
"""
# get relevant results (scores and routes)
results = await self._async_retrieve(
xq=np.array(vector), top_k=self.top_k, route_filter=route_filter
)
# decide most relevant routes
top_class, top_class_scores = await self._async_semantic_classify(results)
# TODO do we need this check?
# resolve the winning class name to a route via check_for_matching_routes
route = self.check_for_matching_routes(top_class)
return route, top_class_scores
def _check_threshold(self, scores: List[float], route: Optional[Route]) -> bool:
"""
Check if the route's score passes the specified threshold.
"""
# TODO JB: do we need this?
if route is None:
return False
threshold = (
......@@ -1124,21 +1108,55 @@ class BaseRouter(BaseModel):
scores_by_class[route] = [score]
return scores_by_class
def _pass_threshold(self, scores: List[float], threshold: float) -> bool:
def _pass_threshold(self, scores: List[float], threshold: float | None) -> bool:
"""Test if the route score passes the minimum threshold. If a threshold of None is
set, then the route will always pass no matter how low it scores.
:param scores: The scores to test.
:type scores: List[float]
:param threshold: The minimum threshold to pass.
:type threshold: float | None
:return: True if the route passes the threshold, False otherwise.
:rtype: bool
"""
if threshold is None:
return True
if scores:
return max(scores) > threshold
else:
return False
def _update_thresholds(self, score_thresholds: Optional[Dict[str, float]] = None):
def _update_thresholds(self, route_thresholds: Optional[Dict[str, float]] = None):
"""Update the score thresholds for each route using a dictionary of
route names and thresholds.
:param route_thresholds: A dictionary of route names and thresholds.
:type route_thresholds: Dict[str, float] | None
"""
Update the score thresholds for each route.
if route_thresholds:
for route, threshold in route_thresholds.items():
self.set_threshold(
threshold=threshold,
route_name=route,
)
def set_threshold(self, threshold: float, route_name: str | None = None):
"""Set the score threshold for a specific route or all routes.
:param threshold: The threshold to set.
:type threshold: float
:param route_name: The name of the route to set the threshold for. If None, the threshold will be set for all routes.
:type route_name: str | None
"""
if score_thresholds:
if route_name is None:
for route in self.routes:
route.score_threshold = score_thresholds.get(
route.name, self.score_threshold
)
route.score_threshold = threshold
else:
route = self.get(route_name)
if route is not None:
route.score_threshold = threshold
else:
logger.error(f"Route `{route_name}` not found")
def to_config(self) -> RouterConfig:
return RouterConfig(
......@@ -1207,7 +1225,7 @@ class BaseRouter(BaseModel):
search_range=0.8,
)
# update current route layer
self._update_thresholds(score_thresholds=thresholds)
self._update_thresholds(route_thresholds=thresholds)
# evaluate
acc = self._vec_evaluate(Xq=Xq, y=y)
# update best
......@@ -1215,7 +1233,7 @@ class BaseRouter(BaseModel):
best_acc = acc
best_thresholds = thresholds
# update route layer to best thresholds
self._update_thresholds(score_thresholds=best_thresholds)
self._update_thresholds(route_thresholds=best_thresholds)
if local_execution:
# Switch back to the original index
......
......@@ -10,7 +10,7 @@ from semantic_router.encoders import (
TfidfEncoder,
)
from semantic_router.route import Route
from semantic_router.index.hybrid_local import HybridLocalIndex
from semantic_router.index import BaseIndex, HybridLocalIndex
from semantic_router.schema import RouteChoice, SparseEmbedding
from semantic_router.utils.logger import logger
from semantic_router.routers.base import BaseRouter
......@@ -39,6 +39,7 @@ class HybridRouter(BaseRouter):
if index is None:
logger.warning("No index provided. Using default HybridLocalIndex.")
index = HybridLocalIndex()
encoder = self._get_encoder(encoder=encoder)
super().__init__(
encoder=encoder,
llm=llm,
......@@ -61,6 +62,14 @@ class HybridRouter(BaseRouter):
if self.auto_sync:
self._init_index_state()
def _get_index(self, index: Optional[BaseIndex]) -> BaseIndex:
    """Return the index to use, defaulting to a new ``HybridLocalIndex``.

    :param index: The index supplied by the caller, or None.
    :type index: Optional[BaseIndex]
    :return: ``index`` unchanged when provided, otherwise a freshly created
        ``HybridLocalIndex`` (a warning is logged in that case).
    :rtype: BaseIndex
    """
    if index is not None:
        return index
    logger.warning("No index provided. Using default HybridLocalIndex.")
    return HybridLocalIndex()
def _set_sparse_encoder(self, sparse_encoder: Optional[DenseEncoder]):
if sparse_encoder is None:
logger.warning("No sparse_encoder provided. Using default BM25Encoder.")
......@@ -146,43 +155,3 @@ class HybridRouter(BaseRouter):
{k: v * (1 - self.alpha) for k, v in sparse_dict.items()}
)
return scaled_dense, scaled_sparse
def _set_aggregation_method(self, aggregation: str = "sum"):
if aggregation == "sum":
return lambda x: sum(x)
elif aggregation == "mean":
return lambda x: np.mean(x)
elif aggregation == "max":
return lambda x: max(x)
else:
raise ValueError(
f"Unsupported aggregation method chosen: {aggregation}. Choose either 'SUM', 'MEAN', or 'MAX'."
)
def _semantic_classify(self, query_results: List[Tuple]) -> Tuple[str, List[float]]:
    """Pick the route class with the highest aggregated score.

    :param query_results: Iterable of ``(score, route)`` pairs.
    :type query_results: List[Tuple]
    :return: The winning class name (as a string) and the individual scores
        recorded for it, or ``("", [])`` when no class could be chosen.
    :rtype: Tuple[str, List[float]]
    """
    # Group the individual scores under the route they belong to.
    grouped: Dict[str, List[float]] = {}
    for score, route in query_results:
        grouped.setdefault(route, []).append(score)

    # Collapse each group to a single number with the configured aggregator.
    aggregated = {
        name: self.aggregation_method(values) for name, values in grouped.items()
    }
    winner = max(aggregated, key=aggregated.get, default=None)
    if winner is None:
        logger.warning("No classification found for semantic classifier.")
        return "", []
    return str(winner), grouped.get(winner, [])
def _pass_threshold(self, scores: List[float], threshold: float) -> bool:
if scores:
return max(scores) > threshold
else:
return False
This diff is collapsed.
......@@ -428,9 +428,12 @@ class SparseEmbedding(BaseModel):
return cls(embedding=array)
@classmethod
def from_array(cls, array: np.ndarray):
"""Consumes a single sparse vector which contains zero-values.
def from_vector(cls, vector: np.ndarray):
"""Consumes an array of sparse vectors containing zero-values.
"""
if vector.ndim != 1:
raise ValueError(f"Expected a 1D array, got a {vector.ndim}D array.")
return cls.from_compact_array(np.array([np.arange(len(vector)), vector]).T)
@classmethod
def from_aurelio(cls, embedding: BM25Embedding):
......
......@@ -644,7 +644,7 @@ class TestSemanticRouter:
file.write(invalid_config_json)
# Patch the is_valid function to return False for this test
with patch("semantic_router.layer.is_valid", return_value=False):
with patch("semantic_router.routers.base.is_valid", return_value=False):
# Attempt to load the RouterConfig from the temporary file
# and assert that it raises an exception due to invalid configuration
with pytest.raises(Exception) as excinfo:
......@@ -720,8 +720,6 @@ class TestSemanticRouter:
{"route": "Route 2", "score": 0.7},
{"route": "Route 1", "score": 0.8},
]
# Override _pass_threshold to always return True for this test
route_layer._pass_threshold = lambda scores, threshold: True
expected = [("Route 1", 0.8), ("Route 2", 0.7)]
results = route_layer._semantic_classify_multiple_routes(query_results)
assert sorted(results) == sorted(
......@@ -731,9 +729,8 @@ class TestSemanticRouter:
def test_with_no_routes_passing_threshold(self, openai_encoder, routes, index_cls):
index = init_index(index_cls)
route_layer = SemanticRouter(encoder=openai_encoder, routes=routes, index=index)
route_layer.score_threshold = 0.5
# Override _pass_threshold to always return False for this test
route_layer._pass_threshold = lambda scores, threshold: False
# set threshold to 1.0 so that no routes pass
route_layer.score_threshold = 1.0
query_results = [
{"route": "Route 1", "score": 0.3},
{"route": "Route 2", "score": 0.2},
......@@ -815,11 +812,13 @@ class TestSemanticRouter:
auto_sync="local",
)
text = "Asparagus"
if index_cls is PineconeIndex:
time.sleep(PINECONE_SLEEP)
results = route_layer.retrieve_multiple_routes(text=text)
assert len(results) == 0, f"Expected no results, but got {len(results)}"
def test_retrieve_one_match(self, openai_encoder, routes_3, index_cls):
index = init_index(index_cls)
index = init_index(index_cls, dimensions=3)
route_layer = SemanticRouter(
encoder=openai_encoder,
routes=routes_3,
......@@ -827,6 +826,8 @@ class TestSemanticRouter:
auto_sync="local",
)
text = "Hello"
# set low threshold
route_layer.set_threshold(threshold=0.1, route_name="Route 1")
if index_cls is PineconeIndex:
time.sleep(PINECONE_SLEEP)
results = route_layer.retrieve_multiple_routes(text=text)
......@@ -845,6 +846,7 @@ class TestSemanticRouter:
auto_sync="local",
)
text = "Hello"
route_layer.set_threshold(threshold=0.01, route_name=None)
if index_cls is PineconeIndex:
time.sleep(PINECONE_SLEEP)
results = route_layer.retrieve_multiple_routes(text=text)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment