diff --git a/poetry.lock b/poetry.lock index 1b4125e3829874e2a60595031fadb25dac46b148..b4805a47a5ce4a9c6321c760b0a97326b0dc0ef7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -136,8 +136,8 @@ description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.34.116-py3-none-any.whl", hash = "sha256:ec4d42c816e9b2d87a2439ad277e7dda16a4a614ef6839cf66f4c1a58afa547c"}, - {file = "botocore-1.34.116.tar.gz", hash = "sha256:269cae7ba99081519a9f87d7298e238d9e68ba94eb4f8ddfa906224c34cb8b6c"}, + {file = "botocore-1.34.110-py3-none-any.whl", hash = "sha256:1edf3a825ec0a5edf238b2d42ad23305de11d5a71bb27d6f9a58b7e8862df1b6"}, + {file = "botocore-1.34.110.tar.gz", hash = "sha256:b2c98c40ecf0b1facb9e61ceb7dfa28e61ae2456490554a16c8dbf99f20d6a18"}, ] [package.dependencies] @@ -4217,7 +4217,7 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.link testing = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] [extras] -bedrock = ["boto3"] +bedrock = ["boto3", "botocore"] google = ["google-cloud-aiplatform"] hybrid = ["pinecone-text"] local = ["llama-cpp-python", "tokenizers", "torch", "transformers"] diff --git a/pyproject.toml b/pyproject.toml index 88e515e82f09dc0edc0cc2f99767248c7d43e2b4..ae30d23aefcee156d67cda08ded0d8f7583e3286 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ qdrant-client = {version = "^1.8.0", optional = true} google-cloud-aiplatform = {version = "^1.45.0", optional = true} requests-mock = "^1.12.1" boto3 = { version = "^1.34.98", optional = true } +botocore = {version = "^1.34.110", optional = true} [tool.poetry.extras] hybrid = ["pinecone-text"] @@ -50,7 +51,7 @@ processing = ["matplotlib"] mistralai = ["mistralai"] qdrant = ["qdrant-client"] google = ["google-cloud-aiplatform"] -bedrock = ["boto3"] +bedrock = ["boto3", "botocore"] [tool.poetry.group.dev.dependencies] ipykernel = "^6.25.0" @@ -75,4 +76,4 @@ build-backend = "poetry.core.masonry.api" line-length = 88 [tool.mypy] -ignore_missing_imports = true \ No newline at end of file +ignore_missing_imports = true diff --git a/semantic_router/encoders/bedrock.py b/semantic_router/encoders/bedrock.py index ce04719be7e5fa2117a706938c811803e746b1bc..fed53900f4e0b10810d667e75d72ef51f066e565 100644 --- a/semantic_router/encoders/bedrock.py +++ b/semantic_router/encoders/bedrock.py @@ -19,9 +19,11 @@ Classes: import json from typing import List, Optional, Any import os +from time import sleep import tiktoken from semantic_router.encoders import BaseEncoder from semantic_router.utils.defaults import EncoderDefault +from semantic_router.utils.logger import logger class BedrockEncoder(BaseEncoder): @@ -67,17 +69,16 @@ class BedrockEncoder(BaseEncoder): Raises: ValueError: If the Bedrock Platform client fails to initialize. """ - super().__init__(name=name, score_threshold=score_threshold) - self.access_key_id = self.get_env_variable("access_key_id", access_key_id) + self.access_key_id = self.get_env_variable("AWS_ACCESS_KEY_ID", access_key_id) self.secret_access_key = self.get_env_variable( - "secret_access_key", secret_access_key + "AWS_SECRET_ACCESS_KEY", secret_access_key ) self.session_token = self.get_env_variable("AWS_SESSION_TOKEN", session_token) - self.region = self.get_env_variable("AWS_REGION", region, default="us-west-1") - + self.region = self.get_env_variable( + "AWS_DEFAULT_REGION", region, default="us-west-1" + ) self.input_type = input_type - try: self.client = self._initialize_client( self.access_key_id, @@ -85,7 +86,6 @@ class BedrockEncoder(BaseEncoder): self.session_token, self.region, ) - except Exception as e: raise ValueError(f"Bedrock client failed to initialise. Error: {e}") from e @@ -115,30 +115,27 @@ class BedrockEncoder(BaseEncoder): "You can install them with: " "`pip install boto3`" ) - - access_key_id = access_key_id or os.getenv("access_key_id") - aws_secret_key = secret_access_key or os.getenv("secret_access_key") - region = region or os.getenv("AWS_REGION", "us-west-2") - + access_key_id = access_key_id or os.getenv("AWS_ACCESS_KEY_ID") + aws_secret_key = secret_access_key or os.getenv("AWS_SECRET_ACCESS_KEY") + region = region or os.getenv("AWS_DEFAULT_REGION", "us-west-2") if access_key_id is None: raise ValueError("AWS access key ID cannot be 'None'.") - if aws_secret_key is None: raise ValueError("AWS secret access key cannot be 'None'.") - + session = boto3.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=session_token, + ) try: - bedrock_client = boto3.client( + bedrock_client = session.client( "bedrock-runtime", - aws_access_key_id=access_key_id, - aws_secret_access_key=secret_access_key, - aws_session_token=session_token, region_name=region, ) except Exception as err: raise ValueError( f"The Bedrock client failed to initialize. Error: {err}" ) from err - return bedrock_client def __call__(self, docs: List[str]) -> List[List[float]]: @@ -155,77 +152,101 @@ class BedrockEncoder(BaseEncoder): ValueError: If the Bedrock Platform client is not initialized or if the API call fails. """ + try: + from botocore.exceptions import ClientError + except ImportError: + raise ImportError( + "Please install Amazon's Botocore client library to use the BedrockEncoder. " + "You can install them with: " + "`pip install botocore`" + ) if self.client is None: raise ValueError("Bedrock client is not initialised.") - try: - embeddings = [] - - def chunk_strings(strings, MAX_WORDS=20): - """ - Breaks up a list of strings into smaller chunks. - - Args: - strings (list): A list of strings to be chunked. - max_chunk_size (int): The maximum size of each chunk. Default is 75. - - Returns: - list: A list of lists, where each inner list contains a chunk of strings. - """ - encoding = tiktoken.get_encoding("cl100k_base") - chunked_strings = [] - current_chunk = [] - - for text in strings: - encoded_text = encoding.encode(text) - - if len(encoded_text) > MAX_WORDS: - current_chunk = [ - encoding.decode(encoded_text[i : i + MAX_WORDS]) - for i in range(0, len(encoded_text), MAX_WORDS) - ] - else: - current_chunk = [encoding.decode(encoded_text)] - - chunked_strings.append(current_chunk) - return chunked_strings - - if self.name and "amazon" in self.name: - for doc in docs: - embedding_body = json.dumps( - { - "inputText": doc, - } - ) - response = self.client.invoke_model( - body=embedding_body, - modelId=self.name, - accept="application/json", - contentType="application/json", - ) - - response_body = json.loads(response.get("body").read()) - embeddings.append(response_body.get("embedding")) - elif self.name and "cohere" in self.name: - chunked_docs = chunk_strings(docs) - for chunk in chunked_docs: - chunk = json.dumps({"texts": chunk, "input_type": self.input_type}) - - response = self.client.invoke_model( - body=chunk, - modelId=self.name, - accept="*/*", - contentType="application/json", - ) - - response_body = json.loads(response.get("body").read()) - - chunk_embeddings = response_body.get("embeddings") - embeddings.extend(chunk_embeddings) - else: - raise ValueError("Unknown model name") - return embeddings - except Exception as e: - raise ValueError(f"Bedrock call failed. Error: {e}") from e + max_attempts = 3 + for attempt in range(max_attempts): + try: + embeddings = [] + if self.name and "amazon" in self.name: + for doc in docs: + embedding_body = json.dumps( + { + "inputText": doc, + } + ) + response = self.client.invoke_model( + body=embedding_body, + modelId=self.name, + accept="application/json", + contentType="application/json", + ) + response_body = json.loads(response.get("body").read()) + embeddings.append(response_body.get("embedding")) + elif self.name and "cohere" in self.name: + chunked_docs = self.chunk_strings(docs) + for chunk in chunked_docs: + chunk = json.dumps( + {"texts": chunk, "input_type": self.input_type} + ) + response = self.client.invoke_model( + body=chunk, + modelId=self.name, + accept="*/*", + contentType="application/json", + ) + response_body = json.loads(response.get("body").read()) + chunk_embeddings = response_body.get("embeddings") + embeddings.extend(chunk_embeddings) + else: + raise ValueError("Unknown model name") + return embeddings + except ClientError as error: + if attempt < max_attempts - 1: + if error.response["Error"]["Code"] == "ExpiredTokenException": + logger.warning( + "Session token has expired. Retrying initialisation." + ) + try: + self.session_token = os.getenv("AWS_SESSION_TOKEN") + self.client = self._initialize_client( + self.access_key_id, + self.secret_access_key, + self.session_token, + self.region, + ) + except Exception as e: + raise ValueError( + f"Bedrock client failed to reinitialise. Error: {e}" + ) from e + sleep(2**attempt) + logger.warning(f"Retrying in {2**attempt} seconds...") + raise ValueError( + f"Retries exhausted, Bedrock call failed. Error: {error}" + ) from error + except Exception as e: + raise ValueError(f"Bedrock call failed. Error: {e}") from e + raise ValueError("Bedrock call failed to return embeddings.") + + def chunk_strings(self, strings, MAX_WORDS=20): + """ + Breaks up a list of strings into smaller chunks. + + Args: + strings (list): A list of strings to be chunked. + max_chunk_size (int): The maximum size of each chunk. Default is 20. + + Returns: + list: A list of lists, where each inner list contains a chunk of strings. + """ + encoding = tiktoken.get_encoding("cl100k_base") + chunked_strings = [] + for text in strings: + encoded_text = encoding.encode(text) + chunks = [ + encoding.decode(encoded_text[i : i + MAX_WORDS]) + for i in range(0, len(encoded_text), MAX_WORDS) + ] + chunked_strings.append(chunks) + return chunked_strings @staticmethod def get_env_variable(var_name, provided_value, default=None): @@ -238,6 +259,7 @@ class BedrockEncoder(BaseEncoder): Returns: str: The value of the environment variable or the provided/default value. + None: Where AWS_SESSION_TOKEN is not set or provided Raises: ValueError: If no value is provided and the environment variable is not set. @@ -246,5 +268,7 @@ class BedrockEncoder(BaseEncoder): return provided_value value = os.getenv(var_name, default) if value is None: + if var_name == "AWS_SESSION_TOKEN": + return None raise ValueError(f"No {var_name} provided") return value diff --git a/tests/unit/encoders/test_bedrock.py b/tests/unit/encoders/test_bedrock.py index 43955d453c704bc95fcce598b94e0a46479947a0..619325ed0ac43e7f54cf7eacd14a68a21a22ee8f 100644 --- a/tests/unit/encoders/test_bedrock.py +++ b/tests/unit/encoders/test_bedrock.py @@ -1,3 +1,4 @@ +import os import pytest import json from io import BytesIO @@ -15,6 +16,18 @@ def bedrock_encoder(mocker): ) +@pytest.fixture +def bedrock_encoder_with_cohere(mocker): + mocker.patch("semantic_router.encoders.bedrock.BedrockEncoder._initialize_client") + return BedrockEncoder( + name="cohere_model", + access_key_id="fake_id", + secret_access_key="fake_secret", + session_token="fake_token", + region="us-west-2", + ) + + class TestBedrockEncoder: def test_initialisation_with_default_values(self, bedrock_encoder): assert ( @@ -23,9 +36,6 @@ class TestBedrockEncoder: assert bedrock_encoder.region == "us-west-2", "Region should be initialised" def test_initialisation_with_custom_values(self, mocker): - # mocker.patch( - # "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client" - # ) name = "custom_model" score_threshold = 0.5 input_type = "custom_input" @@ -47,6 +57,72 @@ class TestBedrockEncoder: bedrock_encoder.input_type == input_type ), "Custom input type not set correctly" + def test_initialisation_with_session_token(self, mocker): + mocker.patch( + "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client" + ) + bedrock_encoder = BedrockEncoder( + access_key_id="fake_id", + secret_access_key="fake_secret", + session_token="fake_token", + region="us-west-2", + ) + assert ( + bedrock_encoder.session_token == "fake_token" + ), "Session token not set correctly" + + def test_initialisation_with_missing_access_key(self, mocker): + mocker.patch.dict(os.environ, {"AWS_ACCESS_KEY_ID": "env_id"}) + mocker.patch( + "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client" + ) + bedrock_encoder = BedrockEncoder( + access_key_id=None, + secret_access_key="fake_secret", + session_token="fake_token", + region="us-west-2", + ) + assert ( + bedrock_encoder.access_key_id == "env_id" + ), "Access key ID not set correctly from environment variable" + + def test_missing_access_key_id(self, mocker): + mocker.patch( + "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client" + ) + + with pytest.raises(ValueError): + BedrockEncoder(access_key_id=None, secret_access_key="fake_secret") + + def test_missing_secret_access_key(self, mocker): + mocker.patch( + "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client" + ) + + with pytest.raises(ValueError): + BedrockEncoder(access_key_id="fake_id", secret_access_key=None) + + def test_initialisation_missing_env_variables(self, mocker): + mocker.patch.dict(os.environ, {}, clear=True) + with pytest.raises(ValueError): + BedrockEncoder( + access_key_id=None, + secret_access_key=None, + session_token=None, + region=None, + ) + + def test_failed_client_initialisation(self, mocker): + mocker.patch.dict(os.environ, clear=True) + + mocker.patch( + "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client", + side_effect=Exception("Initialization failed"), + ) + + with pytest.raises(ValueError): + BedrockEncoder(access_key_id="fake_id", secret_access_key="fake_secret") + def test_call_method(self, bedrock_encoder): response_content = json.dumps({"embedding": [0.1, 0.2, 0.3]}) response_body = BytesIO(response_content.encode("utf-8")) @@ -59,18 +135,38 @@ class TestBedrockEncoder: ), "Each item in result should be a list" assert result == [[0.1, 0.2, 0.3]], "Embedding should be [0.1, 0.2, 0.3]" - def test_raises_value_error_if_client_is_not_initialised(self, mocker): + def test_call_with_expired_token(self, mocker, bedrock_encoder): + from botocore.exceptions import ClientError + + error_response = {"Error": {"Code": "ExpiredTokenException"}} mocker.patch( "semantic_router.encoders.bedrock.BedrockEncoder._initialize_client", - side_effect=Exception("Client initialisation failed"), + return_value=None, ) + + def invoke_model_side_effect(*args, **kwargs): + if not invoke_model_side_effect.expired_token_raised: + invoke_model_side_effect.expired_token_raised = True + raise ClientError(error_response, "invoke_model") + else: + return { + "body": BytesIO( + json.dumps({"embedding": [0.1, 0.2, 0.3]}).encode("utf-8") + ) + } + + invoke_model_side_effect.expired_token_raised = False + bedrock_encoder.client.invoke_model.side_effect = invoke_model_side_effect + with pytest.raises(ValueError): - BedrockEncoder( - access_key_id="fake_id", - secret_access_key="fake_secret", - session_token="fake_token", - region="us-west-2", - ) + bedrock_encoder(["test"]) + + bedrock_encoder._initialize_client.assert_called_once_with( + bedrock_encoder.access_key_id, + bedrock_encoder.secret_access_key, + None, + bedrock_encoder.region, + ) def test_raises_value_error_if_call_to_bedrock_fails(self, bedrock_encoder): bedrock_encoder.client.invoke_model.side_effect = Exception( @@ -79,17 +175,51 @@ class TestBedrockEncoder: with pytest.raises(ValueError): bedrock_encoder(["test"]) + def test_call_with_unknown_model_name(self, bedrock_encoder): + bedrock_encoder.name = "unknown_model" + with pytest.raises(ValueError): + bedrock_encoder(["test"]) -@pytest.fixture -def bedrock_encoder_with_cohere(mocker): - mocker.patch("semantic_router.encoders.bedrock.BedrockEncoder._initialize_client") - return BedrockEncoder( - name="cohere_model", - access_key_id="fake_id", - secret_access_key="fake_secret", - session_token="fake_token", - region="us-west-2", - ) + def test_chunking_functionality(self, bedrock_encoder): + docs = ["This is a long text that needs to be chunked properly."] + chunked_docs = bedrock_encoder.chunk_strings(docs, MAX_WORDS=5) + assert isinstance(chunked_docs, list), "Chunked result should be a list" + assert ( + len(chunked_docs[0]) > 1 + ), "Document should be chunked into multiple parts" + assert all( + isinstance(chunk, str) for chunk in chunked_docs[0] + ), "Chunks should be strings" + + def test_get_env_variable(self): + var_name = "TEST_ENV_VAR" + default_value = "default" + os.environ[var_name] = "env_value" + assert BedrockEncoder.get_env_variable(var_name, None) == "env_value" + assert ( + BedrockEncoder.get_env_variable(var_name, None, default_value) + == "env_value" + ) + assert ( + BedrockEncoder.get_env_variable("NON_EXISTENT_VAR", None, default_value) + == default_value + ) + + def test_get_env_variable_missing(self): + with pytest.raises(ValueError): + BedrockEncoder.get_env_variable("MISSING_VAR", None) + + def test_uninitialised_client(self, bedrock_encoder): + bedrock_encoder.client = None + + with pytest.raises(ValueError): + bedrock_encoder(["test"]) + + def test_missing_env_variables(self, mocker): + mocker.patch.dict(os.environ, clear=True) + + with pytest.raises(ValueError): + BedrockEncoder() class TestBedrockEncoderWithCohere: