diff --git a/decision_layer/decision_layer.py b/decision_layer/decision_layer.py index 31de29bd2acc810edad348b3d5d6c309f7582659..8a88f4be3c71aae641e622e8aac6c0d37e20b0e8 100644 --- a/decision_layer/decision_layer.py +++ b/decision_layer/decision_layer.py @@ -2,14 +2,13 @@ from decision_layer.encoders import BaseEncoder from decision_layer.schema import Decision import numpy as np from numpy.linalg import norm - - class DecisionLayer: index = None categories = None def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []): self.encoder = encoder + self.embeddings_classified = False # if decisions list has been passed, we initialize index now if decisions: # initialize index now @@ -17,16 +16,20 @@ class DecisionLayer: self._add_decision(decision=decision) def __call__(self, text: str): + results = self._query(text) + decision = self.simple_categorise(results) + # return decision raise NotImplementedError("To implement decision logic based on scores") - def add(self, decision: Decision): + def add(self, decision: Decision, dimensiona): self._add_decision(devision=decision) def _add_decision(self, decision: Decision): # create embeddings embeds = self.encoder(decision.utterances) + # create decision array if self.categories is None: self.categories = np.array([decision.name]*len(embeds)) @@ -56,3 +59,25 @@ class DecisionLayer: return [ {"decision": d, "score": s.item()} for d, s in zip(decisions, scores) ] + + def simple_categorise(self, text: str, top_k: int=5, apply_tan: bool=True): + """Given some text, categorises it based on the scores from _query.""" + # get the results from _query + results = self._query(text, top_k) + + # apply the scoring system to the results and group by category + scores_by_category = {} + for result in results: + score = np.tan(result['score'] * (np.pi / 2)) if apply_tan else result['score'] + if result['decision'] in scores_by_category: + scores_by_category[result['decision']] += score + else: + scores_by_category[result['decision']] = score + + # sort the categories by score in descending order + sorted_categories = sorted(scores_by_category.items(), key=lambda x: x[1], reverse=True) + + # return the category with the highest total score + return sorted_categories[0][0] if sorted_categories else None + +