From 705f4355b0034b818e8f297b29ef7f96993a97c1 Mon Sep 17 00:00:00 2001 From: Siraj R Aizlewood <siraj@aurelio.ai> Date: Mon, 6 Nov 2023 13:49:57 +0400 Subject: [PATCH] Initial Code Added simple_categorise which uses sum of Cosine Similarity Scores to determine Category. Option to use tan function to boost scores for closest points, and reduce scores for further away points. --- decision_layer/decision_layer.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/decision_layer/decision_layer.py b/decision_layer/decision_layer.py index 31de29bd..8a88f4be 100644 --- a/decision_layer/decision_layer.py +++ b/decision_layer/decision_layer.py @@ -2,14 +2,13 @@ from decision_layer.encoders import BaseEncoder from decision_layer.schema import Decision import numpy as np from numpy.linalg import norm - - class DecisionLayer: index = None categories = None def __init__(self, encoder: BaseEncoder, decisions: list[Decision] = []): self.encoder = encoder + self.embeddings_classified = False # if decisions list has been passed, we initialize index now if decisions: # initialize index now @@ -17,16 +16,20 @@ class DecisionLayer: self._add_decision(decision=decision) def __call__(self, text: str): + results = self._query(text) + decision = self.simple_categorise(results) + # return decision raise NotImplementedError("To implement decision logic based on scores") - def add(self, decision: Decision): + def add(self, decision: Decision, dimensiona): self._add_decision(devision=decision) def _add_decision(self, decision: Decision): # create embeddings embeds = self.encoder(decision.utterances) + # create decision array if self.categories is None: self.categories = np.array([decision.name]*len(embeds)) @@ -56,3 +59,25 @@ class DecisionLayer: return [ {"decision": d, "score": s.item()} for d, s in zip(decisions, scores) ] + + def simple_categorise(self, text: str, top_k: int=5, apply_tan: bool=True): + """Given some text, categorises it based on the scores from _query.""" + # get the results from _query + results = self._query(text, top_k) + + # apply the scoring system to the results and group by category + scores_by_category = {} + for result in results: + score = np.tan(result['score'] * (np.pi / 2)) if apply_tan else result['score'] + if result['decision'] in scores_by_category: + scores_by_category[result['decision']] += score + else: + scores_by_category[result['decision']] = score + + # sort the categories by score in descending order + sorted_categories = sorted(scores_by_category.items(), key=lambda x: x[1], reverse=True) + + # return the category with the highest total score + return sorted_categories[0][0] if sorted_categories else None + + -- GitLab