Unverified Commit 605fb0ee authored by James Briggs

cleanup

parent 30a50b4b
[tool.poetry]
name = "semantic-router"
version = "0.0.1"
version = "0.0.5"
description = "Super fast semantic router for AI decision making"
authors = [
    "James Briggs <james@aurelio.ai>",
    "Siraj Aizlewood <siraj@aurelio.ai>",
    "Simonas Jakubonis <simonas@aurelio.ai>",
    "Luca Mannini <luca@aurelio.ai>"
]
readme = "README.md"
import numpy as np  # used by the decision layers below

from semantic_router.retrievers import (
    OpenAIRetriever,
    BM25Retriever
)
from semantic_router.rankers import BaseRanker
from semantic_router.matchers import BaseMatcher
from semantic_router.schema import Decision


class MatcherDecisionLayer:
    index = None
    decision_arr = None
    score_threshold: float

    def __init__(self, matcher: BaseMatcher, decisions: list[Decision] | None = None):
        self.matcher = matcher
        # if a decisions list has been passed and the matcher has a retriever,
        # we initialize the index now
        if matcher.retriever and decisions:
            for decision in decisions:
                self._add_decision(decision=decision)

    def __call__(self, text: str) -> str | None:
        raise NotImplementedError


class RankDecisionLayer:
    categories = None
    utterances = None
    # the original never set a threshold; 0.5 here is an assumed default
    score_threshold = 0.5

    def __init__(self, ranker: BaseRanker, decisions: list[Decision] | None = None):
        self.ranker = ranker
        # if a decisions list has been passed, we initialize the decision arrays now
        if decisions:
            for decision in decisions:
                self._add_decision(decision=decision)

    def __call__(self, text: str) -> str | None:
        results = self._query(text)
        top_class, top_class_scores = self._semantic_classify(results)
        passed = self._pass_threshold(top_class_scores, self.score_threshold)
        if passed:
            return top_class
        else:
            return None

    def add(self, decision: Decision):
        self._add_decision(decision=decision)

    def _add_decision(self, decision: Decision):
        # create decision categories array
        if self.categories is None:
            self.categories = np.array([decision.name] * len(decision.utterances))
            self.utterances = np.array(decision.utterances)
        else:
            str_arr = np.array([decision.name] * len(decision.utterances))
            self.categories = np.concatenate([self.categories, str_arr])
            self.utterances = np.concatenate([
                self.utterances,
                np.array(decision.utterances)
            ])

    def _query(self, text: str, top_k: int = 5):
        """Given some text, ranks the stored utterances and returns the top_k
        most similar records.
        """
        if self.categories is not None and len(self.categories) > 0:
            self.ranker.top_n = top_k
            idx, docs = self.ranker(query=text, docs=self.utterances)
            # create scores based on rank position (1.0, 0.5, 0.33, ...)
            scores = [1 / (i + 1) for i in range(len(docs))]
            # get the utterance categories (decision names)
            decisions = self.categories[idx]
            return [
                {"decision": d, "score": s} for d, s in zip(decisions, scores)
            ]
        else:
            return []

    def _semantic_classify(self, query_results: list[dict]) -> tuple[str, list[float]]:
        scores_by_class: dict[str, list[float]] = {}
        for result in query_results:
            score = result["score"]
            decision = result["decision"]
            if decision in scores_by_class:
                scores_by_class[decision].append(score)
            else:
                scores_by_class[decision] = [score]
        # calculate total score for each class
        total_scores = {
            decision: sum(scores) for decision, scores in scores_by_class.items()
        }
        top_class = max(total_scores, key=lambda x: total_scores[x], default=None)
        # return the top class and its associated scores
        return str(top_class), scores_by_class.get(top_class, [])

    def _pass_threshold(self, scores: list[float], threshold: float) -> bool:
        if scores:
            return max(scores) > threshold
        else:
            return False
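

# Usage sketch (illustrative only, not part of this commit): assuming a
# CohereRanker can be constructed with its defaults and that Decision takes
# `name` and `utterances`, the rank-based layer could be exercised like this:
#
#     from semantic_router.rankers import CohereRanker
#
#     chitchat = Decision(
#         name="chitchat",
#         utterances=["how are you?", "lovely weather today"],
#     )
#     layer = RankDecisionLayer(ranker=CohereRanker(), decisions=[chitchat])
#     layer("how's the weather right now?")  # -> "chitchat" if the top score clears the threshold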


class DecisionLayer:
    index = None
    categories = None


class HybridDecisionLayer:
    # only the tail of __init__ is shown in this diff hunk; `encoder` and
    # `alpha` are constructor parameters, any further parameters are elided
    def __init__(
        self,
        encoder,
        alpha,
        # ...
    ):
        self.encoder = encoder
        self.sparse_encoder = BM25Retriever()
        self.alpha = alpha
        # decide on default threshold based on encoder
        if isinstance(encoder, OpenAIRetriever):
            self.score_threshold = 0.82
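
# How `alpha` is typically used in a hybrid layer (an assumption here; only the
# assignment is visible in this hunk): dense and sparse similarity scores are
# blended with a convex combination before decisions are ranked, e.g.
#
#     def hybrid_score(dense: float, sparse: float, alpha: float) -> float:
#         # alpha = 1.0 -> purely dense (semantic), alpha = 0.0 -> purely sparse (BM25)
#         return alpha * dense + (1 - alpha) * sparse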


# --- next file in the commit: the matcher base class ---

from pydantic import BaseModel

from semantic_router.retrievers import BaseRetriever
from semantic_router.rankers import BaseRanker
from semantic_router.schema import Decision


class BaseMatcher(BaseModel):
    retriever: BaseRetriever | None
    ranker: BaseRanker | None
    top_k: int | None
    top_n: int | None

    class Config:
        arbitrary_types_allowed = True

    def __call__(self, query: str, decisions: list[Decision]) -> str:
        raise NotImplementedError("Subclasses must implement this method")


# --- next file in the commit ---
from semantic_router import rankers


# --- next file in the commit: the two-stage matcher ---

import numpy as np

from semantic_router.rankers import (
    BaseRanker,
    CohereRanker
)
from semantic_router.retrievers import (
    BaseRetriever,
    CohereRetriever
)
from semantic_router.matchers import BaseMatcher
from semantic_router.schema import Decision


class TwoStageMatcher(BaseMatcher):
    categories: np.ndarray | None = None  # decision name per indexed utterance
    index: np.ndarray | None = None       # embedding matrix for the retrieval stage

    def __init__(
        self,
        retriever: BaseRetriever | None = None,
        ranker: BaseRanker | None = None,
        top_k: int = 25,
        top_n: int = 5
    ):
        # fall back to Cohere defaults before initializing the base model,
        # so the defaults are actually stored on the matcher
        if retriever is None:
            retriever = CohereRetriever(
                name="embed-english-v3.0",
                top_k=top_k
            )
        if ranker is None:
            ranker = CohereRanker(
                name="rerank-english-v2.0",
                top_n=top_n
            )
        super().__init__(
            retriever=retriever, ranker=ranker, top_k=top_k, top_n=top_n
        )

    def __call__(self, query: str, decisions: list[Decision]) -> str:
        # retrieval + reranking not implemented yet
        pass

    def add(self, decision: Decision):
        self._add_decision(decision=decision)

    def _add_decision(self, decision: Decision):
        # create embeddings for the first (retrieval) stage
        embeds = self.retriever(decision.utterances)
        # create a decision array for decision categories
        if self.categories is None:
            self.categories = np.array([decision.name] * len(embeds))
        else:
            str_arr = np.array([decision.name] * len(embeds))
            self.categories = np.concatenate([self.categories, str_arr])
        # create utterance array (the index)
        if self.index is None:
            self.index = np.array(embeds)
        else:
            embed_arr = np.array(embeds)
            self.index = np.concatenate([self.index, embed_arr])
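

# Usage sketch (illustrative only, not part of this commit): assuming a Cohere
# API key is configured for the default CohereRetriever/CohereRanker, the
# two-stage matcher could be populated like this:
#
#     matcher = TwoStageMatcher(retriever=None, ranker=None, top_k=25, top_n=5)
#     matcher.add(Decision(name="politics", utterances=["who should I vote for?"]))
#     # querying (retrieve top_k, then rerank to top_n) is still a stub in __call__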