Skip to content
Snippets Groups Projects
Unverified Commit 4a81de14 authored by James Briggs's avatar James Briggs Committed by GitHub
Browse files

Merge pull request #8 from aurelio-labs/james/matching-pipelines

WIP restructure and testing different decision layer structures
parents 096991a2 bc976017
No related branches found
Tags v0.0.5
No related merge requests found
%% Cell type:markdown id: tags:
# Semantic Router: Hybrid Layer
%% Cell type:markdown id: tags:
The Hybrid Layer in the Semantic Router library can improve decision making performance particularly for niche use-cases that contain specific terminology, such as finance or medical. It helps us provide more importance to decision making based on the keywords contained in our utterances and user queries.
%% Cell type:markdown id: tags:
## Getting Started
%% Cell type:markdown id: tags:
We start by installing the library:
%% Cell type:code id: tags:
``` python
!pip install -qU semantic-router==0.0.5
```
%% Cell type:markdown id: tags:
We start by defining a dictionary mapping decisions to example phrases that should trigger those decisions.
%% Cell type:code id: tags:
``` python
import os
os.environ["COHERE_API_KEY"] = "<YOUR_COHERE_API_KEY>"
```
%% Cell type:code id: tags:
``` python
from semantic_router.schema import Decision
politics = Decision(
name="politics",
utterances=[
"isn't politics the best thing ever",
"why don't you tell me about your political opinions",
"don't you just love the president",
"don't you just hate the president",
"they're going to destroy this country!",
"they will save the country!",
],
)
```
%% Cell type:markdown id: tags:
Let's define another for good measure:
%% Cell type:code id: tags:
``` python
chitchat = Decision(
name="chitchat",
utterances=[
"how's the weather today?",
"how are things going?",
"lovely weather today",
"the weather is horrendous",
"let's go to the chippy",
],
)
decisions = [politics, chitchat]
```
%% Cell type:markdown id: tags:
Now we initialize our embedding model:
%% Cell type:code id: tags:
``` python
from semantic_router.encoders import CohereEncoder
from getpass import getpass
os.environ["COHERE_API_KEY"] = os.environ["COHERE_API_KEY"] or getpass(
"Enter Cohere API Key: "
)
encoder = CohereEncoder()
```
%% Cell type:markdown id: tags:
Now we define the `DecisionLayer`. When called, the decision layer will consume text (a query) and output the category (`Decision`) it belongs to — to initialize a `DecisionLayer` we need our `encoder` model and a list of `decisions`.
%% Cell type:code id: tags:
``` python
from semantic_router.layer import HybridDecisionLayer
dl = HybridDecisionLayer(encoder=encoder, decisions=decisions)
```
%% Cell type:code id: tags:
``` python
dl("don't you love politics?")
```
%% Cell type:code id: tags:
``` python
dl("how's the weather today?")
```
%% Cell type:markdown id: tags:
---
This diff is collapsed.
[tool.poetry] [tool.poetry]
name = "semantic-router" name = "semantic-router"
version = "0.0.4" version = "0.0.5"
description = "Super fast semantic router for AI decision making" description = "Super fast semantic router for AI decision making"
authors = [ authors = [
"James Briggs <james@aurelio.ai>", "James Briggs <james@aurelio.ai>",
"Siraj Aizlewood <siraj@aurelio.ai>", "Siraj Aizlewood <siraj@aurelio.ai>",
"Simonas Jakubonis <simonas@aurelio.ai>", "Simonas Jakubonis <simonas@aurelio.ai>",
"Luca Mannini <luca@aurelio.ai>",
"Bogdan Buduroiu <bogdan@aurelio.ai>" "Bogdan Buduroiu <bogdan@aurelio.ai>"
] ]
readme = "README.md" readme = "README.md"
...@@ -15,7 +16,8 @@ python = "^3.10" ...@@ -15,7 +16,8 @@ python = "^3.10"
pydantic = "^1.8.2" pydantic = "^1.8.2"
openai = "^0.28.1" openai = "^0.28.1"
cohere = "^4.32" cohere = "^4.32"
numpy = "^1.26.2" numpy = "^1.25.2"
pinecone-text = "^0.7.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
......
from .layer import DecisionLayer, HybridDecisionLayer
__all__ = ["DecisionLayer", "HybridDecisionLayer"]
from .base import BaseEncoder from .base import BaseEncoder
from .cohere import CohereEncoder from .cohere import CohereEncoder
from .openai import OpenAIEncoder from .openai import OpenAIEncoder
from .bm25 import BM25Encoder
__all__ = ["BaseEncoder", "CohereEncoder", "OpenAIEncoder"] __all__ = ["BaseEncoder", "CohereEncoder", "OpenAIEncoder", "BM25Encoder"]
...@@ -7,5 +7,5 @@ class BaseEncoder(BaseModel): ...@@ -7,5 +7,5 @@ class BaseEncoder(BaseModel):
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
def __call__(self, texts: list[str]) -> list[float]: def __call__(self, docs: list[str]) -> list[float]:
raise NotImplementedError("Subclasses must implement this method") raise NotImplementedError("Subclasses must implement this method")
from pinecone_text.sparse import BM25Encoder as encoder
from semantic_router.encoders import BaseEncoder
class BM25Encoder(BaseEncoder):
    """Sparse BM25 encoder producing fixed-width vectors aligned to the
    underlying model's document-frequency vocabulary.
    """

    # pinecone-text BM25 model; populated in __init__
    model: encoder | None = None
    # maps the model's sparse vocabulary indices to dense vector positions
    idx_mapping: dict[int, int] | None = None

    def __init__(self, name: str = "bm25"):
        super().__init__(name=name)
        # initialize BM25 encoder with default params (trained on MSMarco)
        self.model = encoder.default()
        self.idx_mapping = {
            idx: i
            for i, idx in enumerate(self.model.get_params()["doc_freq"]["indices"])
        }

    def __call__(self, docs: list[str]) -> list[list[float]]:
        """Encode `docs` as dense lists representing sparse BM25 vectors.

        A single doc is encoded as a query, multiple docs as documents
        (BM25 weighs queries and documents differently).

        :raises ValueError: if `docs` is empty.
        """
        if not docs:
            raise ValueError("No documents to encode.")
        if len(docs) == 1:
            sparse_dicts = self.model.encode_queries(docs)
        else:
            sparse_dicts = self.model.encode_documents(docs)
        # convert sparse dict to sparse vector.
        # NOTE: each row must be an independent list — the original
        # `[[0.0] * n] * len(docs)` aliased every row to the SAME list,
        # so all docs wrote into one shared vector.
        embeds = [[0.0] * len(self.idx_mapping) for _ in range(len(docs))]
        for i, output in enumerate(sparse_dicts):
            for idx, val in zip(output["indices"], output["values"]):
                if idx in self.idx_mapping:
                    embeds[i][self.idx_mapping[idx]] = val
                else:
                    print(idx, "not in encoder.idx_mapping")
        return embeds

    def fit(self, docs: list[str]):
        """Refit the BM25 statistics on a new corpus.

        NOTE(review): `idx_mapping` is not rebuilt here, so vectors encoded
        after `fit` still use the original vocabulary layout — confirm
        whether the mapping should be refreshed.
        """
        self.model.fit(docs)
