From 81ef7f0f9362f34339669bdc81f8ab885701f73b Mon Sep 17 00:00:00 2001 From: Marcus Schiesser <mail@marcusschiesser.de> Date: Wed, 14 Aug 2024 17:03:16 +0700 Subject: [PATCH] feat: use llamacloud pipeline for private files and generate script in Python (#226) --------- Co-authored-by: Thuc Pham <51660321+thucpn@users.noreply.github.com> --- .changeset/plenty-beers-cheer.md | 5 + helpers/index.ts | 15 +- helpers/python.ts | 15 +- .../sample-projects/llamapack/pyproject.toml | 2 +- .../vectordbs/python/llamacloud/generate.py | 60 +++--- .../vectordbs/python/llamacloud/index.py | 14 +- .../python/llamacloud/query_filter.py | 2 +- .../vectordbs/python/llamacloud/service.py | 173 ++++++++++++++++++ .../types/extractor/fastapi/pyproject.toml | 2 +- .../src/controllers/chat-upload.controller.ts | 4 +- .../streaming/fastapi/app/api/routers/chat.py | 30 ++- .../fastapi/app/api/routers/chat_config.py | 45 +++-- .../fastapi/app/api/routers/models.py | 38 +--- .../fastapi/app/api/routers/upload.py | 6 +- .../fastapi/app/api/services/file.py | 48 +++-- .../fastapi/app/api/services/llama_cloud.py | 114 ------------ .../types/streaming/fastapi/pyproject.toml | 2 +- .../nextjs/app/api/chat/upload/route.ts | 5 +- .../nextjs/app/components/chat-section.tsx | 1 + .../app/components/ui/chat/chat-input.tsx | 3 +- .../app/components/ui/chat/hooks/use-file.ts | 9 +- 21 files changed, 331 insertions(+), 262 deletions(-) create mode 100644 .changeset/plenty-beers-cheer.md create mode 100644 templates/components/vectordbs/python/llamacloud/service.py delete mode 100644 templates/types/streaming/fastapi/app/api/services/llama_cloud.py diff --git a/.changeset/plenty-beers-cheer.md b/.changeset/plenty-beers-cheer.md new file mode 100644 index 00000000..da3dba27 --- /dev/null +++ b/.changeset/plenty-beers-cheer.md @@ -0,0 +1,5 @@ +--- +"create-llama": patch +--- + +Use LlamaCloud pipeline for data ingestion (private file uploads and generate script) diff --git a/helpers/index.ts b/helpers/index.ts index 91eed694..6ff16e93 100644 --- a/helpers/index.ts +++ b/helpers/index.ts @@ -142,12 +142,15 @@ export const installTemplate = async ( if (props.framework === "fastapi") { await installPythonTemplate(props); - // write loaders configuration (currently Python only) - await writeLoadersConfig( - props.root, - props.dataSources, - props.useLlamaParse, - ); + if (props.vectorDb !== "llamacloud") { + // write loaders configuration (currently Python only) + // not needed for LlamaCloud as it has its own loaders + await writeLoadersConfig( + props.root, + props.dataSources, + props.useLlamaParse, + ); + } } else { await installTSTemplate(props); } diff --git a/helpers/python.ts b/helpers/python.ts index 7aac6c2f..445f2f9c 100644 --- a/helpers/python.ts +++ b/helpers/python.ts @@ -350,12 +350,15 @@ export const installPythonTemplate = async ({ cwd: path.join(compPath, "vectordbs", "python", vectorDb ?? 
"none"), }); - // Copy all loaders to enginePath - const loaderPath = path.join(enginePath, "loaders"); - await copy("**", loaderPath, { - parents: true, - cwd: path.join(compPath, "loaders", "python"), - }); + if (vectorDb !== "llamacloud") { + // Copy all loaders to enginePath + // Not needed for LlamaCloud as it has its own loaders + const loaderPath = path.join(enginePath, "loaders"); + await copy("**", loaderPath, { + parents: true, + cwd: path.join(compPath, "loaders", "python"), + }); + } // Copy settings.py to app await copy("**", path.join(root, "app"), { diff --git a/templates/components/sample-projects/llamapack/pyproject.toml b/templates/components/sample-projects/llamapack/pyproject.toml index 4bd28bd8..3ac40927 100644 --- a/templates/components/sample-projects/llamapack/pyproject.toml +++ b/templates/components/sample-projects/llamapack/pyproject.toml @@ -6,7 +6,7 @@ authors = ["Marcus Schiesser <mail@marcusschiesser.de>"] readme = "README.md" [tool.poetry.dependencies] -python = "^3.11,<3.12" +python = "^3.11,<4.0" llama-index = "^0.10.6" llama-index-readers-file = "^0.1.3" python-dotenv = "^1.0.0" diff --git a/templates/components/vectordbs/python/llamacloud/generate.py b/templates/components/vectordbs/python/llamacloud/generate.py index 8bcf6068..7ddfb7e8 100644 --- a/templates/components/vectordbs/python/llamacloud/generate.py +++ b/templates/components/vectordbs/python/llamacloud/generate.py @@ -1,48 +1,46 @@ from dotenv import load_dotenv +from app.engine.index import get_index + load_dotenv() -import os import logging -from app.settings import init_settings -from app.engine.loaders import get_documents -from llama_index.indices.managed.llama_cloud import LlamaCloudIndex - +from llama_index.core.readers import SimpleDirectoryReader +from app.engine.service import LLamaCloudFileService logging.basicConfig(level=logging.INFO) logger = logging.getLogger() def generate_datasource(): - init_settings() logger.info("Generate index for the provided data") - name = os.getenv("LLAMA_CLOUD_INDEX_NAME") - project_name = os.getenv("LLAMA_CLOUD_PROJECT_NAME") - api_key = os.getenv("LLAMA_CLOUD_API_KEY") - base_url = os.getenv("LLAMA_CLOUD_BASE_URL") - organization_id = os.getenv("LLAMA_CLOUD_ORGANIZATION_ID") - - if name is None or project_name is None or api_key is None: - raise ValueError( - "Please set LLAMA_CLOUD_INDEX_NAME, LLAMA_CLOUD_PROJECT_NAME and LLAMA_CLOUD_API_KEY" - " to your environment variables or config them in .env file" - ) - - documents = get_documents() - - # Set private=false to mark the document as public (required for filtering) - for doc in documents: - doc.metadata["private"] = "false" - - LlamaCloudIndex.from_documents( - documents=documents, - name=name, - project_name=project_name, - api_key=api_key, - base_url=base_url, - organization_id=organization_id + index = get_index() + project_id = index._get_project_id() + pipeline_id = index._get_pipeline_id() + + # use SimpleDirectoryReader to retrieve the files to process + reader = SimpleDirectoryReader( + "data", + recursive=True, ) + files_to_process = reader.input_files + + # add each file to the LlamaCloud pipeline + for input_file in files_to_process: + with open(input_file, "rb") as f: + logger.info( + f"Adding file {input_file} to pipeline {index.name} in project {index.project_name}" + ) + LLamaCloudFileService.add_file_to_pipeline( + project_id, + pipeline_id, + f, + custom_metadata={ + # Set private=false to mark the document as public (required for filtering) + "private": "false", + }, + ) 
logger.info("Finished generating the index") diff --git a/templates/components/vectordbs/python/llamacloud/index.py b/templates/components/vectordbs/python/llamacloud/index.py index e54e8ca9..0a4ba795 100644 --- a/templates/components/vectordbs/python/llamacloud/index.py +++ b/templates/components/vectordbs/python/llamacloud/index.py @@ -1,10 +1,20 @@ import logging import os from llama_index.indices.managed.llama_cloud import LlamaCloudIndex - +from llama_index.core.ingestion.api_utils import ( + get_client as llama_cloud_get_client, +) logger = logging.getLogger("uvicorn") + +def get_client(): + return llama_cloud_get_client( + os.getenv("LLAMA_CLOUD_API_KEY"), + os.getenv("LLAMA_CLOUD_BASE_URL"), + ) + + def get_index(params=None): configParams = params or {} pipelineConfig = configParams.get("llamaCloudPipeline", {}) @@ -25,7 +35,7 @@ def get_index(params=None): project_name=project_name, api_key=api_key, base_url=base_url, - organization_id=organization_id + organization_id=organization_id, ) return index diff --git a/templates/components/vectordbs/python/llamacloud/query_filter.py b/templates/components/vectordbs/python/llamacloud/query_filter.py index 9c59735a..b9a5ca40 100644 --- a/templates/components/vectordbs/python/llamacloud/query_filter.py +++ b/templates/components/vectordbs/python/llamacloud/query_filter.py @@ -12,7 +12,7 @@ def generate_filters(doc_ids): operator="nin", # type: ignore ) selected_doc_filter = MetadataFilter( - key="doc_id", + key="file_id", # Note: LLamaCloud uses "file_id" to reference private document ids as "doc_id" is a restricted field in LlamaCloud value=doc_ids, operator="in", # type: ignore ) diff --git a/templates/components/vectordbs/python/llamacloud/service.py b/templates/components/vectordbs/python/llamacloud/service.py new file mode 100644 index 00000000..68216f98 --- /dev/null +++ b/templates/components/vectordbs/python/llamacloud/service.py @@ -0,0 +1,173 @@ +from io import BytesIO +import logging +import os +import time +from typing import Any, Dict, List, Optional, Set, Tuple, Union +import typing + +from fastapi import BackgroundTasks +from llama_cloud import ManagedIngestionStatus, PipelineFileCreateCustomMetadataValue +from pydantic import BaseModel +import requests +from app.api.routers.models import SourceNodes +from app.engine.index import get_client +from llama_index.core.schema import NodeWithScore + + +logger = logging.getLogger("uvicorn") + + +class LlamaCloudFile(BaseModel): + file_name: str + pipeline_id: str + + def __eq__(self, other): + if not isinstance(other, LlamaCloudFile): + return NotImplemented + return ( + self.file_name == other.file_name and self.pipeline_id == other.pipeline_id + ) + + def __hash__(self): + return hash((self.file_name, self.pipeline_id)) + + +class LLamaCloudFileService: + LOCAL_STORE_PATH = "output/llamacloud" + DOWNLOAD_FILE_NAME_TPL = "{pipeline_id}${filename}" + + @classmethod + def get_all_projects_with_pipelines(cls) -> List[Dict[str, Any]]: + try: + client = get_client() + projects = client.projects.list_projects() + pipelines = client.pipelines.search_pipelines() + return [ + { + **(project.dict()), + "pipelines": [ + {"id": p.id, "name": p.name} + for p in pipelines + if p.project_id == project.id + ], + } + for project in projects + ] + except Exception as error: + logger.error(f"Error listing projects and pipelines: {error}") + return [] + + @classmethod + def add_file_to_pipeline( + cls, + project_id: str, + pipeline_id: str, + upload_file: Union[typing.IO, Tuple[str, BytesIO]], + 
custom_metadata: Optional[Dict[str, PipelineFileCreateCustomMetadataValue]],
+    ) -> str:
+        client = get_client()
+        file = client.files.upload_file(project_id=project_id, upload_file=upload_file)
+        files = [
+            {
+                "file_id": file.id,
+                "custom_metadata": {"file_id": file.id, **(custom_metadata or {})},
+            }
+        ]
+        files = client.pipelines.add_files_to_pipeline(pipeline_id, request=files)
+
+        # Wait up to 2 seconds (20 attempts x 100ms) for the file to be processed
+        max_attempts = 20
+        attempt = 0
+        while attempt < max_attempts:
+            result = client.pipelines.get_pipeline_file_status(pipeline_id, file.id)
+            if result.status == ManagedIngestionStatus.ERROR:
+                raise Exception(f"File processing failed: {str(result)}")
+            if result.status == ManagedIngestionStatus.SUCCESS:
+                # File is ingested - return the file id
+                return file.id
+            attempt += 1
+            time.sleep(0.1)  # Sleep for 100ms
+        raise Exception(
+            f"File processing did not complete after {max_attempts} attempts."
+        )
+
+    @classmethod
+    def download_pipeline_file(
+        cls,
+        file: LlamaCloudFile,
+        force_download: bool = False,
+    ):
+        client = get_client()
+        file_name = file.file_name
+        pipeline_id = file.pipeline_id
+
+        # Check if the file already exists
+        downloaded_file_path = cls._get_file_path(file_name, pipeline_id)
+        if os.path.exists(downloaded_file_path) and not force_download:
+            logger.debug(f"File {file_name} already exists in local storage")
+            return
+        try:
+            logger.info(f"Downloading file {file_name} for pipeline {pipeline_id}")
+            files = client.pipelines.list_pipeline_files(pipeline_id)
+            if not files or not isinstance(files, list):
+                raise Exception("No files found in LlamaCloud")
+            for file_entry in files:
+                if file_entry.name == file_name:
+                    file_id = file_entry.file_id
+                    project_id = file_entry.project_id
+                    file_detail = client.files.read_file_content(
+                        file_id, project_id=project_id
+                    )
+                    cls._download_file(file_detail.url, downloaded_file_path)
+                    break
+        except Exception as error:
+            logger.info(f"Error fetching file from LlamaCloud: {error}")
+
+    @classmethod
+    def download_files_from_nodes(
+        cls, nodes: List[NodeWithScore], background_tasks: BackgroundTasks
+    ):
+        files = cls._get_files_to_download(nodes)
+        for file in files:
+            logger.info(f"Adding download of {file.file_name} to background tasks")
+            background_tasks.add_task(
+                LLamaCloudFileService.download_pipeline_file, file
+            )
+
+    @classmethod
+    def _get_files_to_download(cls, nodes: List[NodeWithScore]) -> Set[LlamaCloudFile]:
+        source_nodes = SourceNodes.from_source_nodes(nodes)
+        llama_cloud_files = [
+            LlamaCloudFile(
+                file_name=node.metadata.get("file_name"),
+                pipeline_id=node.metadata.get("pipeline_id"),
+            )
+            for node in source_nodes
+            if (
+                node.metadata.get("pipeline_id") is not None
+                and node.metadata.get("file_name") is not None
+            )
+        ]
+        # Remove duplicates and return
+        return set(llama_cloud_files)
+
+    @classmethod
+    def _get_file_name(cls, name: str, pipeline_id: str) -> str:
+        return cls.DOWNLOAD_FILE_NAME_TPL.format(pipeline_id=pipeline_id, filename=name)
+
+    @classmethod
+    def _get_file_path(cls, name: str, pipeline_id: str) -> str:
+        return os.path.join(cls.LOCAL_STORE_PATH, cls._get_file_name(name, pipeline_id))
+
+    @classmethod
+    def _download_file(cls, url: str, local_file_path: str):
+        logger.info(f"Saving file to {local_file_path}")
+        # Create directory if it doesn't exist
+        os.makedirs(cls.LOCAL_STORE_PATH, exist_ok=True)
+        # Download the file
+        with requests.get(url, stream=True) as r:
+            r.raise_for_status()
+            with open(local_file_path, "wb") as f:
+                for chunk in 
r.iter_content(chunk_size=8192): + f.write(chunk) + logger.info("File downloaded successfully") diff --git a/templates/types/extractor/fastapi/pyproject.toml b/templates/types/extractor/fastapi/pyproject.toml index b5bdea0f..37e2df25 100644 --- a/templates/types/extractor/fastapi/pyproject.toml +++ b/templates/types/extractor/fastapi/pyproject.toml @@ -9,7 +9,7 @@ readme = "README.md" generate = "app.engine.generate:generate_datasource" [tool.poetry.dependencies] -python = "^3.11,<3.12" +python = "^3.11,<4.0" fastapi = "^0.109.1" uvicorn = { extras = ["standard"], version = "^0.23.2" } python-dotenv = "^1.0.0" diff --git a/templates/types/streaming/express/src/controllers/chat-upload.controller.ts b/templates/types/streaming/express/src/controllers/chat-upload.controller.ts index 781cb635..f97234e5 100644 --- a/templates/types/streaming/express/src/controllers/chat-upload.controller.ts +++ b/templates/types/streaming/express/src/controllers/chat-upload.controller.ts @@ -3,12 +3,12 @@ import { getDataSource } from "./engine"; import { uploadDocument } from "./llamaindex/documents/upload"; export const chatUpload = async (req: Request, res: Response) => { - const { base64 }: { base64: string } = req.body; + const { base64, params }: { base64: string; params?: any } = req.body; if (!base64) { return res.status(400).json({ error: "base64 is required in the request body", }); } - const index = await getDataSource(); + const index = await getDataSource(params); return res.status(200).json(await uploadDocument(index, base64)); }; diff --git a/templates/types/streaming/fastapi/app/api/routers/chat.py b/templates/types/streaming/fastapi/app/api/routers/chat.py index 11a34533..ace2a3b8 100644 --- a/templates/types/streaming/fastapi/app/api/routers/chat.py +++ b/templates/types/streaming/fastapi/app/api/routers/chat.py @@ -13,7 +13,6 @@ from app.api.routers.models import ( SourceNodes, ) from app.api.routers.vercel_response import VercelStreamResponse -from app.api.services.llama_cloud import LLamaCloudFileService from app.engine import get_chat_engine from app.engine.query_filter import generate_filters @@ -22,27 +21,12 @@ chat_router = r = APIRouter() logger = logging.getLogger("uvicorn") -def process_response_nodes( - nodes: List[NodeWithScore], - background_tasks: BackgroundTasks, -): - """ - Start background tasks on the source nodes if needed. - """ - files_to_download = SourceNodes.get_download_files(nodes) - for file in files_to_download: - background_tasks.add_task( - LLamaCloudFileService.download_llamacloud_pipeline_file, file - ) - - # streaming endpoint - delete if not needed @r.post("") async def chat( request: Request, data: ChatData, background_tasks: BackgroundTasks, - chat_engine: BaseChatEngine = Depends(get_chat_engine), ): try: last_message_content = data.get_last_message_content() @@ -85,3 +69,17 @@ async def chat_request( result=Message(role=MessageRole.ASSISTANT, content=response.response), nodes=SourceNodes.from_source_nodes(response.source_nodes), ) + + +def process_response_nodes( + nodes: List[NodeWithScore], + background_tasks: BackgroundTasks, +): + try: + # Start background tasks to download documents from LlamaCloud if needed + from app.engine.service import LLamaCloudFileService + + LLamaCloudFileService.download_files_from_nodes(nodes, background_tasks) + except ImportError: + logger.debug("LlamaCloud is not configured. 
Skipping post processing of nodes") + pass diff --git a/templates/types/streaming/fastapi/app/api/routers/chat_config.py b/templates/types/streaming/fastapi/app/api/routers/chat_config.py index ebd51ca4..8d926e50 100644 --- a/templates/types/streaming/fastapi/app/api/routers/chat_config.py +++ b/templates/types/streaming/fastapi/app/api/routers/chat_config.py @@ -1,12 +1,15 @@ +import logging import os from fastapi import APIRouter from app.api.routers.models import ChatConfig -from app.api.services.llama_cloud import LLamaCloudFileService + config_router = r = APIRouter() +logger = logging.getLogger("uvicorn") + @r.get("") async def chat_config() -> ChatConfig: @@ -17,21 +20,29 @@ async def chat_config() -> ChatConfig: return ChatConfig(starter_questions=starter_questions) -@r.get("/llamacloud") -async def chat_llama_cloud_config(): - projects = LLamaCloudFileService.get_all_projects_with_pipelines() - pipeline = os.getenv("LLAMA_CLOUD_INDEX_NAME") - project = os.getenv("LLAMA_CLOUD_PROJECT_NAME") - pipeline_config = ( - pipeline - and project - and { - "pipeline": pipeline, - "project": project, +try: + from app.engine.service import LLamaCloudFileService + + logger.info("LlamaCloud is configured. Adding /config/llamacloud route.") + + @r.get("/llamacloud") + async def chat_llama_cloud_config(): + projects = LLamaCloudFileService.get_all_projects_with_pipelines() + pipeline = os.getenv("LLAMA_CLOUD_INDEX_NAME") + project = os.getenv("LLAMA_CLOUD_PROJECT_NAME") + pipeline_config = None + if pipeline and project: + pipeline_config = { + "pipeline": pipeline, + "project": project, + } + return { + "projects": projects, + "pipeline": pipeline_config, } - or None + +except ImportError: + logger.debug( + "LlamaCloud is not configured. Skipping adding /config/llamacloud route." 
) - return { - "projects": projects, - "pipeline": pipeline_config, - } + pass diff --git a/templates/types/streaming/fastapi/app/api/routers/models.py b/templates/types/streaming/fastapi/app/api/routers/models.py index c9ea1adb..3d790adb 100644 --- a/templates/types/streaming/fastapi/app/api/routers/models.py +++ b/templates/types/streaming/fastapi/app/api/routers/models.py @@ -147,21 +147,6 @@ class ChatData(BaseModel): return list(set(document_ids)) -class LlamaCloudFile(BaseModel): - file_name: str - pipeline_id: str - - def __eq__(self, other): - if not isinstance(other, LlamaCloudFile): - return NotImplemented - return ( - self.file_name == other.file_name and self.pipeline_id == other.pipeline_id - ) - - def __hash__(self): - return hash((self.file_name, self.pipeline_id)) - - class SourceNodes(BaseModel): id: str metadata: Dict[str, Any] @@ -193,8 +178,8 @@ class SourceNodes(BaseModel): if file_name and url_prefix: # file_name exists and file server is configured pipeline_id = metadata.get("pipeline_id") - if pipeline_id and metadata.get("private") is None: - # file is from LlamaCloud and was not ingested locally + if pipeline_id: + # file is from LlamaCloud file_name = f"{pipeline_id}${file_name}" return f"{url_prefix}/output/llamacloud/{file_name}" is_private = metadata.get("private", "false") == "true" @@ -209,25 +194,6 @@ class SourceNodes(BaseModel): def from_source_nodes(cls, source_nodes: List[NodeWithScore]): return [cls.from_source_node(node) for node in source_nodes] - @staticmethod - def get_download_files(nodes: List[NodeWithScore]) -> Set[LlamaCloudFile]: - source_nodes = SourceNodes.from_source_nodes(nodes) - llama_cloud_files = [ - LlamaCloudFile( - file_name=node.metadata.get("file_name"), - pipeline_id=node.metadata.get("pipeline_id"), - ) - for node in source_nodes - if ( - node.metadata.get("private") - is None # Only download files are from LlamaCloud and were not ingested locally - and node.metadata.get("pipeline_id") is not None - and node.metadata.get("file_name") is not None - ) - ] - # Remove duplicates and return - return set(llama_cloud_files) - class Result(BaseModel): result: Message diff --git a/templates/types/streaming/fastapi/app/api/routers/upload.py b/templates/types/streaming/fastapi/app/api/routers/upload.py index 94f3ce7d..dcb3a08d 100644 --- a/templates/types/streaming/fastapi/app/api/routers/upload.py +++ b/templates/types/streaming/fastapi/app/api/routers/upload.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import List, Any from fastapi import APIRouter, HTTPException from pydantic import BaseModel @@ -13,13 +13,15 @@ logger = logging.getLogger("uvicorn") class FileUploadRequest(BaseModel): base64: str + filename: str + params: Any @r.post("") def upload_file(request: FileUploadRequest) -> List[str]: try: logger.info("Processing file") - return PrivateFileService.process_file(request.base64) + return PrivateFileService.process_file(request.filename, request.base64, request.params) except Exception as e: logger.error(f"Error processing file: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Error processing file") diff --git a/templates/types/streaming/fastapi/app/api/services/file.py b/templates/types/streaming/fastapi/app/api/services/file.py index a478570a..20356113 100644 --- a/templates/types/streaming/fastapi/app/api/services/file.py +++ b/templates/types/streaming/fastapi/app/api/services/file.py @@ -1,19 +1,20 @@ import base64 import mimetypes import os +from io import BytesIO from pathlib 
import Path -from typing import Dict, List +import time +from typing import Any, Dict, List, Tuple from uuid import uuid4 + from app.engine.index import get_index from llama_index.core import VectorStoreIndex from llama_index.core.ingestion import IngestionPipeline from llama_index.core.readers.file.base import ( _try_loading_included_file_formats as get_file_loaders_map, ) -from llama_index.core.readers.file.base import ( - default_file_metadata_func, -) +from llama_index.core.readers.file.base import default_file_metadata_func from llama_index.core.schema import Document from llama_index.indices.managed.llama_cloud.base import LlamaCloudIndex from llama_index.readers.file import FlatReader @@ -41,7 +42,7 @@ class PrivateFileService: PRIVATE_STORE_PATH = "output/uploaded" @staticmethod - def preprocess_base64_file(base64_content: str) -> tuple: + def preprocess_base64_file(base64_content: str) -> Tuple[bytes, str | None]: header, data = base64_content.split(",", 1) mime_type = header.split(";")[0].split(":", 1)[1] extension = mimetypes.guess_extension(mime_type) @@ -78,25 +79,34 @@ class PrivateFileService: return documents @staticmethod - def process_file(base64_content: str) -> List[str]: + def process_file(file_name: str, base64_content: str, params: Any) -> List[str]: file_data, extension = PrivateFileService.preprocess_base64_file(base64_content) - documents = PrivateFileService.store_and_parse_file(file_data, extension) - - # Only process nodes, no store the index - pipeline = IngestionPipeline() - nodes = pipeline.run(documents=documents) # Add the nodes to the index and persist it - current_index = get_index() + current_index = get_index(params) # Insert the documents into the index if isinstance(current_index, LlamaCloudIndex): - # LlamaCloudIndex is a managed index so we don't need to process the nodes - # just insert the documents - for doc in documents: - current_index.insert(doc) + from app.engine.service import LLamaCloudFileService + + project_id = current_index._get_project_id() + pipeline_id = current_index._get_pipeline_id() + # LlamaCloudIndex is a managed index so we can directly use the files + upload_file = (file_name, BytesIO(file_data)) + return [ + LLamaCloudFileService.add_file_to_pipeline( + project_id, + pipeline_id, + upload_file, + custom_metadata={ + # Set private=true to mark the document as private user docs (required for filtering) + "private": "true", + }, + ) + ] else: - # Only process nodes, no store the index + # First process documents into nodes + documents = PrivateFileService.store_and_parse_file(file_data, extension) pipeline = IngestionPipeline() nodes = pipeline.run(documents=documents) @@ -109,5 +119,5 @@ class PrivateFileService: persist_dir=os.environ.get("STORAGE_DIR", "storage") ) - # Return the document ids - return [doc.doc_id for doc in documents] + # Return the document ids + return [doc.doc_id for doc in documents] diff --git a/templates/types/streaming/fastapi/app/api/services/llama_cloud.py b/templates/types/streaming/fastapi/app/api/services/llama_cloud.py deleted file mode 100644 index 852ae7ce..00000000 --- a/templates/types/streaming/fastapi/app/api/services/llama_cloud.py +++ /dev/null @@ -1,114 +0,0 @@ -import logging -import os -from typing import Any, Dict, List, Optional - -import requests -from app.api.routers.models import LlamaCloudFile - -logger = logging.getLogger("uvicorn") - - -class LLamaCloudFileService: - LLAMA_CLOUD_URL = "https://cloud.llamaindex.ai/api/v1" - LOCAL_STORE_PATH = "output/llamacloud" - - 
DOWNLOAD_FILE_NAME_TPL = "{pipeline_id}${filename}" - - @classmethod - def get_all_projects(cls) -> List[Dict[str, Any]]: - url = f"{cls.LLAMA_CLOUD_URL}/projects" - return cls._make_request(url) - - @classmethod - def get_all_pipelines(cls) -> List[Dict[str, Any]]: - url = f"{cls.LLAMA_CLOUD_URL}/pipelines" - return cls._make_request(url) - - @classmethod - def get_all_projects_with_pipelines(cls) -> List[Dict[str, Any]]: - try: - projects = cls.get_all_projects() - pipelines = cls.get_all_pipelines() - return [ - { - **project, - "pipelines": [p for p in pipelines if p["project_id"] == project["id"]], - } - for project in projects - ] - except Exception as error: - logger.error(f"Error listing projects and pipelines: {error}") - return [] - - @classmethod - def _get_files(cls, pipeline_id: str) -> List[Dict[str, Any]]: - url = f"{cls.LLAMA_CLOUD_URL}/pipelines/{pipeline_id}/files" - return cls._make_request(url) - - @classmethod - def _get_file_detail(cls, project_id: str, file_id: str) -> Dict[str, Any]: - url = f"{cls.LLAMA_CLOUD_URL}/files/{file_id}/content?project_id={project_id}" - return cls._make_request(url) - - @classmethod - def _download_file(cls, url: str, local_file_path: str): - logger.info(f"Downloading file to {local_file_path}") - # Create directory if it doesn't exist - os.makedirs(cls.LOCAL_STORE_PATH, exist_ok=True) - # Download the file - with requests.get(url, stream=True) as r: - r.raise_for_status() - with open(local_file_path, "wb") as f: - for chunk in r.iter_content(chunk_size=8192): - f.write(chunk) - logger.info("File downloaded successfully") - - @classmethod - def download_llamacloud_pipeline_file( - cls, - file: LlamaCloudFile, - force_download: bool = False, - ): - file_name = file.file_name - pipeline_id = file.pipeline_id - - # Check is the file already exists - downloaded_file_path = cls.get_file_path(file_name, pipeline_id) - if os.path.exists(downloaded_file_path) and not force_download: - logger.debug(f"File {file_name} already exists in local storage") - return - try: - logger.info(f"Downloading file {file_name} for pipeline {pipeline_id}") - files = cls._get_files(pipeline_id) - if not files or not isinstance(files, list): - raise Exception("No files found in LlamaCloud") - for file_entry in files: - if file_entry["name"] == file_name: - file_id = file_entry["file_id"] - project_id = file_entry["project_id"] - file_detail = cls._get_file_detail(project_id, file_id) - cls._download_file(file_detail["url"], downloaded_file_path) - break - except Exception as error: - logger.info(f"Error fetching file from LlamaCloud: {error}") - - @classmethod - def get_file_name(cls, name: str, pipeline_id: str) -> str: - return cls.DOWNLOAD_FILE_NAME_TPL.format(pipeline_id=pipeline_id, filename=name) - - @classmethod - def get_file_path(cls, name: str, pipeline_id: str) -> str: - return os.path.join(cls.LOCAL_STORE_PATH, cls.get_file_name(name, pipeline_id)) - - @staticmethod - def _make_request( - url: str, data=None, headers: Optional[Dict] = None, method: str = "get" - ): - if headers is None: - headers = { - "Accept": "application/json", - "Authorization": f'Bearer {os.getenv("LLAMA_CLOUD_API_KEY")}', - } - response = requests.request(method, url, headers=headers, data=data) - response.raise_for_status() - return response.json() diff --git a/templates/types/streaming/fastapi/pyproject.toml b/templates/types/streaming/fastapi/pyproject.toml index e897faff..f622b1d8 100644 --- a/templates/types/streaming/fastapi/pyproject.toml +++ 
b/templates/types/streaming/fastapi/pyproject.toml @@ -9,7 +9,7 @@ readme = "README.md" generate = "app.engine.generate:generate_datasource" [tool.poetry.dependencies] -python = "^3.11,<3.12" +python = "^3.11,<4.0" fastapi = "^0.109.1" uvicorn = { extras = ["standard"], version = "^0.23.2" } python-dotenv = "^1.0.0" diff --git a/templates/types/streaming/nextjs/app/api/chat/upload/route.ts b/templates/types/streaming/nextjs/app/api/chat/upload/route.ts index 08b1d419..d5aeb505 100644 --- a/templates/types/streaming/nextjs/app/api/chat/upload/route.ts +++ b/templates/types/streaming/nextjs/app/api/chat/upload/route.ts @@ -10,14 +10,15 @@ export const dynamic = "force-dynamic"; export async function POST(request: NextRequest) { try { - const { base64 }: { base64: string } = await request.json(); + const { base64, params }: { base64: string; params?: any } = + await request.json(); if (!base64) { return NextResponse.json( { error: "base64 is required in the request body" }, { status: 400 }, ); } - const index = await getDataSource(); + const index = await getDataSource(params); if (!index) { throw new Error( `StorageContext is empty - call 'npm run generate' to generate the storage first`, diff --git a/templates/types/streaming/nextjs/app/components/chat-section.tsx b/templates/types/streaming/nextjs/app/components/chat-section.tsx index 619ab9a4..7fcf188d 100644 --- a/templates/types/streaming/nextjs/app/components/chat-section.tsx +++ b/templates/types/streaming/nextjs/app/components/chat-section.tsx @@ -48,6 +48,7 @@ export default function ChatSection() { messages={messages} append={append} setInput={setInput} + requestParams={{ params: requestData }} setRequestData={setRequestData} /> </div> diff --git a/templates/types/streaming/nextjs/app/components/ui/chat/chat-input.tsx b/templates/types/streaming/nextjs/app/components/ui/chat/chat-input.tsx index 3f758205..9d1cb44e 100644 --- a/templates/types/streaming/nextjs/app/components/ui/chat/chat-input.tsx +++ b/templates/types/streaming/nextjs/app/components/ui/chat/chat-input.tsx @@ -71,7 +71,8 @@ export default function ChatInput( await uploadFile(file, props.requestParams); props.onFileUpload?.(file); } catch (error: any) { - props.onFileError?.(error.message); + const onFileUploadError = props.onFileError || window.alert; + onFileUploadError(error.message); } }; diff --git a/templates/types/streaming/nextjs/app/components/ui/chat/hooks/use-file.ts b/templates/types/streaming/nextjs/app/components/ui/chat/hooks/use-file.ts index 2c2c34bc..cc49169a 100644 --- a/templates/types/streaming/nextjs/app/components/ui/chat/hooks/use-file.ts +++ b/templates/types/streaming/nextjs/app/components/ui/chat/hooks/use-file.ts @@ -49,9 +49,10 @@ export function useFile() { }; const uploadContent = async ( - base64: string, + file: File, requestParams: any = {}, ): Promise<string[]> => { + const base64 = await readContent({ file, asUrl: true }); const uploadAPI = `${backend}/api/chat/upload`; const response = await fetch(uploadAPI, { method: "POST", @@ -59,8 +60,9 @@ export function useFile() { "Content-Type": "application/json", }, body: JSON.stringify({ - base64, ...requestParams, + base64, + filename: file.name, }), }); if (!response.ok) throw new Error("Failed to upload document."); @@ -128,8 +130,7 @@ export function useFile() { }); } default: { - const base64 = await readContent({ file, asUrl: true }); - const ids = await uploadContent(base64, requestParams); + const ids = await uploadContent(file, requestParams); return addDoc({ ...newDoc, 
content: { -- GitLab
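
Usage sketch (generate script): with this patch, generate_datasource() resolves the LlamaCloud index via get_index() and pushes every file under ./data into the pipeline, so the LlamaCloud credentials must be available in the environment (or .env) before invoking poetry run generate. The preflight script below is illustrative only (its name and structure are not part of the template); the required vs. optional split follows the checks in the previous generate.py and the new get_client().

# preflight_llamacloud.py - illustrative check before running `poetry run generate`.
import os
import sys

from dotenv import load_dotenv

load_dotenv()

REQUIRED = ("LLAMA_CLOUD_INDEX_NAME", "LLAMA_CLOUD_PROJECT_NAME", "LLAMA_CLOUD_API_KEY")
OPTIONAL = ("LLAMA_CLOUD_BASE_URL", "LLAMA_CLOUD_ORGANIZATION_ID")

missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    sys.exit(f"Missing LlamaCloud settings: {', '.join(missing)}")

unset_optional = [name for name in OPTIONAL if not os.getenv(name)]
if unset_optional:
    print(f"Optional settings not set (defaults will be used): {', '.join(unset_optional)}")

print("LlamaCloud environment looks complete; run `poetry run generate` to ingest ./data")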
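
Usage sketch (private file upload): the FastAPI upload route now expects filename and params alongside the base64 data URL (see FileUploadRequest above), and for a LlamaCloud index the file is sent straight to the pipeline with private="true" metadata. The client below is a minimal sketch; the backend URL, the file path, and the empty params object are assumptions, not part of the template.

# upload_example.py - illustrative client for the updated /api/chat/upload route.
import base64
import mimetypes
from pathlib import Path

import requests

BACKEND = "http://localhost:8000"  # assumed dev address of the FastAPI app


def upload_private_file(path: str) -> list[str]:
    file_path = Path(path)
    mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
    encoded = base64.b64encode(file_path.read_bytes()).decode("utf-8")
    payload = {
        # preprocess_base64_file() parses a data URL, so keep the header
        "base64": f"data:{mime_type};base64,{encoded}",
        "filename": file_path.name,
        # forwarded to get_index(); left empty here, a LlamaCloud pipeline selection could go in
        "params": {},
    }
    response = requests.post(f"{BACKEND}/api/chat/upload", json=payload)
    response.raise_for_status()
    # For a LlamaCloud index this is a list containing the LlamaCloud file id;
    # otherwise it contains the ids of the locally ingested documents.
    return response.json()


if __name__ == "__main__":
    print(upload_private_file("data/example.pdf"))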
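
How the metadata fits together: generate.py marks files ingested from ./data with private="false", file.py marks user uploads with private="true", and add_file_to_pipeline() stores the LlamaCloud file id as custom metadata, which is why query_filter.py now matches uploads on file_id instead of doc_id. The snippet below is a hedged sketch of that filter logic (assumed, not the template's exact generate_filters implementation) using the llama-index filter types.

# Sketch of the retrieval filters implied by this patch: public documents OR the
# user's own uploads, selected by LlamaCloud file id.
from llama_index.core.vector_stores.types import (
    FilterCondition,
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
)


def build_filters(doc_ids: list[str]) -> MetadataFilters:
    public_doc_filter = MetadataFilter(
        key="private",
        value=["true"],
        operator=FilterOperator.NIN,  # drop documents marked private="true"
    )
    selected_doc_filter = MetadataFilter(
        key="file_id",  # LlamaCloud file id attached as custom metadata on upload
        value=doc_ids,
        operator=FilterOperator.IN,
    )
    return MetadataFilters(
        filters=[public_doc_filter, selected_doc_filter],
        condition=FilterCondition.OR,
    )

Applied at query time, filters like these are intended to limit a chat request to the public documents plus the files the user explicitly uploaded or selected.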