diff --git a/semantic_router/encoders/cohere.py b/semantic_router/encoders/cohere.py
index f534fa51cb3524b047416260ff688fd9b9769bee..e145e6cd6aefa878cf817996a2eed242fc908bb1 100644
--- a/semantic_router/encoders/cohere.py
+++ b/semantic_router/encoders/cohere.py
@@ -9,16 +9,18 @@ from semantic_router.encoders import BaseEncoder
 class CohereEncoder(BaseEncoder):
     client: Optional[cohere.Client] = None
     type: str = "cohere"
+    input_type: Optional[str] = "search_query"
 
     def __init__(
         self,
         name: Optional[str] = None,
         cohere_api_key: Optional[str] = None,
-        score_threshold: float = 0.3,
+        score_threshold: Optional[float] = 0.3,
+        input_type: Optional[str] = "search_query",
     ):
         if name is None:
             name = os.getenv("COHERE_MODEL_NAME", "embed-english-v3.0")
-        super().__init__(name=name, score_threshold=score_threshold)
+        super().__init__(name=name, score_threshold=score_threshold, input_type=input_type)
         cohere_api_key = cohere_api_key or os.getenv("COHERE_API_KEY")
         if cohere_api_key is None:
             raise ValueError("Cohere API key cannot be 'None'.")
@@ -33,7 +35,7 @@ class CohereEncoder(BaseEncoder):
         if self.client is None:
             raise ValueError("Cohere client is not initialized.")
         try:
-            embeds = self.client.embed(docs, input_type="search_query", model=self.name)
+            embeds = self.client.embed(docs, input_type=self.input_type, model=self.name)
             return embeds.embeddings
         except Exception as e:
             raise ValueError(f"Cohere API call failed. Error: {e}") from e
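For reference, a minimal usage sketch of the new input_type parameter. This assumes a valid COHERE_API_KEY and a Cohere v3 embedding model, which distinguishes query-side from document-side embeddings; the texts are placeholders:

    from semantic_router.encoders.cohere import CohereEncoder

    # Default behaviour is unchanged: embeddings are requested as "search_query".
    query_encoder = CohereEncoder(cohere_api_key="...")

    # Index-time embeddings can now request "search_document" vectors instead.
    doc_encoder = CohereEncoder(
        cohere_api_key="...",
        input_type="search_document",
    )

    doc_vectors = doc_encoder(["a chunk of text to index"])
    query_vectors = query_encoder(["a user question"])
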
"consecutive_similarity": + self.splitter = ConsecutiveSimSplitter(encoder=encoder, similarity_threshold=threshold) + elif split_method == "cumulative_similarity": + self.splitter = CumulativeSimSplitter(encoder=encoder, similarity_threshold=threshold) + elif split_method == "cav_similarity": + self.splitter = CAVSimSplitter(encoder=encoder, similarity_threshold=threshold) + else: + raise ValueError(f"Invalid split method: {split_method}") + + def split_by_topic(self): + if self.splitter is None: + raise ValueError("Splitter is not configured. Please call configure_splitter first.") + + # Get the messages that haven't been clustered into topics yet + unclustered_messages = self.messages[len(self.topics):] + + # Check if there are any messages that have been assigned topics + if len(self.topics) >= 1: + # Include the last message in the docs + docs = [self.topics[-1][1]] + else: + # No messages have been assigned topics yet + docs = [] + + # Add the unclustered messages to the docs + docs.extend([f"{m.role}: {m.content}" for m in unclustered_messages]) + + # Use the splitter to split the documents + new_topics = self.splitter(docs) + + # Check if the first new topic includes any of the new messages + if any(message in new_topics[0].docs for message in docs[-len(unclustered_messages):]): + start = self.topics[-1][0] + else: + start = len(self.topics) + 1 + + # Add the new topics to the list of topics with unique IDs + for i, topic in enumerate(new_topics, start=start): + for message in topic.docs: + self.topics.append((i, message)) + + return new_topics \ No newline at end of file diff --git a/semantic_router/splitters/base.py b/semantic_router/splitters/base.py index 46f50eb463e763c381953a7e9ca5be8bb77d230c..38867a25600ad6e353de885cf4640faeb13d1e54 100644 --- a/semantic_router/splitters/base.py +++ b/semantic_router/splitters/base.py @@ -6,7 +6,6 @@ from semantic_router.encoders import BaseEncoder class BaseSplitter(BaseModel): name: str encoder: BaseEncoder - docs: List[str] similarity_threshold: float def __call__(self, docs: List[str]) -> List[List[float]]: diff --git a/semantic_router/splitters/cav_sim.py b/semantic_router/splitters/cav_sim.py index d18e5c4edec349ae4017b7fe390e7b5bab900b4f..dd5c4a53684dc79aac7d7e32fb11c01608f2d37d 100644 --- a/semantic_router/splitters/cav_sim.py +++ b/semantic_router/splitters/cav_sim.py @@ -1,7 +1,8 @@ from typing import List -from semantic_router.splitters import BaseSplitter +from semantic_router.splitters.base import BaseSplitter import numpy as np -from semantic_router.utils import DocumentSplit +from semantic_router.schema import DocumentSplit +from semantic_router.encoders import BaseEncoder class CAVSimSplitter(BaseSplitter): @@ -34,32 +35,33 @@ class CAVSimSplitter(BaseSplitter): def __init__( self, - docs: List[str], + encoder: BaseEncoder, name: str = "cav_similarity_splitter", similarity_threshold: float = 0.45, ): super().__init__( - docs=docs, name=name, similarity_threshold=similarity_threshold, + encoder=encoder ) - def __call__(self): - total_docs = len(self.docs) + def __call__(self, docs: List[str]): + total_docs = len(docs) splits = [] curr_split_start_idx = 0 curr_split_num = 1 + doc_embeds = self.encoder(docs) for idx in range(1, total_docs): - curr_split_docs_embeds = self.encoder(self.docs[curr_split_start_idx : idx + 1]) + curr_split_docs_embeds = doc_embeds[curr_split_start_idx : idx + 1] avg_embedding = np.mean(curr_split_docs_embeds, axis=0) # Compute the average embedding for the next two documents, if available if idx + 3 
diff --git a/semantic_router/splitters/base.py b/semantic_router/splitters/base.py
index 46f50eb463e763c381953a7e9ca5be8bb77d230c..38867a25600ad6e353de885cf4640faeb13d1e54 100644
--- a/semantic_router/splitters/base.py
+++ b/semantic_router/splitters/base.py
@@ -6,7 +6,6 @@ from semantic_router.encoders import BaseEncoder
 class BaseSplitter(BaseModel):
     name: str
     encoder: BaseEncoder
-    docs: List[str]
     similarity_threshold: float
 
     def __call__(self, docs: List[str]) -> List[List[float]]:
diff --git a/semantic_router/splitters/cav_sim.py b/semantic_router/splitters/cav_sim.py
index d18e5c4edec349ae4017b7fe390e7b5bab900b4f..dd5c4a53684dc79aac7d7e32fb11c01608f2d37d 100644
--- a/semantic_router/splitters/cav_sim.py
+++ b/semantic_router/splitters/cav_sim.py
@@ -1,7 +1,8 @@
 from typing import List
-from semantic_router.splitters import BaseSplitter
+from semantic_router.splitters.base import BaseSplitter
 import numpy as np
-from semantic_router.utils import DocumentSplit
+from semantic_router.schema import DocumentSplit
+from semantic_router.encoders import BaseEncoder
 
 
 class CAVSimSplitter(BaseSplitter):
@@ -34,32 +35,33 @@
 
     def __init__(
         self,
-        docs: List[str],
+        encoder: BaseEncoder,
         name: str = "cav_similarity_splitter",
        similarity_threshold: float = 0.45,
     ):
         super().__init__(
-            docs=docs,
             name=name,
             similarity_threshold=similarity_threshold,
+            encoder=encoder,
         )
 
-    def __call__(self):
-        total_docs = len(self.docs)
+    def __call__(self, docs: List[str]):
+        total_docs = len(docs)
         splits = []
         curr_split_start_idx = 0
         curr_split_num = 1
+        doc_embeds = self.encoder(docs)
 
         for idx in range(1, total_docs):
-            curr_split_docs_embeds = self.encoder(self.docs[curr_split_start_idx : idx + 1])
+            curr_split_docs_embeds = doc_embeds[curr_split_start_idx : idx + 1]
             avg_embedding = np.mean(curr_split_docs_embeds, axis=0)
 
             # Compute the average embedding for the next two documents, if available
             if idx + 3 <= total_docs:  # Check if the next two indices are within the range
-                next_doc_embeds = self.encoder(self.docs[idx + 1 : idx + 3])
+                next_doc_embeds = doc_embeds[idx + 1 : idx + 3]
                 next_avg_embed = np.mean(next_doc_embeds, axis=0)
             elif idx + 2 <= total_docs:  # Check if the next index is within the range
-                next_avg_embed = self.encoder([self.docs[idx + 1]])[0]
+                next_avg_embed = doc_embeds[idx + 1]
             else:
                 next_avg_embed = None
@@ -72,7 +74,7 @@
             if curr_sim_score < self.similarity_threshold:
                 splits.append(
                     DocumentSplit(
-                        docs=list(self.docs[curr_split_start_idx : idx + 1]),
+                        docs=list(docs[curr_split_start_idx : idx + 1]),
                         is_triggered=True,
                         triggered_score=curr_sim_score,
                     )
@@ -80,5 +82,5 @@
                 curr_split_start_idx = idx + 1
                 curr_split_num += 1
 
-        splits.append(DocumentSplit(docs=list(self.docs[curr_split_start_idx:])))
+        splits.append(DocumentSplit(docs=list(docs[curr_split_start_idx:])))
         return splits
\ No newline at end of file
diff --git a/semantic_router/splitters/consecutive_sim.py b/semantic_router/splitters/consecutive_sim.py
index 22859758764b7cfc45b9310067f783539fb4be7d..a9038750685a0c404027a2511dfda7b9df121619 100644
--- a/semantic_router/splitters/consecutive_sim.py
+++ b/semantic_router/splitters/consecutive_sim.py
@@ -1,7 +1,8 @@
 from typing import List
-from semantic_router.splitters import BaseSplitter
+from semantic_router.splitters.base import BaseSplitter
+from semantic_router.encoders import BaseEncoder
 import numpy as np
-from semantic_router.utils import DocumentSplit
+from semantic_router.schema import DocumentSplit
 
 
 class ConsecutiveSimSplitter(BaseSplitter):
@@ -11,21 +12,21 @@
 
     def __init__(
         self,
-        docs: List[str],
+        encoder: BaseEncoder,
         name: str = "consecutive_similarity_splitter",
         similarity_threshold: float = 0.45,
     ):
         super().__init__(
-            docs=docs,
             name=name,
             similarity_threshold=similarity_threshold,
+            encoder=encoder,
         )
 
-    def __call__(self):
-        doc_embeds = self.encoder(self.docs)
+    def __call__(self, docs: List[str]):
+        doc_embeds = self.encoder(docs)
         norm_embeds = doc_embeds / np.linalg.norm(doc_embeds, axis=1, keepdims=True)
         sim_matrix = np.matmul(norm_embeds, norm_embeds.T)
-        total_docs = len(self.docs)
+        total_docs = len(docs)
         splits = []
         curr_split_start_idx = 0
         curr_split_num = 1
@@ -35,12 +36,12 @@
             if idx < len(sim_matrix) and curr_sim_score < self.similarity_threshold:
                 splits.append(
                     DocumentSplit(
-                        docs=list(self.docs[curr_split_start_idx:idx]),
+                        docs=list(docs[curr_split_start_idx:idx]),
                         is_triggered=True,
                         triggered_score=curr_sim_score,
                     )
                 )
                 curr_split_start_idx = idx
                 curr_split_num += 1
-        splits.append(DocumentSplit(docs=list(self.docs[curr_split_start_idx:])))
+        splits.append(DocumentSplit(docs=list(docs[curr_split_start_idx:])))
         return splits
\ No newline at end of file
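The splitters now take the encoder at construction time and the documents at call time, so one instance can be reused across calls. A standalone sketch with the consecutive-similarity strategy (the encoder and documents are placeholders):

    from semantic_router.encoders import CohereEncoder
    from semantic_router.splitters.consecutive_sim import ConsecutiveSimSplitter

    splitter = ConsecutiveSimSplitter(
        encoder=CohereEncoder(cohere_api_key="..."),
        similarity_threshold=0.45,
    )
    for split in splitter(["doc one", "doc two", "something unrelated"]):
        print(split.is_triggered, split.triggered_score, split.docs)
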
diff --git a/semantic_router/splitters/cumulative_sim.py b/semantic_router/splitters/cumulative_sim.py
index f85516aab9c43c930a2c590b6944dab7f566b6a6..b45ebc37d498513f58d48d6e5153034888975fce 100644
--- a/semantic_router/splitters/cumulative_sim.py
+++ b/semantic_router/splitters/cumulative_sim.py
@@ -1,7 +1,8 @@
 from typing import List
-from semantic_router.splitters import BaseSplitter
+from semantic_router.splitters.base import BaseSplitter
 import numpy as np
-from semantic_router.utils import DocumentSplit
+from semantic_router.schema import DocumentSplit
+from semantic_router.encoders import BaseEncoder
 
 
 class CumulativeSimSplitter(BaseSplitter):
@@ -11,27 +12,27 @@
 
     def __init__(
         self,
-        docs: List[str],
+        encoder: BaseEncoder,
         name: str = "cumulative_similarity_splitter",
         similarity_threshold: float = 0.45,
     ):
         super().__init__(
-            docs=docs,
             name=name,
             similarity_threshold=similarity_threshold,
+            encoder=encoder,
        )
 
-    def __call__(self):
-        total_docs = len(self.docs)
+    def __call__(self, docs: List[str]):
+        total_docs = len(docs)
         splits = []
         curr_split_start_idx = 0
         curr_split_num = 1
 
         for idx in range(1, total_docs):
             if idx + 1 < total_docs:
-                curr_split_docs = "\n".join(self.docs[curr_split_start_idx : idx + 1])
-                next_doc = self.docs[idx + 1]
+                curr_split_docs = "\n".join(docs[curr_split_start_idx : idx + 1])
+                next_doc = docs[idx + 1]
 
                 curr_split_docs_embed = self.encoder([curr_split_docs])[0]
                 next_doc_embed = self.encoder([next_doc])[0]
@@ -43,7 +44,7 @@
                 if curr_sim_score < self.similarity_threshold:
                     splits.append(
                         DocumentSplit(
-                            docs=list(self.docs[curr_split_start_idx : idx + 1]),
+                            docs=list(docs[curr_split_start_idx : idx + 1]),
                             is_triggered=True,
                             triggered_score=curr_sim_score,
                         )
@@ -51,5 +52,5 @@
                     curr_split_start_idx = idx + 1
                     curr_split_num += 1
 
-        splits.append(DocumentSplit(docs=list(self.docs[curr_split_start_idx:])))
+        splits.append(DocumentSplit(docs=list(docs[curr_split_start_idx:])))
         return splits
\ No newline at end of file
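All three strategies ultimately compare embeddings with cosine similarity against similarity_threshold; the score computation sits in the unchanged lines between the hunks above, along these lines (a sketch, not the verbatim elided code):

    import numpy as np

    def cosine_similarity(a, b) -> float:
        # Dot product of the two vectors over the product of their norms.
        a, b = np.asarray(a), np.asarray(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

One design difference worth noting: ConsecutiveSimSplitter and CAVSimSplitter embed every document once up front, while CumulativeSimSplitter re-encodes the concatenated running split on each iteration, so it issues O(n) encoder calls for n documents.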