...@@ -17,8 +17,8 @@ class CohereEncoder(BaseEncoder): ...@@ -17,8 +17,8 @@ class CohereEncoder(BaseEncoder):
raise ValueError("Cohere API key cannot be 'None'.") raise ValueError("Cohere API key cannot be 'None'.")
self.client = cohere.Client(cohere_api_key) self.client = cohere.Client(cohere_api_key)
def __call__(self, texts: list[str]) -> list[list[float]]: def __call__(self, docs: list[str]) -> list[list[float]]:
if self.client is None: if self.client is None:
raise ValueError("Cohere client is not initialized.") raise ValueError("Cohere client is not initialized.")
embeds = self.client.embed(texts, input_type="search_query", model=self.name) embeds = self.client.embed(docs, input_type="search_query", model=self.name)
return embeds.embeddings return embeds.embeddings
...@@ -14,7 +14,7 @@ class OpenAIEncoder(BaseEncoder): ...@@ -14,7 +14,7 @@ class OpenAIEncoder(BaseEncoder):
if openai.api_key is None: if openai.api_key is None:
raise ValueError("OpenAI API key cannot be 'None'.") raise ValueError("OpenAI API key cannot be 'None'.")
def __call__(self, texts: list[str]) -> list[list[float]]: def __call__(self, docs: list[str]) -> list[list[float]]:
"""Encode a list of texts using the OpenAI API. Returns a list of """Encode a list of texts using the OpenAI API. Returns a list of
vector embeddings. vector embeddings.
""" """
...@@ -22,7 +22,7 @@ class OpenAIEncoder(BaseEncoder): ...@@ -22,7 +22,7 @@ class OpenAIEncoder(BaseEncoder):
# exponential backoff in case of RateLimitError # exponential backoff in case of RateLimitError
for j in range(5): for j in range(5):
try: try:
res = openai.Embedding.create(input=texts, engine=self.name) res = openai.Embedding.create(input=docs, engine=self.name)
if isinstance(res, dict) and "data" in res: if isinstance(res, dict) and "data" in res:
break break
except RateLimitError: except RateLimitError:
......
import numpy as np import numpy as np
from numpy.linalg import norm
from tqdm.auto import tqdm
from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder from semantic_router.encoders import (
BaseEncoder,
CohereEncoder,
OpenAIEncoder,
BM25Encoder,
)
from semantic_router.linear import similarity_matrix, top_scores from semantic_router.linear import similarity_matrix, top_scores
from semantic_router.schema import Decision from semantic_router.schema import Decision
...@@ -8,27 +15,27 @@ from semantic_router.schema import Decision ...@@ -8,27 +15,27 @@ from semantic_router.schema import Decision
class DecisionLayer: class DecisionLayer:
index = None index = None
categories = None categories = None
similarity_threshold = 0.82 score_threshold = 0.82
def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []): def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []):
self.encoder = encoder self.encoder = encoder
# decide on default threshold based on encoder # decide on default threshold based on encoder
if isinstance(encoder, OpenAIEncoder): if isinstance(encoder, OpenAIEncoder):
self.similarity_threshold = 0.82 self.score_threshold = 0.82
elif isinstance(encoder, CohereEncoder): elif isinstance(encoder, CohereEncoder):
self.similarity_threshold = 0.3 self.score_threshold = 0.3
else: else:
self.similarity_threshold = 0.82 self.score_threshold = 0.82
# if decisions list has been passed, we initialize index now # if decisions list has been passed, we initialize index now
if decisions: if decisions:
# initialize index now # initialize index now
for decision in decisions: for decision in tqdm(decisions):
self._add_decision(decision=decision) self._add_decision(decision=decision)
def __call__(self, text: str) -> str | None: def __call__(self, text: str) -> str | None:
results = self._query(text) results = self._query(text)
top_class, top_class_scores = self._semantic_classify(results) top_class, top_class_scores = self._semantic_classify(results)
passed = self._pass_threshold(top_class_scores, self.similarity_threshold) passed = self._pass_threshold(top_class_scores, self.score_threshold)
if passed: if passed:
return top_class return top_class
else: else:
...@@ -98,3 +105,135 @@ class DecisionLayer: ...@@ -98,3 +105,135 @@ class DecisionLayer:
return max(scores) > threshold return max(scores) > threshold
else: else:
return False return False
class HybridDecisionLayer:
    """Routes text to a Decision by combining dense (semantic) and sparse
    (BM25 keyword) cosine similarity in a convex combination weighted by
    `alpha` (dense weight; sparse weight is `1 - alpha`).
    """

    index = None  # dense utterance embeddings, shape (n_utterances, dim)
    sparse_index = None  # sparse BM25 embeddings, shape (n_utterances, vocab)
    categories = None  # decision name for each indexed utterance
    score_threshold = 0.82  # minimum top score required to return a decision

    def __init__(
        self,
        encoder: BaseEncoder,
        decisions: list[Decision] | None = None,
        alpha: float = 0.3,
    ):
        """
        :param encoder: dense encoder used for semantic similarity.
        :param decisions: optional decisions to index immediately.
        :param alpha: weight of the dense score; sparse gets 1 - alpha.
        """
        self.encoder = encoder
        self.sparse_encoder = BM25Encoder()
        self.alpha = alpha
        # decide on default threshold based on encoder
        if isinstance(encoder, OpenAIEncoder):
            self.score_threshold = 0.82
        elif isinstance(encoder, CohereEncoder):
            self.score_threshold = 0.3
        else:
            self.score_threshold = 0.82
        # if decisions list has been passed, we initialize index now
        if decisions:
            for decision in tqdm(decisions):
                self._add_decision(decision=decision)

    def __call__(self, text: str) -> str | None:
        """Return the best-matching decision name, or None when no score
        passes `score_threshold` (including when the index is empty)."""
        results = self._query(text)
        top_class, top_class_scores = self._semantic_classify(results)
        passed = self._pass_threshold(top_class_scores, self.score_threshold)
        if passed:
            return top_class
        else:
            return None

    def add(self, decision: Decision):
        """Index a single decision's utterances."""
        self._add_decision(decision=decision)

    def _add_decision(self, decision: Decision):
        """Encode a decision's utterances and append them to the dense and
        sparse indexes alongside their category labels."""
        # create embeddings
        dense_embeds = np.array(self.encoder(decision.utterances))
        sparse_embeds = np.array(self.sparse_encoder(decision.utterances))
        # create decision array
        if self.categories is None:
            self.categories = np.array([decision.name] * len(decision.utterances))
            self.utterances = np.array(decision.utterances)
        else:
            str_arr = np.array([decision.name] * len(decision.utterances))
            self.categories = np.concatenate([self.categories, str_arr])
            self.utterances = np.concatenate(
                [self.utterances, np.array(decision.utterances)]
            )
        # create utterance array (the dense index)
        if self.index is None:
            self.index = dense_embeds
        else:
            self.index = np.concatenate([self.index, dense_embeds])
        # create sparse utterance array
        if self.sparse_index is None:
            self.sparse_index = sparse_embeds
        else:
            self.sparse_index = np.concatenate([self.sparse_index, sparse_embeds])

    def _query(self, text: str, top_k: int = 5):
        """Given some text, encodes and searches the index vector space to
        retrieve the top_k most similar records.
        """
        # create dense query vector
        xq_d = np.array(self.encoder([text]))
        xq_d = np.squeeze(xq_d)  # reduce to 1d array
        # create sparse query vector
        xq_s = np.array(self.sparse_encoder([text]))
        xq_s = np.squeeze(xq_s)
        if self.index is not None:
            # calculate dense vec (cosine) similarity
            index_norm = norm(self.index, axis=1)
            xq_d_norm = norm(xq_d.T)
            sim_d = np.dot(self.index, xq_d.T) / (index_norm * xq_d_norm)
            # calculate sparse vec (cosine) similarity
            sparse_norm = norm(self.sparse_index, axis=1)
            xq_s_norm = norm(xq_s.T)
            sim_s = np.dot(self.sparse_index, xq_s.T) / (sparse_norm * xq_s_norm)
            # convex scaling must be applied to the similarity SCORES:
            # scaling the query vectors before cosine similarity is a no-op,
            # because the scalar cancels in the normalization (so alpha
            # previously had zero effect on the ranking).
            sim_d, sim_s = self._convex_scaling(sim_d, sim_s)
            total_sim = sim_d + sim_s
            # get indices of top_k records
            top_k = min(top_k, total_sim.shape[0])
            idx = np.argpartition(total_sim, -top_k)[-top_k:]
            scores = total_sim[idx]
            # get the utterance categories (decision names)
            decisions = self.categories[idx] if self.categories is not None else []
            return [
                {"decision": d, "score": s.item()} for d, s in zip(decisions, scores)
            ]
        else:
            return []

    def _convex_scaling(self, dense: list[float], sparse: list[float]):
        """Weight dense values by alpha and sparse values by 1 - alpha."""
        dense = np.array(dense) * self.alpha
        sparse = np.array(sparse) * (1 - self.alpha)
        return dense, sparse

    def _semantic_classify(self, query_results: list[dict]) -> tuple[str, list[float]]:
        """Aggregate per-utterance scores by decision name; return the
        decision with the highest total score and its individual scores."""
        scores_by_class = {}
        for result in query_results:
            score = result["score"]
            decision = result["decision"]
            if decision in scores_by_class:
                scores_by_class[decision].append(score)
            else:
                scores_by_class[decision] = [score]
        # calculate total score for each class
        total_scores = {
            decision: sum(scores) for decision, scores in scores_by_class.items()
        }
        top_class = max(total_scores, key=lambda x: total_scores[x], default=None)
        # return the top class and its associated scores.
        # NOTE(review): with no results this yields the string "None", but
        # __call__ still returns None because the empty score list fails
        # the threshold check.
        return str(top_class), scores_by_class.get(top_class, [])

    def _pass_threshold(self, scores: list[float], threshold: float) -> bool:
        """True when the best individual score exceeds `threshold`."""
        if scores:
            return max(scores) > threshold
        else:
            return False
