From 29ea69362e87edc20af2378970ad4128876e1ecc Mon Sep 17 00:00:00 2001 From: Jan Chorowski <janchorowski@users.noreply.github.com> Date: Fri, 15 Mar 2024 03:50:28 +0100 Subject: [PATCH] Update integratins with Pathway: retriever and reader (#11924) --- .../data_connectors/PathwayReaderDemo.ipynb | 204 +++++++++--------- .../retrievers/pathway_retriever.ipynb | 199 +++++++---------- .../llama_index/readers/pathway/base.py | 113 +++++++++- .../pyproject.toml | 3 +- .../llama_index/retrievers/pathway/base.py | 201 ++++++++--------- .../pyproject.toml | 3 +- 6 files changed, 380 insertions(+), 343 deletions(-) diff --git a/docs/examples/data_connectors/PathwayReaderDemo.ipynb b/docs/examples/data_connectors/PathwayReaderDemo.ipynb index 0f0b58e746..b9c9e83942 100644 --- a/docs/examples/data_connectors/PathwayReaderDemo.ipynb +++ b/docs/examples/data_connectors/PathwayReaderDemo.ipynb @@ -22,17 +22,16 @@ "\n", "This notebook demonstrates how to set up a live data indexing pipeline. You can query the results of this pipeline from your LLM application in the same manner as you would a regular reader. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n", "\n", - "In this notebook, we will use a simple document processing pipeline that:\n", + "In this notebook, we will first connect the `llama_index.readers.pathway.PathwayReader` reader to a [public demo document processing pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out) that:\n", "\n", - "1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.\n", - "2. Parses, splits and embeds the documents using Llama-index methods.\n", - "3. Builds a vector index for the data.\n", + "1. Monitors several cloud data sources for data changes.\n", + "2. 
Builds a vector index for the data.\n", "\n", - "We will connect to the index using `llama_index.readers.pathway.PathwayReader` reader, which implements the `load_data` interface.\n", + "To have your own document processing pipeline check the [hosted offering](https://pathway.com/solutions/ai-pipelines) or [build your own](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/) by following this notebook. \n", "\n", "The basic pipeline described in this document allows to effortlessly build a simple index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors. \n", "\n", - "For more details about Pathway data ingestion pipeline and vector store, visit [vector store pipeline](https://pathway.com/developers/showcases/vectorstore_pipeline)." + "For more details about Pathway data ingestion pipeline and vector store, visit [vector store pipeline](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/)." ] }, { @@ -41,7 +40,7 @@ "source": [ "## Prerequisites\n", "\n", - "Install `pathway` and `llama-index` packages. Then download sample data." 
+ "Install the `llama-index-readers-pathway` integration" ] }, { @@ -50,28 +49,14 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install llama-index-readers-pathway\n", - "%pip install llama-index-embeddings-openai" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "!pip install pathway\n", - "!pip install llama-index\n", - "\n", - "!mkdir -p 'data/'\n", - "!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'" + "%pip install llama-index-readers-pathway" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Configure logging." + "Configure logging" ] }, { @@ -98,15 +83,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdin", - "output_type": "stream", - "text": [ - "OpenAI API Key: ········\n" - ] - } - ], + "outputs": [], "source": [ "import getpass\n", "import os\n", @@ -120,7 +97,92 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Define data sources tracked by Pathway" + "## Create the reader and connect to a public pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To instantiate and configure `PathwayReader` you need to provide either the `url` or the `host` and `port` of your document indexing pipeline. In the code below we use a publicly available [demo pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out), which REST API you can access at `https://demo-document-indexing.pathway.stream`. This demo ingests documents from [Google Drive](https://drive.google.com/drive/u/0/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs) and [Sharepoint](https://navalgo.sharepoint.com/sites/ConnectorSandbox/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FConnectorSandbox%2FShared%20Documents%2FIndexerSandbox&p=true&ga=1) and maintains an index for retrieving documents." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from llama_index.readers.pathway import PathwayReader\n", + "\n", + "reader = PathwayReader(url=\"https://demo-document-indexing.pathway.stream\")\n", + "# let us search with some text\n", + "reader.load_data(query_text=\"What is Pathway\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create a summary index with llama-index" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "docs = reader.load_data(query_text=\"What is Pathway\", k=2)\n", + "from llama_index.core import SummaryIndex\n", + "\n", + "index = SummaryIndex.from_documents(docs)\n", + "query_engine = index.as_query_engine()\n", + "response = query_engine.query(\"What does Pathway do?\")\n", + "print(response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building your own data processing pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prerequisites\n", + "\n", + "Install `pathway` package. Then download sample data." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install pathway\n", + "%pip install llama-index-embeddings-openai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p 'data/'\n", + "!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define data sources tracked by Pathway" ] }, { @@ -161,7 +223,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Create the document indexing pipeline" + "### Create the document indexing pipeline" ] }, { @@ -177,29 +239,9 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "<Thread(Thread-5 (run), started 140336003253824)>" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "======== Running on http://127.0.0.1:8754 ========\n", - "(Press CTRL+C to quit)\n", - "148\n" - ] - } - ], + "outputs": [], "source": [ - "from llama_index.core.retrievers import PathwayVectorServer\n", + "from pathway.xpacks.llm.vector_store import VectorStoreServer\n", "from llama_index.embeddings.openai import OpenAIEmbedding\n", "from llama_index.core.node_parser import TokenTextSplitter\n", "\n", @@ -214,7 +256,7 @@ " embed_model,\n", "]\n", "\n", - "processing_pipeline = PathwayVectorServer(\n", + "processing_pipeline = VectorStoreServer.from_llamaindex_components(\n", " *data_sources,\n", " transformations=transformations_example,\n", ")\n", @@ -234,28 +276,14 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Create the Reader" + "### Connect the reader to the custom pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": 
[ - { - "data": { - "text/plain": [ - "[Document(id_='cf4217ed-6ba8-4ac7-8125-e3ce847244ef', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='92c84c3369fcd527faeee011f1106d054ea385f5dfa725f39999fd1c2f44a3db', text=\"Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\\n\\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. 
If you want to do streaming in Python,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'),\n", - " Document(id_='ec25ff55-f59d-406a-8319-a24cccdd9638', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='e72e4a7e14dc2d6d75b1dc2452609d08eb3743fb7c707d88af684870456dd06e', text='Resources\\n\\nSee also: Pathway Documentation (https://pathway.com/developers/) webpage (including API Docs).\\n\\n### Videos about Pathway<a id=\"videos-about-pathway\"></a>\\n[▶️ Building an LLM Application without a vector database](https://www.youtube.com/watch?v=kcrJSk00duw) - by [Jan Chorowski](https://scholar.google.com/citations?user=Yc94070AAAAJ) (7min 56s)\\n\\n[▶️ Linear regression on a Kafka Stream](https://vimeo.com/805069039) - by [Richard Pelgrim](https://twitter.com/richardpelgrim) (7min 53s)\\n\\n[▶️', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'),\n", - " Document(id_='a4710236-8362-4181-8910-04de01a6f203', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='1f03446cf0d7c1d39ec54932ca40d7fd45715db5ccd14ca93aea0aa20633d59f', text=\"If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.\\n\\nPathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data 
streams.\\nWith Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time.\\n\\nPathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems.\\n\\nTypical use-cases of Pathway include realtime data processing, ETL (Extract, Transform, Load) pipelines, data analytics,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'),\n", - " Document(id_='668429b4-594f-491f-88c9-3504d2b0cc65', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='b8e5829ef270c804df92834d70467c328a2c9b7afa3688132c35116be07d6286', text=\"Get Help\\n\\nIf you have any questions, issues, or just want to chat about Pathway, we're here to help! 
Feel free to:\\n- Check out the documentation in https://pathway.com/developers/ for detailed information.\\n- Reach out to us via email at contact@pathway.com.\\n\\nOur team is always happy to help you and ensure that you get the most out of Pathway.\\nIf you would like to better understand how best to use Pathway in your project, please don't hesitate to reach out to us.\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n')]" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "from llama_index.readers.pathway import PathwayReader\n", "\n", @@ -263,36 +291,6 @@ "# let us search with some text\n", "reader.load_data(query_text=\"What is Pathway\")" ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Create a summary index with llama-index" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Pathway is a platform that offers reactive data processing. It provides detailed information and documentation for users to better understand and utilize its features. 
Additionally, Pathway has a support team that is available to assist users with any questions or issues they may have.\n" - ] - } - ], - "source": [ - "docs = reader.load_data(query_text=\"some search input\", k=2)\n", - "from llama_index.core import SummaryIndex\n", - "\n", - "index = SummaryIndex.from_documents(docs)\n", - "query_engine = index.as_query_engine()\n", - "response = query_engine.query(\"What does Pathway do?\")\n", - "print(response)" - ] } ], "metadata": { diff --git a/docs/examples/retrievers/pathway_retriever.ipynb b/docs/examples/retrievers/pathway_retriever.ipynb index bfbc87085c..69c4935139 100644 --- a/docs/examples/retrievers/pathway_retriever.ipynb +++ b/docs/examples/retrievers/pathway_retriever.ipynb @@ -20,15 +20,16 @@ "source": [ "> [Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n", "\n", - "This notebook demonstrates how to set up a live data indexing pipeline. You can query the results of this pipeline from your LLM application using the provided `PathwayRetriever`. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n", + "This notebook demonstrates how to use a live data indexing pipeline with `LlamaIndex`. You can query the results of this pipeline from your LLM application using the provided `PathwayRetriever`. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.\n", "\n", - "In this notebook, we will use a simple document processing pipeline that:\n", + "In this notebook, we will use a [public demo document processing pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out) that:\n", "\n", - "1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.\n", - "2. 
Parses, splits and embeds the documents using Llama-index methods.\n", - "3. Builds a vector index for the data.\n", + "1. Monitors several cloud data sources for data changes.\n", + "2. Builds a vector index for the data.\n", "\n", - "We will connect to the index using `llama_index.retrievers.PathwayRetriever` retriever, which implements the `retrieve` interface.\n", + "To have your own document processing pipeline check the [hosted offering](https://pathway.com/solutions/ai-pipelines) or [build your own](https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/) by following this notebook.\n", + "\n", + "We will connect to the index using `llama_index.retrievers.pathway.PathwayRetriever` retriever, which implements the `retrieve` interface.\n", "\n", "The basic pipeline described in this document allows to effortlessly build a simple index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors. \n", "\n", @@ -41,7 +42,7 @@ "source": [ "## Prerequisites\n", "\n", - "Install `pathway` and `llama-index` packages. Then download sample data." + "To use `PathwayRetriever` you must install the `llama-index-retrievers-pathway` package. " ] }, { @@ -50,8 +51,21 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install llama-index-embeddings-openai\n", - "%pip install llama-index-retrievers-pathway" + "!pip install llama-index-retrievers-pathway" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create Retriever for llama-index" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To instantiate and configure `PathwayRetriever` you need to provide either the `url` or the `host` and `port` of your document indexing pipeline. 
In the code below we use a publicly available [demo pipeline](https://pathway.com/solutions/ai-pipelines#try-it-out), whose REST API you can access at `https://demo-document-indexing.pathway.stream`. This demo ingests documents from [Google Drive](https://drive.google.com/drive/u/0/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs) and [Sharepoint](https://navalgo.sharepoint.com/sites/ConnectorSandbox/Shared%20Documents/Forms/AllItems.aspx?id=%2Fsites%2FConnectorSandbox%2FShared%20Documents%2FIndexerSandbox&p=true&ga=1) and maintains an index for retrieving documents." ] }, { @@ -60,47 +74,65 @@ "metadata": {}, "outputs": [], "source": [ - "!pip install pathway\n", - "!pip install llama-index\n", + "from llama_index.retrievers.pathway import PathwayRetriever\n", "\n", - "!mkdir -p 'data/'\n", - "!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'" + "retriever = PathwayRetriever(\n", + " url=\"https://demo-document-indexing.pathway.stream\"\n", + ")\n", + "retriever.retrieve(str_or_query_bundle=\"what is pathway\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Set up your OpenAI API key." + "**Your turn!** [Get your pipeline](https://pathway.com/solutions/ai-pipelines) or upload [new documents](https://chat-realtime-sharepoint-gdrive.demo.pathway.com/) to the demo pipeline and retry the query!" 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use in Query Engine" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "OpenAI API Key: ········\n" - ] - } - ], + "outputs": [], "source": [ - "import getpass\n", - "import os\n", + "from llama_index.core.query_engine import RetrieverQueryEngine\n", "\n", - "# omit if embedder of choice is not OpenAI\n", - "if \"OPENAI_API_KEY\" not in os.environ:\n", - " os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")" + "query_engine = RetrieverQueryEngine.from_args(\n", + " retriever,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "response = query_engine.query(\"Tell me about Pathway\")\n", + "print(str(response))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building your own data processing pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Configure logging" + "### Prerequisites\n", + "\n", + "Install `pathway` package. Then download sample data." 
] }, { @@ -109,18 +141,25 @@ "metadata": {}, "outputs": [], "source": [ - "import logging\n", - "import sys\n", - "\n", - "logging.basicConfig(stream=sys.stdout, level=logging.ERROR)\n", - "logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))" + "%pip install pathway\n", + "%pip install llama-index-embeddings-openai" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p 'data/'\n", + "!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Define data sources tracked by Pathway" + "### Define data sources tracked by Pathway" ] }, { @@ -161,7 +200,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Create the document indexing pipeline" + "### Create the document indexing pipeline" ] }, { @@ -177,29 +216,9 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "<Thread(Thread-5 (run), started 140158811412032)>" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "======== Running on http://127.0.0.1:8754 ========\n", - "(Press CTRL+C to quit)\n", - "148\n" - ] - } - ], + "outputs": [], "source": [ - "from llama_index.core.retrievers import PathwayVectorServer\n", + "from pathway.xpacks.llm.vector_store import VectorStoreServer\n", "from llama_index.embeddings.openai import OpenAIEmbedding\n", "from llama_index.core.node_parser import TokenTextSplitter\n", "\n", @@ -214,7 +233,7 @@ " embed_model,\n", "]\n", "\n", - "processing_pipeline = PathwayVectorServer(\n", + "processing_pipeline = VectorStoreServer.from_llamaindex_components(\n", " *data_sources,\n", " transformations=transformations_example,\n", ")\n", @@ -234,78 +253,20 @@ "cell_type": "markdown", 
"metadata": {}, "source": [ - "## Create Retriever for llama-index" + "### Connect the retriever to the custom pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "[NodeWithScore(node=TextNode(id_='7f507732-bf58-4a45-948c-dc6bb762812e', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='28f5c0d0e8cae502a0c4d7a72947449c6297f56196fa5c830ebb889026e4ccb9', text=\"If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.\\n\\nPathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams.\\nWith Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time.\\n\\nPathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems.\\n\\nTypical use-cases of Pathway include realtime data processing, ETL (Extract, Transform, Load) pipelines, data analytics,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.16584277747494724),\n", - " NodeWithScore(node=TextNode(id_='274b0481-dedd-41a8-8d94-d1319cd4bc2f', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, 
hash='1c57f8f0d152933320cca745a42239f5ef7c70966c1e5b52fc83dfd515243012', text=\"Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\\n\\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.1441072441651936),\n", - " NodeWithScore(node=TextNode(id_='be82f00d-553a-47df-b639-5fc15ee7bdc8', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='625d489d1a465cbfcb4f50fb84a5e29ca07cffeb3c5ef900e530822e9ff8cf7f', text=\"Get Help\\n\\nIf you have any questions, issues, or just want to chat about Pathway, we're here to help! 
Feel free to:\\n- Check out the documentation in https://pathway.com/developers/ for detailed information.\\n- Reach out to us via email at contact@pathway.com.\\n\\nOur team is always happy to help you and ensure that you get the most out of Pathway.\\nIf you would like to better understand how best to use Pathway in your project, please don't hesitate to reach out to us.\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.186929683249915)]" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "from llama_index.retrievers.pathway import PathwayRetriever\n", "\n", "retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)\n", "retriever.retrieve(str_or_query_bundle=\"what is pathway\")" ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Your turn!** Now edit the contents of the source file, or upload a new file to the `./data` directory and repeat the query - the set of retrieved documents will reflect the changes!" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Use in Query Engine" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from llama_index.core.query_engine import RetrieverQueryEngine\n", - "\n", - "query_engine = RetrieverQueryEngine.from_args(\n", - " retriever,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Pathway is an open framework for high-throughput and low-latency real-time data processing. It provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams. 
With Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time. It is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems. Pathway was designed to be a life-saver for Python developers and ML/AI engineers faced with live data sources, where quick reactions to fresh data are necessary. It can be used for a variety of purposes including streaming in Python, building AI data pipelines, and general data processing tasks. If you have any questions or need assistance with Pathway, you can check out the documentation on the official website or reach out to the team via email.\n" - ] - } - ], - "source": [ - "response = query_engine.query(\"Tell me about Pathway\")\n", - "print(str(response))" - ] } ], "metadata": { diff --git a/llama-index-integrations/readers/llama-index-readers-pathway/llama_index/readers/pathway/base.py b/llama-index-integrations/readers/llama-index-readers-pathway/llama_index/readers/pathway/base.py index 5cfaec3682..ac87deb13d 100644 --- a/llama-index-integrations/readers/llama-index-readers-pathway/llama_index/readers/pathway/base.py +++ b/llama-index-integrations/readers/llama-index-readers-pathway/llama_index/readers/pathway/base.py @@ -1,11 +1,108 @@ """Pathway reader.""" -from typing import List, Optional, Union +import json +from typing import List, Optional +import requests from llama_index.core.readers.base import BaseReader from llama_index.core.schema import Document +# Copied from https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/vector_store.py +# to remove dependency on Pathway library when only the client is used. 
+class _VectorStoreClient: + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, + ): + """ + A client you can use to query :py:class:`VectorStoreServer`. + + Please provide either the `url`, or `host` and `port`. + + Args: + - host: host on which :py:class:`VectorStoreServer` listens + - port: port on which :py:class:`VectorStoreServer` listens + - url: url at which :py:class:`VectorStoreServer` listens + """ + err = "Either (`host` and `port`) or `url` must be provided, but not both." + if url is not None: + if host or port: + raise ValueError(err) + self.url = url + else: + if host is None: + raise ValueError(err) + port = port or 80 + self.url = f"http://{host}:{port}" + + def query( + self, query: str, k: int = 3, metadata_filter: Optional[str] = None + ) -> List[dict]: + """ + Perform a query to the vector store and fetch results. + + Args: + - query: text of the query sent to the vector store + - k: number of documents to be returned + - metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + """ + data = {"query": query, "k": k} + if metadata_filter is not None: + data["metadata_filter"] = metadata_filter + url = self.url + "/v1/retrieve" + response = requests.post( + url, + data=json.dumps(data), + headers={"Content-Type": "application/json"}, + timeout=3, + ) + return response.json() + + # Make an alias + __call__ = query + + def get_vectorstore_statistics(self) -> dict: + """Fetch basic statistics about the vector store.""" + url = self.url + "/v1/statistics" + response = requests.post( + url, + json={}, + headers={"Content-Type": "application/json"}, + ) + return response.json() + + def get_input_files( + self, + metadata_filter: Optional[str] = None, + filepath_globpattern: Optional[str] = None, + ) -> list: + """ + Fetch information on documents in the vector store. 
+ + Args: + metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + filepath_globpattern: optional glob pattern specifying which documents + will be searched for this query. + """ + url = self.url + "/v1/inputs" + response = requests.post( + url, + json={ + "metadata_filter": metadata_filter, + "filepath_globpattern": filepath_globpattern, + }, + headers={"Content-Type": "application/json"}, + ) + return response.json() + + class PathwayReader(BaseReader): """Pathway reader. @@ -20,14 +117,14 @@ class PathwayReader(BaseReader): llamaindex.retriever.pathway.PathwayVectorServer """ - def __init__(self, host: str, port: Union[str, int]): + def __init__( + self, + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, + ): """Initializing the Pathway reader client.""" - import_err_msg = "`pathway` package not found, please run `pip install pathway`" - try: - from pathway.xpacks.llm.vector_store import VectorStoreClient - except ImportError: - raise ImportError(import_err_msg) - self.client = VectorStoreClient(host, port) + self.client = _VectorStoreClient(host, port, url) def load_data( self, diff --git a/llama-index-integrations/readers/llama-index-readers-pathway/pyproject.toml b/llama-index-integrations/readers/llama-index-readers-pathway/pyproject.toml index 0ae53ebbe3..7a9898a514 100644 --- a/llama-index-integrations/readers/llama-index-readers-pathway/pyproject.toml +++ b/llama-index-integrations/readers/llama-index-readers-pathway/pyproject.toml @@ -27,11 +27,12 @@ exclude = ["**/BUILD"] license = "MIT" name = "llama-index-readers-pathway" readme = "README.md" -version = "0.1.2" +version = "0.1.3" [tool.poetry.dependencies] python = ">=3.8.1,<4.0" llama-index-core = "^0.10.1" +requests = "^2.31.0" [tool.poetry.group.dev.dependencies] ipython = "8.10.0" diff --git 
a/llama-index-integrations/retrievers/llama-index-retrievers-pathway/llama_index/retrievers/pathway/base.py b/llama-index-integrations/retrievers/llama-index-retrievers-pathway/llama_index/retrievers/pathway/base.py index 28ae8cc5d0..9da2c62939 100644 --- a/llama-index-integrations/retrievers/llama-index-retrievers-pathway/llama_index/retrievers/pathway/base.py +++ b/llama-index-integrations/retrievers/llama-index-retrievers-pathway/llama_index/retrievers/pathway/base.py @@ -1,136 +1,119 @@ """Pathway Retriever.""" -import logging -from typing import Any, Callable, List, Optional, Tuple, Union +import json +from typing import List, Optional + +import requests from llama_index.core.base.base_retriever import BaseRetriever -from llama_index.core.base.embeddings.base import BaseEmbedding from llama_index.core.callbacks.base import CallbackManager from llama_index.core.constants import DEFAULT_SIMILARITY_TOP_K -from llama_index.core.ingestion.pipeline import run_transformations from llama_index.core.schema import ( - BaseNode, NodeWithScore, QueryBundle, TextNode, - TransformComponent, ) -logger = logging.getLogger(__name__) - - -def node_transformer(x: str) -> List[BaseNode]: - return [TextNode(text=x)] - - -def node_to_pathway(x: BaseNode) -> List[Tuple[str, dict]]: - return [(node.text, node.extra_info) for node in x] - - -class PathwayVectorServer: - """ - Build an autoupdating document indexing pipeline - for approximate nearest neighbor search. - - Args: - docs (list): Pathway tables, may be pw.io connectors or custom tables. - - transformations (List[TransformComponent]): list of transformation steps, has to - include embedding as last step, optionally splitter and other - TransformComponent in the middle - - parser (Callable[[bytes], list[tuple[str, dict]]]): optional, callable that - parses file contents into a list of documents. If None, defaults to `uft-8` decoding of the file contents. Defaults to None. 
- """ +# Copied from https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/vector_store.py +# to remove dependency on Pathway library when only the client is used. +class _VectorStoreClient: def __init__( self, - *docs: Any, - transformations: List[Union[TransformComponent, Callable[[Any], Any]]], - parser: Optional[Callable[[bytes], List[Tuple[str, dict]]]] = None, - **kwargs: Any, - ) -> None: - try: - from pathway.xpacks.llm import vector_store - except ImportError: - raise ImportError( - "Could not import pathway python package. " - "Please install it with `pip install pathway`." - ) - - if transformations is None or not transformations: - raise ValueError("Transformations list cannot be None or empty.") - - if not isinstance(transformations[-1], BaseEmbedding): - raise ValueError( - f"Last step of transformations should be an instance of {BaseEmbedding.__name__}, " - f"found {type(transformations[-1])}." - ) - - embedder: BaseEmbedding = transformations.pop() # type: ignore - - def embedding_callable(x: str) -> List[float]: - return embedder.get_text_embedding(x) + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, + ): + """ + A client you can use to query :py:class:`VectorStoreServer`. - transformations.insert(0, node_transformer) - transformations.append(node_to_pathway) # TextNode -> (str, dict) + Please provide either the `url`, or `host` and `port`. - def generic_transformer(x: List[str]) -> List[Tuple[str, dict]]: - return run_transformations(x, transformations) # type: ignore + Args: + - host: host on which `:py:class:`VectorStoreServer` listens + - port: port on which `:py:class:`VectorStoreServer` listens + - url: url at which `:py:class:`VectorStoreServer` listens + """ + err = "Either (`host` and `port`) or `url` must be provided, but not both." 
+ if url is not None: + if host or port: + raise ValueError(err) + self.url = url + else: + if host is None: + raise ValueError(err) + port = port or 80 + self.url = f"http://{host}:{port}" + + def query( + self, query: str, k: int = 3, metadata_filter: Optional[str] = None + ) -> List[dict]: + """ + Perform a query to the vector store and fetch results. - self.vector_store_server = vector_store.VectorStoreServer( - *docs, - embedder=embedding_callable, - parser=parser, - splitter=generic_transformer, - **kwargs, + Args: + - query: + - k: number of documents to be returned + - metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + """ + data = {"query": query, "k": k} + if metadata_filter is not None: + data["metadata_filter"] = metadata_filter + url = self.url + "/v1/retrieve" + response = requests.post( + url, + data=json.dumps(data), + headers={"Content-Type": "application/json"}, + timeout=3, ) + responses = response.json() + return sorted(responses, key=lambda x: x["dist"]) + + # Make an alias + __call__ = query + + def get_vectorstore_statistics(self) -> dict: + """Fetch basic statistics about the vector store.""" + url = self.url + "/v1/statistics" + response = requests.post( + url, + json={}, + headers={"Content-Type": "application/json"}, + ) + return response.json() - def run_server( + def get_input_files( self, - host: str, - port: str, - threaded: bool = False, - with_cache: bool = True, - cache_backend: Any = None, - ) -> Any: + metadata_filter: Optional[str] = None, + filepath_globpattern: Optional[str] = None, + ) -> list: """ - Run the server and start answering queries. + Fetch information on documents in the the vector store. Args: - host (str): host to bind the HTTP listener - port (str | int): port to bind the HTTP listener - threaded (bool): if True, run in a thread. 
Else block computation - with_cache (bool): if True, embedding requests for the same contents are cached - cache_backend: the backend to use for caching if it is enabled. The - default is the disk cache, hosted locally in the folder ``./Cache``. You - can use ``Backend`` class of the [`persistence API`] - (/developers/api-docs/persistence-api/#pathway.persistence.Backend) - to override it. - - Returns: - If threaded, return the Thread object. Else, does not return. + metadata_filter: optional string representing the metadata filtering query + in the JMESPath format. The search will happen only for documents + satisfying this filtering. + filepath_globpattern: optional glob pattern specifying which documents + will be searched for this query. """ - try: - import pathway as pw - except ImportError: - raise ImportError( - "Could not import pathway python package. " - "Please install it with `pip install pathway`." - ) - if with_cache and cache_backend is None: - cache_backend = pw.persistence.Backend.filesystem("./Cache") - return self.vector_store_server.run_server( - host, - port, - threaded=threaded, - with_cache=with_cache, - cache_backend=cache_backend, + url = self.url + "/v1/inputs" + response = requests.post( + url, + json={ + "metadata_filter": metadata_filter, + "filepath_globpattern": filepath_globpattern, + }, + headers={"Content-Type": "application/json"}, ) + return response.json() class PathwayRetriever(BaseRetriever): """Pathway retriever. + Pathway is an open data processing framework. It allows you to easily develop data transformation pipelines that work with live data sources and changing data. 
@@ -140,18 +123,14 @@ class PathwayRetriever(BaseRetriever): def __init__( self, - host: str, - port: Union[str, int], + host: Optional[str] = None, + port: Optional[int] = None, + url: Optional[str] = None, similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K, callback_manager: Optional[CallbackManager] = None, ) -> None: """Initializing the Pathway retriever client.""" - import_err_msg = "`pathway` package not found, please run `pip install pathway`" - try: - from pathway.xpacks.llm.vector_store import VectorStoreClient - except ImportError: - raise ImportError(import_err_msg) - self.client = VectorStoreClient(host, port) + self.client = _VectorStoreClient(host, port, url) self.similarity_top_k = similarity_top_k super().__init__(callback_manager) diff --git a/llama-index-integrations/retrievers/llama-index-retrievers-pathway/pyproject.toml b/llama-index-integrations/retrievers/llama-index-retrievers-pathway/pyproject.toml index 83b587c19f..51585cda90 100644 --- a/llama-index-integrations/retrievers/llama-index-retrievers-pathway/pyproject.toml +++ b/llama-index-integrations/retrievers/llama-index-retrievers-pathway/pyproject.toml @@ -27,11 +27,12 @@ exclude = ["**/BUILD"] license = "MIT" name = "llama-index-retrievers-pathway" readme = "README.md" -version = "0.1.2" +version = "0.1.3" [tool.poetry.dependencies] python = ">=3.8.1,<4.0" llama-index-core = "^0.10.1" +requests = "^2.31.0" [tool.poetry.group.dev.dependencies] ipython = "8.10.0" -- GitLab