From 286891ad41adf6cb3fa1622e78bd924fb21c4b0f Mon Sep 17 00:00:00 2001
From: Sourabh Desai <sourabhdesai@gmail.com>
Date: Tue, 12 Nov 2024 16:01:32 -0800
Subject: [PATCH] allow skipping waiting for ingestion when uploading file
 (#16934)

* add support for files endpoints

* version bump

* add wait_for_ingestion kwarg flag

* bump version

* make it wait on specific file's ingestion
---
 .../indices/managed/llama_cloud/base.py | 52 ++++++++++++++++---
 .../pyproject.toml                      |  2 +-
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/llama_index/indices/managed/llama_cloud/base.py b/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/llama_index/indices/managed/llama_cloud/base.py
index 5ce75df30..49497582d 100644
--- a/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/llama_index/indices/managed/llama_cloud/base.py
+++ b/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/llama_index/indices/managed/llama_cloud/base.py
@@ -130,6 +130,40 @@ class LlamaCloudIndex(BaseManagedIndex):
         if verbose:
             print("Done!")
 
+    def _wait_for_file_ingestion(
+        self,
+        file_id: str,
+        verbose: bool = False,
+        raise_on_error: bool = False,
+    ) -> None:
+        pipeline_id = self._get_pipeline_id()
+        client = self._client
+        if verbose:
+            print("Loading file: ", end="")
+
+        # wait until the file is loaded
+        is_done = False
+        while not is_done:
+            status = client.pipelines.get_pipeline_file_status(
+                pipeline_id=pipeline_id, file_id=file_id
+            ).status
+            if status == ManagedIngestionStatus.ERROR:
+                if verbose:
+                    print(f"File ingestion failed for {file_id}")
+                if raise_on_error:
+                    raise ValueError(f"File ingestion failed for {file_id}")
+            elif status in [
+                ManagedIngestionStatus.NOT_STARTED,
+                ManagedIngestionStatus.IN_PROGRESS,
+            ]:
+                if verbose:
+                    print(".", end="")
+                time.sleep(0.5)
+            else:
+                is_done = True
+                if verbose:
+                    print("Done!")
+
     def _wait_for_documents_ingestion(
         self,
         doc_ids: List[str],
@@ -461,6 +495,8 @@ class LlamaCloudIndex(BaseManagedIndex):
         file_path: str,
         resource_info: Optional[Dict[str, Any]] = None,
         verbose: bool = False,
+        wait_for_ingestion: bool = True,
+        raise_on_error: bool = False,
     ) -> str:
         """Upload a file to the index."""
         with open(file_path, "rb") as f:
@@ -478,9 +514,10 @@ class LlamaCloudIndex(BaseManagedIndex):
             pipeline_id=pipeline_id, request=[pipeline_file_create]
         )
 
-        self._wait_for_pipeline_ingestion(
-            verbose=verbose, raise_on_partial_success=False
-        )
+        if wait_for_ingestion:
+            self._wait_for_file_ingestion(
+                file.id, verbose=verbose, raise_on_error=raise_on_error
+            )
         return file.id
 
     def upload_file_from_url(
@@ -492,6 +529,8 @@ class LlamaCloudIndex(BaseManagedIndex):
         verify_ssl: bool = True,
         follow_redirects: bool = True,
         verbose: bool = False,
+        wait_for_ingestion: bool = True,
+        raise_on_error: bool = False,
     ) -> str:
         """Upload a file from a URL to the index."""
         file = self._client.files.upload_file_from_url(
@@ -512,9 +551,10 @@ class LlamaCloudIndex(BaseManagedIndex):
             pipeline_id=pipeline_id, request=[pipeline_file_create]
        )
 
-        self._wait_for_pipeline_ingestion(
-            verbose=verbose, raise_on_partial_success=False
-        )
+        if wait_for_ingestion:
+            self._wait_for_file_ingestion(
+                file.id, verbose=verbose, raise_on_error=raise_on_error
+            )
         return file.id
 
     # Nodes related methods (not implemented for LlamaCloudIndex)

diff --git a/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/pyproject.toml b/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/pyproject.toml
index 9810d623c..18f6bc81d 100644
--- a/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/pyproject.toml
+++ b/llama-index-integrations/indices/llama-index-indices-managed-llama-cloud/pyproject.toml
@@ -34,7 +34,7 @@ exclude = ["**/BUILD"]
 license = "MIT"
 name = "llama-index-indices-managed-llama-cloud"
 readme = "README.md"
-version = "0.4.1"
+version = "0.4.2"
 
 [tool.poetry.dependencies]
 python = ">=3.8.1,<4.0"
--
GitLab
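For reference, the sketch below shows how a caller might use the wait_for_ingestion and raise_on_error flags introduced by this patch. It is not part of the patch itself: the index name, project name, and file path are placeholder assumptions, not values taken from the diff.

    # A minimal sketch, assuming an existing LlamaCloud index; "my-index",
    # "Default", and "data/report.pdf" are placeholders, not from this patch.
    from llama_index.indices.managed.llama_cloud import LlamaCloudIndex

    index = LlamaCloudIndex(name="my-index", project_name="Default")

    # Default behavior: block until this specific file finishes ingesting.
    file_id = index.upload_file("data/report.pdf", verbose=True)

    # Skip waiting: return as soon as the file is registered with the pipeline.
    file_id = index.upload_file("data/report.pdf", wait_for_ingestion=False)

    # While waiting, raise a ValueError if ingestion ends in an ERROR state.
    file_id = index.upload_file(
        "data/report.pdf",
        wait_for_ingestion=True,
        raise_on_error=True,
    )

Before this change, both upload methods polled the whole pipeline via _wait_for_pipeline_ingestion; with the patch they poll only the uploaded file's status, and only when wait_for_ingestion is left at its default of True.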