...@@ -17,9 +17,9 @@ class Decision(BaseModel): ...@@ -17,9 +17,9 @@ class Decision(BaseModel):
class EncoderType(Enum): class EncoderType(Enum):
HUGGINGFACE = "huggingface"
OPENAI = "openai" OPENAI = "openai"
COHERE = "cohere" COHERE = "cohere"
HUGGINGFACE = "huggingface"
@dataclass @dataclass
......
import pytest
from semantic_router.encoders import BM25Encoder
@pytest.fixture
def bm25_encoder():
    # fresh encoder per test; loads the default (MSMARCO-trained) BM25 params
    return BM25Encoder()
class TestBM25Encoder:
    """Tests for BM25Encoder construction, fitting, and encoding."""

    def test_initialization(self, bm25_encoder):
        # default model ships with a populated vocabulary mapping
        assert len(bm25_encoder.idx_mapping) != 0

    def test_fit(self, bm25_encoder):
        corpus = ["some docs", "and more docs", "and even more docs"]
        bm25_encoder.fit(corpus)
        assert len(bm25_encoder.idx_mapping) != 0

    def test_call_method(self, bm25_encoder):
        output = bm25_encoder(["test"])
        assert isinstance(output, list), "Result should be a list"
        rows_are_lists = all(isinstance(row, list) for row in output)
        assert rows_are_lists, "Each item in result should be a list"

    def test_call_method_no_docs(self, bm25_encoder):
        # empty input must be rejected explicitly
        with pytest.raises(ValueError):
            bm25_encoder([])

    def test_call_method_no_word(self, bm25_encoder):
        # out-of-vocabulary tokens still produce a well-formed vector
        output = bm25_encoder(["doc with fake word gta5jabcxyz"])
        assert isinstance(output, list), "Result should be a list"
        rows_are_lists = all(isinstance(row, list) for row in output)
        assert rows_are_lists, "Each item in result should be a list"
import pytest import pytest
from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder
from semantic_router.layer import DecisionLayer # Replace with the actual module name from semantic_router.layer import (
DecisionLayer,
HybridDecisionLayer,
) # Replace with the actual module name
from semantic_router.schema import Decision from semantic_router.schema import Decision
...@@ -45,16 +48,16 @@ def decisions(): ...@@ -45,16 +48,16 @@ def decisions():
class TestDecisionLayer: class TestDecisionLayer:
def test_initialization(self, openai_encoder, decisions): def test_initialization(self, openai_encoder, decisions):
decision_layer = DecisionLayer(encoder=openai_encoder, decisions=decisions) decision_layer = DecisionLayer(encoder=openai_encoder, decisions=decisions)
assert decision_layer.similarity_threshold == 0.82 assert decision_layer.score_threshold == 0.82
assert len(decision_layer.index) == 5 assert len(decision_layer.index) == 5
assert len(set(decision_layer.categories)) == 2 assert len(set(decision_layer.categories)) == 2
def test_initialization_different_encoders(self, cohere_encoder, openai_encoder): def test_initialization_different_encoders(self, cohere_encoder, openai_encoder):
decision_layer_cohere = DecisionLayer(encoder=cohere_encoder) decision_layer_cohere = DecisionLayer(encoder=cohere_encoder)
assert decision_layer_cohere.similarity_threshold == 0.3 assert decision_layer_cohere.score_threshold == 0.3
decision_layer_openai = DecisionLayer(encoder=openai_encoder) decision_layer_openai = DecisionLayer(encoder=openai_encoder)
assert decision_layer_openai.similarity_threshold == 0.82 assert decision_layer_openai.score_threshold == 0.82
def test_add_decision(self, openai_encoder): def test_add_decision(self, openai_encoder):
decision_layer = DecisionLayer(encoder=openai_encoder) decision_layer = DecisionLayer(encoder=openai_encoder)
...@@ -107,9 +110,87 @@ class TestDecisionLayer: ...@@ -107,9 +110,87 @@ class TestDecisionLayer:
assert not decision_layer._pass_threshold([], 0.5) assert not decision_layer._pass_threshold([], 0.5)
assert decision_layer._pass_threshold([0.6, 0.7], 0.5) assert decision_layer._pass_threshold([0.6, 0.7], 0.5)
def test_failover_similarity_threshold(self, base_encoder): def test_failover_score_threshold(self, base_encoder):
decision_layer = DecisionLayer(encoder=base_encoder) decision_layer = DecisionLayer(encoder=base_encoder)
assert decision_layer.similarity_threshold == 0.82 assert decision_layer.score_threshold == 0.82
class TestHybridDecisionLayer:
    """Tests mirroring TestDecisionLayer for the hybrid (dense + sparse) layer."""

    def test_initialization(self, openai_encoder, decisions):
        layer = HybridDecisionLayer(encoder=openai_encoder, decisions=decisions)
        assert layer.score_threshold == 0.82
        assert len(layer.index) == 5
        assert len(set(layer.categories)) == 2

    def test_initialization_different_encoders(self, cohere_encoder, openai_encoder):
        # default threshold is picked per encoder type
        assert HybridDecisionLayer(encoder=cohere_encoder).score_threshold == 0.3
        assert HybridDecisionLayer(encoder=openai_encoder).score_threshold == 0.82

    def test_add_decision(self, openai_encoder):
        layer = HybridDecisionLayer(encoder=openai_encoder)
        layer.add(Decision(name="Decision 3", utterances=["Yes", "No"]))
        assert len(layer.index) == 2
        assert len(set(layer.categories)) == 1

    def test_add_multiple_decisions(self, openai_encoder, decisions):
        layer = HybridDecisionLayer(encoder=openai_encoder)
        for d in decisions:
            layer.add(d)
        assert len(layer.index) == 5
        assert len(set(layer.categories)) == 2

    def test_query_and_classification(self, openai_encoder, decisions):
        layer = HybridDecisionLayer(encoder=openai_encoder, decisions=decisions)
        assert layer("Hello") in ["Decision 1", "Decision 2"]

    def test_query_with_no_index(self, openai_encoder):
        # an empty layer should decline to classify anything
        layer = HybridDecisionLayer(encoder=openai_encoder)
        assert layer("Anything") is None

    def test_semantic_classify(self, openai_encoder, decisions):
        layer = HybridDecisionLayer(encoder=openai_encoder, decisions=decisions)
        results = [
            {"decision": "Decision 1", "score": 0.9},
            {"decision": "Decision 2", "score": 0.1},
        ]
        classification, score = layer._semantic_classify(results)
        assert classification == "Decision 1"
        assert score == [0.9]

    def test_semantic_classify_multiple_decisions(self, openai_encoder, decisions):
        layer = HybridDecisionLayer(encoder=openai_encoder, decisions=decisions)
        results = [
            {"decision": "Decision 1", "score": 0.9},
            {"decision": "Decision 2", "score": 0.1},
            {"decision": "Decision 1", "score": 0.8},
        ]
        classification, score = layer._semantic_classify(results)
        assert classification == "Decision 1"
        assert score == [0.9, 0.8]

    def test_pass_threshold(self, openai_encoder):
        layer = HybridDecisionLayer(encoder=openai_encoder)
        assert not layer._pass_threshold([], 0.5)
        assert layer._pass_threshold([0.6, 0.7], 0.5)

    def test_failover_score_threshold(self, base_encoder):
        # unknown encoder types fall back to the default threshold
        layer = HybridDecisionLayer(encoder=base_encoder)
        assert layer.score_threshold == 0.82
# Add more tests for edge cases and error handling as needed. # Add more tests for edge cases and error handling as needed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment