diff --git a/README.md b/README.md index 773b1d4..6901bf3 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,9 @@ Analysis of live documents streams. ![Effortlessly extract and organize unstructured data from PDFs, docs, and more into SQL tables - in real-time](examples/pipelines/unstructured_to_sql_on_the_fly/unstructured_to_sql_demo.gif) -(See: [`unstructured-to-sql`](#examples) app example.) + +(Check out: [`gpt_4o_multimodal_rag`](examples/pipelines/gpt_4o_multimodal_rag/README.md) to see the whole pipeline in the works. You may also check out: [`unstructured-to-sql`](examples/pipelines/unstructured_to_sql_on_the_fly/app.py) for a minimal example which works with non-multimodal models as well.) + ### Automated real-time knowledge mining and alerting. @@ -58,7 +60,7 @@ The default [`contextful`](examples/pipelines/contextful/app.py) app example lau This application template can also be combined with streams of fresh data, such as news feeds or status reports, either through REST or a technology like Kafka. It can also be combined with extra static data sources and user-specific contexts, to provide more relevant answers and reduce LLM hallucination. -Read more about the implementation details and how to extend this application in [our blog article](https://pathway.com/developers/showcases/llm-app-pathway/). +Read more about the implementation details and how to extend this application in [our blog article](https://pathway.com/developers/user-guide/llm-xpack/llm-app-pathway/). ### Instructional videos @@ -101,12 +103,6 @@ with increasing number of documents given as a context in the question, until Ch ## Get Started -To run the `demo-document-indexing` vector indexing pipeline and UI please follow instructions under [examples/pipelines/demo-document-indexing/README.md](examples/pipelines/demo-document-indexing/README.md). - -To run the `demo-question-answering` question answering pipeline please follow instructions under [examples/pipelines/demo-question-answering/README.md](examples/pipelines/demo-question-answering/README.md). - -For all other demos follow the steps below. - ### Prerequisites @@ -120,7 +116,7 @@ Now, follow the steps to install and [get started with one of the provided examp Alternatively, you can also take a look at the [application showcases](#showcases). -### Step 1: Clone the repository +### Clone the repository This is done with the `git clone` command followed by the URL of the repository: @@ -128,108 +124,9 @@ This is done with the `git clone` command followed by the URL of the repository: git clone https://github.com/pathwaycom/llm-app.git ``` -Next, navigate to the repository: - -```bash -cd llm-app -``` - -### Step 2: Set environment variables - -Create an .env file in the root directory and add the following environment variables, adjusting their values according to your specific requirements and setup. - -| Environment Variable | Description | -| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| APP_VARIANT | Determines which pipeline to run in your application. Available modes are [`contextful`, `contextful-s3`, `contextless`, `local`, `unstructured-to-sql`, `alert`, `drive-alert`]. By default, the mode is set to `contextful`. | -| PATHWAY_REST_CONNECTOR_HOST | Specifies the host IP for the REST connector in Pathway. 
For the dockerized version, set it to `0.0.0.0` Natively, you can use `127.0.0.1` | -| PATHWAY_REST_CONNECTOR_PORT | Specifies the port number on which the REST connector service of the Pathway should listen. Here, it is set to 8080. | -| OPENAI_API_KEY | The API token for accessing OpenAI services. If you are not running the local version, please remember to replace it with your API token, which you can generate from your account on [openai.com](https:/platform.openai.com/account/api-keys). | -| PATHWAY_PERSISTENT_STORAGE | Specifies the directory where the cache is stored. You could use /tmpcache. | - -For example: - -```bash -APP_VARIANT=contextful -PATHWAY_REST_CONNECTOR_HOST=0.0.0.0 -PATHWAY_REST_CONNECTOR_PORT=8080 -OPENAI_API_KEY= -PATHWAY_PERSISTENT_STORAGE=/tmp/cache -``` - -### Step 3: Build and run the app - -You can install and run your chosen LLM App example in two different ways. - -#### Using Docker - -Docker is a tool designed to make it easier to create, deploy, and run applications by using containers. Here is how to use Docker to build and run the LLM App: - -```bash -docker compose run --build --rm -p 8080:8080 llm-app-examples -``` - -If you have set a different port in `PATHWAY_REST_CONNECTOR_PORT`, replace the second `8080` with this port in the command above. - -When the process is complete, the App will be up and running inside a Docker container and accessible at `0.0.0.0:8080`. From there, you can proceed to the "Usage" section of the documentation for information on how to interact with the application. - -#### Native Approach - -* **Install poetry:** - - ```bash - pip install poetry - ``` - -* **Install llm_app and dependencies:** - - ```bash - poetry install --with examples --extras local - ``` - - You can omit `--extras local` part if you're not going to run local example. - -* **Run the examples:** You can start the example with the command: - - ```bash - poetry run ./run_examples.py contextful - ``` - -### Step 4: Start to use it - -1. **Send REST queries** (in a separate terminal window): These are examples of how to interact with the application once it's running. `curl` is a command-line tool used to send data using various network protocols. Here, it's being used to send HTTP requests to the application. - - ```bash - curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ - - curl --data '{"user": "user", "query": "How to use LLMs in Pathway?"}' http://localhost:8080/ - ``` - - If you are on windows CMD, then the query would rather look like this - - ```cmd - curl --data "{\"user\": \"user\", \"query\": \"How to use LLMs in Pathway?\"}" http://localhost:8080/ - ``` - -2. **Test reactivity by adding a new file:** This shows how to test the application's ability to react to changes in data by adding a new file and sending a query. - - ```bash - cp ./data/documents_extra.jsonl ./data/pathway-docs/ - ``` - - Or if using docker compose: - - ```bash - docker compose exec llm-app-examples mv /app/examples/data/documents_extra.jsonl /app/examples/data/pathway-docs/ - ``` - - Let's query again: - - ```bash - curl --data '{"user": "user", "query": "How to use LLMs in Pathway?"}' http://localhost:8080/ - ``` +### Run the chosen example -### Step 5: Launch the User Interface: -Go to the `examples/ui/` directory (or `examples/pipelines/unstructured/ui` if you are running the unstructured version.) and execute `streamlit run server.py`. 
Then, access the URL displayed in the terminal to engage with the LLM App using a chat interface. Please note: The provided Streamlit-based interface template is intended for internal rapid prototyping only. In production use, you would normally create your own component instead, taking into account security and authentication, multi-tenancy of data teams, integration with existing UI components, etc. +Each [example](examples/pipelines/) contains a README.md with instructions on how to run it. ### Bonus: Build your own Pathway-powered LLM App @@ -251,7 +148,7 @@ Please check out our [Q&A](https://github.com/pathwaycom/llm-app/discussions/cat ### Raise an issue -To provide feedback or report a bug, please [raise an issue on our issue tracker](https://github.com/pathwaycom/llm-app/issues). +To provide feedback or report a bug, please [raise an issue on our issue tracker](https://github.com/pathwaycom/pathway/issues). ## Contributing diff --git a/examples/pipelines/alert/README.md b/examples/pipelines/alert/README.md new file mode 100644 index 0000000..09e1be6 --- /dev/null +++ b/examples/pipelines/alert/README.md @@ -0,0 +1,67 @@ +# Alert Pipeline + +This example implements a pipeline that answers questions based on documents in a given folder. Additionally, in your prompts you can ask to be notified of any changes - in such case an alert will be sent to a Slack channel. + +Upon starting, a REST API endpoint is opened by the app to serve queries about files inside +the input folder `data_dir`. + +We can create notifications by sending a query to API and stating we want to be notified of the changes. +One example would be `Tell me and alert about the start date of the campaign for Magic Cola` + +What happens next? + +Each query text is first turned into a vector using OpenAI embedding service, +then relevant documentation pages are found using a Nearest Neighbor index computed +for documents in the corpus. A prompt is built from the relevant documentations pages +and sent to the OpenAI GPT3.5 chat service for processing and answering. + +Once you run, Pathway looks for any changes in data sources and efficiently detects changes +to the relevant documents. When a change is detected, the LLM is asked to answer the query +again, and if the new answer is sufficiently different, an alert is created. + +## How to run the project + +### Setup Slack notifications: + +For this demo, Slack notifications are optional and notifications will be printed if no Slack API keys are provided. See: [Slack Apps](https://api.slack.com/apps) and [Getting a token](https://api.slack.com/tutorials/tracks/getting-a-token). +Your Slack application will need at least `chat:write.public` scope enabled. + +### Setup environment: +Set your env variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +SLACK_ALERT_CHANNEL_ID= # If unset, alerts will be printed to the terminal +SLACK_ALERT_TOKEN= +PATHWAY_DATA_DIR= # If unset, defaults to ../../data/magic-cola/live/ +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +Make sure you have installed poetry dependencies with `--extras unstructured`. 
+ +```bash +poetry install --with examples --extras unstructured +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can run: + +```bash +python app.py +``` + +To create alerts, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "When does the magic cola campaign start? Alert me if the start date changes." +}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/alert/app.py b/examples/pipelines/alert/app.py index 3961178..bab045d 100644 --- a/examples/pipelines/alert/app.py +++ b/examples/pipelines/alert/app.py @@ -4,15 +4,12 @@ This demo is very similar to `contextful` example with an additional real time alerting capability. In the demo, alerts are sent to Slack (you need `slack_alert_channel_id` and `slack_alert_token`), you can either put these env variables in .env file under llm-app directory, -or create env variables in the terminal (ie. export in bash) -If you don't have Slack, you can leave them empty and app will print the notifications to -standard output instead. +or create env variables in the terminal (ie. export in bash). Upon starting, a REST API endpoint is opened by the app to serve queries about files inside the input folder `data_dir`. We can create notifications by sending a query to API and stating we want to be notified of the changes. -Alternatively, the provided Streamlit chat app can be used. One example would be `Tell me and alert about the start date of the campaign for Magic Cola` What happens next? @@ -26,37 +23,20 @@ to the relevant documents. When a change is detected, the LLM is asked to answer the query again, and if the new answer is sufficiently different, an alert is created. -Usage: -In the root of this repository run: -`poetry run ./run_examples.py alerts` -or, if all dependencies are managed manually rather than using poetry -You can either -`python examples/pipelines/alerts/app.py` -or -`python ./run_examples.py alert` - -You can also run this example directly in the environment with llm_app installed. - -To create alerts: -You can call the REST API: -curl --data '{ - "user": "user", - "query": "When does the magic cola campaign start? Alert me if the start date changes." -}' http://localhost:8080/ | jq - -Or start streamlit UI: -First go to examples/ui directory with `cd llm-app/examples/ui/` -run `streamlit run server.py` +Please check the README.md in this directory for how-to-run instructions. 
""" import asyncio import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import OpenAIEmbedder from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -154,12 +134,10 @@ def decision_to_bool(decision: str) -> bool: def run( *, - data_dir: str = os.environ.get( - "PATHWAY_DATA_DIR", "./examples/data/magic-cola/live/" - ), + data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "../../data/magic-cola/live/"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", @@ -173,8 +151,8 @@ def run( embedder = OpenAIEmbedder( api_key=api_key, model=embedder_locator, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) documents = pw.io.jsonlines.read( @@ -205,8 +183,8 @@ def run( model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) query += query.select( diff --git a/examples/pipelines/contextful/README.md b/examples/pipelines/contextful/README.md new file mode 100644 index 0000000..e0a8339 --- /dev/null +++ b/examples/pipelines/contextful/README.md @@ -0,0 +1,46 @@ +# Contextful Pipeline + +This example implements a simple pipeline that answers questions based on documents in a given folder. + +Each query text is first turned into a vector using OpenAI embedding service, +then relevant documentation pages are found using a Nearest Neighbor index computed +for documents in the corpus. A prompt is built from the relevant documentation pages +and sent to the OpenAI chat service for processing. + +## How to run the project + +### Setup environment: +Set your env variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_DATA_DIR= # If unset, defaults to ../../data/pathway-docs/ +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +```bash +poetry install --with examples +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can run either: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "How to connect to Kafka in Pathway?" +}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/contextful/app.py b/examples/pipelines/contextful/app.py index 2423813..203b506 100644 --- a/examples/pipelines/contextful/app.py +++ b/examples/pipelines/contextful/app.py @@ -10,28 +10,22 @@ Each query text is first turned into a vector using OpenAI embedding service, then relevant documentation pages are found using a Nearest Neighbor index computed -for documents in the corpus. A prompt is built from the relevant documentations pages +for documents in the corpus. 
A prompt is built from the relevant documentation pages and sent to the OpenAI chat service for processing. -Usage: -In the root of this repository run: -`poetry run ./run_examples.py contextful` -or, if all dependencies are managed manually rather than using poetry -`python examples/pipelines/contextful/app.py` - -You can also run this example directly in the environment with llm_app installed. - -To call the REST API: -curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq +Please check the README.md in this directory for how-to-run instructions. """ import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import OpenAIEmbedder from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -44,10 +38,10 @@ class QueryInputSchema(pw.Schema): def run( *, - data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/pathway-docs/"), + data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "../../data/pathway-docs/"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", @@ -58,8 +52,8 @@ def run( embedder = OpenAIEmbedder( api_key=api_key, model=embedder_locator, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) documents = pw.io.jsonlines.read( @@ -104,8 +98,8 @@ def build_prompt(documents, query): model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) responses = prompt.select( diff --git a/examples/pipelines/contextful_geometric/README.md b/examples/pipelines/contextful_geometric/README.md new file mode 100644 index 0000000..66c4f0c --- /dev/null +++ b/examples/pipelines/contextful_geometric/README.md @@ -0,0 +1,51 @@ +# Contextful Pipeline + +This example implements a pipeline that answers questions based on documents in a given folder. To get the answer it sends increasingly more documents to the LLM chat until it can find an answer. You can read more about the reasoning behind this approach [here](https://pathway.com/developers/showcases/adaptive-rag). + +Each query text is first turned into a vector using OpenAI embedding service, +then relevant documentation pages are found using a Nearest Neighbor index computed +for documents in the corpus. A prompt is built from the relevant documentations pages +and sent to the OpenAI chat service for processing. + +To optimize use of tokens per query, this pipeline asks a question with a small number +of documents embedded in the prompt. If OpenAI chat fails to answer based on these documents, +the number of documents is increased by `factor` given as an argument, and continues to +do so until either question is answered or a limit of iterations is reached. 
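+
+For illustration, the strategy boils down to the following sketch (pseudocode only — the pipeline itself relies on `answer_with_geometric_rag_strategy_from_index` from `pathway.xpacks.llm.question_answering`; the `retrieve` and `ask_llm` helpers below are placeholders, not real APIs):
+
+```python
+def answer_with_growing_context(question, retrieve, ask_llm, n_starting_documents, factor, max_iterations):
+    n_documents = n_starting_documents
+    for _ in range(max_iterations):
+        documents = retrieve(question, k=n_documents)  # nearest chunks from the index
+        answer = ask_llm(question, documents)          # the prompt tells the model to refuse when unsure
+        if answer is not None:                         # the model produced a grounded answer
+            return answer
+        n_documents *= factor                          # geometrically enlarge the context
+    return None
+```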
+ +## How to run the project + +### Setup environment: +Set your env variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_DATA_DIR= # If unset, defaults to ../../data/pathway-docs/ +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +```bash +poetry install --with examples +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can run either: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "How to connect to Kafka in Pathway?" +}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/contextful_geometric/app.py b/examples/pipelines/contextful_geometric/app.py index 45d6e90..883631f 100644 --- a/examples/pipelines/contextful_geometric/app.py +++ b/examples/pipelines/contextful_geometric/app.py @@ -13,25 +13,17 @@ for documents in the corpus. A prompt is built from the relevant documentations pages and sent to the OpenAI chat service for processing. -To optimise use of tokens per query, this pipeline asks a question with a small number +To optimize use of tokens per query, this pipeline asks a question with a small number of documents embedded in the prompt. If OpenAI chat fails to answer based on these documents, the number of documents is increased by `factor` given as an argument, and continues to do so until either question is answered or a limit of iterations is reached. -Usage: -In the root of this repository run: -`poetry run ./run_examples.py contextful-geometric` -or, if all dependencies are managed manually rather than using poetry -`python examples/pipelines/contextful_geometric/app.py` - -You can also run this example directly in the environment with llm_app installed. - -To call the REST API: -curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq +Please check the README.md in this directory for how-to-run instructions. 
""" import os +import dotenv import pathway as pw from pathway.stdlib.indexing import default_vector_document_index from pathway.xpacks.llm.embedders import OpenAIEmbedder @@ -40,6 +32,8 @@ answer_with_geometric_rag_strategy_from_index, ) +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -54,8 +48,8 @@ def run( *, data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/pathway-docs/"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", @@ -69,8 +63,8 @@ def run( embedder = OpenAIEmbedder( api_key=api_key, model=embedder_locator, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) documents = pw.io.jsonlines.read( @@ -100,8 +94,8 @@ def run( model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) responses = query.select( diff --git a/examples/pipelines/contextful_s3/README.md b/examples/pipelines/contextful_s3/README.md new file mode 100644 index 0000000..1680909 --- /dev/null +++ b/examples/pipelines/contextful_s3/README.md @@ -0,0 +1,45 @@ +# Contextful S3 Pipeline + +This example implements a simple pipeline that answers questions based on documents stored in S3. + +Each query text is first turned into a vector using OpenAI embedding service, +then relevant documentation pages are found using a Nearest Neighbor index computed +for documents in the corpus. A prompt is built from the relevant documentations pages +and sent to the OpenAI chat service for processing. + +## How to run the project + +### Setup environment: +Set your env variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +```bash +poetry install --with examples +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can run either: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "How to connect to Kafka in Pathway?" 
+}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/contextful_s3/app.py b/examples/pipelines/contextful_s3/app.py index 03a1279..711422c 100644 --- a/examples/pipelines/contextful_s3/app.py +++ b/examples/pipelines/contextful_s3/app.py @@ -27,11 +27,14 @@ import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import OpenAIEmbedder from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -46,8 +49,8 @@ def run( *, data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "llm_demo/data/"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", @@ -108,8 +111,8 @@ def build_prompt(documents, query): model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), - cache_strategy=pw.asynchronous.DefaultCache(), + retry_strategy=pw.udfs.FixedDelayRetryStrategy(), + cache_strategy=pw.udfs.DefaultCache(), ) responses = prompt.select( diff --git a/examples/pipelines/contextless/README.md b/examples/pipelines/contextless/README.md new file mode 100644 index 0000000..a31a459 --- /dev/null +++ b/examples/pipelines/contextless/README.md @@ -0,0 +1,40 @@ +# Contextless Pipeline + +This example implements a pipeline that answers a single question, without any context. + +## How to run the project + +### Setup environment: +Set your env variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +```bash +poetry install --with examples +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can run either: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "How to connect to Kafka in Pathway?" +}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/contextless/app.py b/examples/pipelines/contextless/app.py old mode 100644 new mode 100755 index edacfec..6c09640 --- a/examples/pipelines/contextless/app.py +++ b/examples/pipelines/contextless/app.py @@ -4,12 +4,10 @@ The program responds to each query by directly forwarding it to the OpenAI API. Usage: -In the root of this repository run: -`poetry run ./run_examples.py contextless` +In this directory run: +`poetry run app.py` or, if all dependencies are managed manually rather than using poetry -`python examples/pipelines/contextless/app.py` - -You can also run this example directly in the environment with llm_app installed. 
+`python app.py` To call the REST API: curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq @@ -17,9 +15,12 @@ import os +import dotenv import pathway as pw from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa +dotenv.load_dotenv() + class QueryInputSchema(pw.Schema): query: str @@ -29,8 +30,8 @@ class QueryInputSchema(pw.Schema): def run( *, api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), model_locator: str = "gpt-3.5-turbo", max_tokens: int = 60, temperature: float = 0.8, diff --git a/examples/pipelines/drive_alert/README.md b/examples/pipelines/drive_alert/README.md index b61e8c4..300cba4 100644 --- a/examples/pipelines/drive_alert/README.md +++ b/examples/pipelines/drive_alert/README.md @@ -1,3 +1,31 @@ +# Drive Alert pipeline + +Microservice for a context-aware alerting ChatGPT assistant. + +This demo is very similar to the `alert` example, the only difference is the data source (Google Drive) +For the demo, alerts are sent to Slack (you need to provide `slack_alert_channel_id` and `slack_alert_token`), +you can either put these env variables in .env file, +or create env variables in the terminal (i.e. export in bash). + +The program then starts a REST API endpoint serving queries about Google Docs stored in a +Google Drive folder. + +We can create notifications by asking from Streamlit or sending query to API stating we want to be notified. +One example would be `Tell me and alert about the start date of the campaign for Magic Cola` + +## How Does It Work? + +First, Pathway connects to Google Drive, extracts all documents, splits them into chunks, turns them into +vectors using OpenAI embedding service, and store in a nearest neighbor index. + +Each query text is first turned into a vector, then relevant document chunks are found +using the nearest neighbor index. A prompt is built from the relevant chunk +and sent to the OpenAI GPT3.5 chat service for processing and answering. + +After an initial answer is provided, Pathway monitors changes to documents and selectively +re-triggers potentially affected queries. If the new answer is significantly different from +the previously presented one, a new notification is created. + ## How to run the project Before running the app, you will need to give the app access to the Google Drive folder, we follow the steps below. @@ -45,7 +73,7 @@ For this demo, Slack notifications are optional and notifications will be printe Your Slack application will need at least `chat:write.public` scope enabled. ### Setup environment: -First, set your env variables in the .env file placed in the root of the repo. +Set your env variables in the .env file placed in this directory or in the root of the repo. ```bash OPENAI_API_KEY=sk-... 
@@ -53,6 +81,7 @@ SLACK_ALERT_CHANNEL_ID= # If unset, alerts will be printed to the terminal SLACK_ALERT_TOKEN= FILE_OR_DIRECTORY_ID= # file or folder ID that you want to track that we have retrieved earlier GOOGLE_CREDS=examples/pipelines/drive_alert/secrets.json # Default location of Google Drive authorization secrets +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching ``` ### Run the project @@ -66,23 +95,15 @@ poetry install --with examples --extras unstructured Run: ```bash -poetry run ./run_examples.py drivealert +poetry run python app.py ``` If all dependencies are managed manually rather than using poetry, you can run either: ```bash -python examples/pipelines/drive_alert/app.py +python app.py ``` -or - -```bash -python ./run_examples.py drivealert -``` - -You can also run this example directly in the environment with llm_app installed. - To create alerts: You can call the REST API: @@ -95,9 +116,9 @@ curl --data '{ Or start streamlit UI: -First go to `examples/pipelines/drive_alert/ui` directory with `cd examples/pipelines/drive_alert/ui/` +First go to `ui` directory with `cd ui/` and run: ```bash streamlit run server.py -``` \ No newline at end of file +``` diff --git a/examples/pipelines/drive_alert/app.py b/examples/pipelines/drive_alert/app.py index fd04cd9..0e1b2f1 100644 --- a/examples/pipelines/drive_alert/app.py +++ b/examples/pipelines/drive_alert/app.py @@ -4,8 +4,7 @@ This demo is very similar to the `alert` example, the only difference is the data source (Google Drive) For the demo, alerts are sent to Slack (you need to provide `slack_alert_channel_id` and `slack_alert_token`), you can either put these env variables in .env file under llm-app directory, -or create env variables in the terminal (i.e. export in bash) -If you don't have Slack, you can leave them empty and app will print the notifications instead. +or create env variables in the terminal (i.e. export in bash). The program then starts a REST API endpoint serving queries about Google Docs stored in a Google Drive folder. @@ -25,45 +24,13 @@ re-triggers potentially affected queries. If the new answer is significantly different from the previously presented one, a new notification is created. -Usage: -First, obtain the Google credentials as in the examples/pipelines/drive_alert/README_GDRIVE_AUTH.md -Then, set the env variables in the .env file placed in the root of this repo - -``` -OPENAI_API_KEY=sk-... -PATHWAY_REST_CONNECTOR_HOST=127.0.0.1 -PATHWAY_REST_CONNECTOR_PORT=8181 -SLACK_ALERT_CHANNEL_ID= # if unset, alerts will be printed to the terminal -SLACK_ALERT_TOKEN= -FILE_OR_DIRECTORY_ID= # file or folder id that you want to track that we have retrieved earlier -GOOGLE_CREDS=examples/pipelines/drive_alert/secrets.json # Default location of Google Drive authorization secrets -``` - -In the root of this repository run: -`poetry run ./run_examples.py drivealert` -or, if all dependencies are managed manually rather than using poetry -You can either -`python examples/pipelines/drive_alert/app.py` -or -`python ./run_examples.py drivealert` - -You can also run this example directly in the environment with llm_app installed. - -To create alerts: -You can call the REST API: -curl --data '{ - "user": "user", - "query": "When does the magic cola campaign start? Alert me if the start date changes." 
-}' http://localhost:8080/ | jq - -Or start streamlit UI: -First go to examples/pipelines/drive_alert/ui directory with `cd examples/pipelines/drive_alert/ui/` -run `streamlit run server.py` +Please check the README.md in this directory for how-to-run instructions. """ import asyncio import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import OpenAIEmbedder @@ -71,6 +38,8 @@ from pathway.xpacks.llm.parsers import ParseUnstructured from pathway.xpacks.llm.splitters import TokenCountSplitter +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -155,8 +124,8 @@ def run( *, object_id=os.environ.get("FILE_OR_DIRECTORY_ID", ""), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", diff --git a/examples/pipelines/gpt_4o_multimodal_rag/Dockerfile b/examples/pipelines/gpt_4o_multimodal_rag/Dockerfile new file mode 100644 index 0000000..ccf3d98 --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.11 + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y python3-opencv \ + && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/* + +COPY requirements.txt . +RUN pip install -U --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8000 + +CMD ["python", "app.py"] diff --git a/examples/pipelines/gpt_4o_multimodal_rag/README.md b/examples/pipelines/gpt_4o_multimodal_rag/README.md new file mode 100644 index 0000000..dcadb67 --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/README.md @@ -0,0 +1,137 @@ +## Multimodal RAG with Pathway + +This showcase illustrates how to get started with multimodal RAG with the help of `GPT-4o` and Pathway. + +This showcase demonstrates a data pipeline that calls into LLMs for document processing. In the showcase, you will see how Pathway can extract information from unstructured documents and keep the results up to date when documents change and new documents arrive. The extraction is tuned towards financial documents. OpenAI's `GPT-4o` model is used to improve accuracy of extraction of information from the tables. + +We specifically chose the finance domain since the files are heavily reliant on using tables in different forms. We also compare in few examples how regular RAG setups fail to answer questions based on the tables. + +We use the `GPT-4o` in two parts: +- Extracting and understanding the tables inside the PDF +- Answering questions with the retrieved context + +![Architecture](gpt4o.gif) + +## Introduction + +We will use `BaseRAGQuestionAnswerer` provided under `pathway.xpacks` to get started on our RAG application with very minimal overhead. This module brings together the foundational building bricks for the RAG application. + +It includes ingesting the data from the sources, LLM, document parsers and splitters, database (index) and also serving the app on an endpoint. + +For more advanced RAG options, make sure to check out [rerankers](https://pathway.com/developers/api-docs/pathway-xpacks-llm/rerankers) and the [adaptive rag example](../adaptive-rag/). 
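+
+At a glance, the wiring reduces to the following sketch (condensed from the `app.py` in this directory — the actual app subclasses `BaseRAGQuestionAnswerer` as `RAGApp` to customize the prompt and the number of retrieved chunks):
+
+```python
+import pathway as pw
+from pathway.xpacks.llm import embedders, llms
+from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
+from pathway.xpacks.llm.vector_store import VectorStoreServer
+from src.ext_parsers import OpenParse
+
+# Ingest the PDF(s) as binary files; other connectors (Google Drive, SharePoint, ...) can be added too.
+sources = [pw.io.fs.read("./data/", format="binary", with_metadata=True)]
+
+chat = llms.OpenAIChat(model="gpt-4o", temperature=0.0)
+doc_store = VectorStoreServer(
+    *sources,
+    embedder=embedders.OpenAIEmbedder(),
+    splitter=None,  # OpenParse already chunks the documents
+    parser=OpenParse(),
+)
+
+app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)
+app.build_server(host="0.0.0.0", port=8000)
+app.run_server(with_cache=True)
+```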
+ + +## Modifying the code + +Under the main function, we define: +- input folders +- LLM +- embedder +- index +- host and port to run the app +- run options (caching, cache folder) + +You can modify any of the components by checking the options from the imported modules: `from pathway.xpacks.llm import embedders, llms, parsers, splitters`. + +It is also possible to easily create new components by extending the [`pw.UDF`](https://pathway.com/developers/user-guide/data-transformation/user-defined-functions) class and implementing the `__wrapped__` function. + + +## Running the app + +> Note: Recommended way of running Pathway on Windows is Docker, refer to [Running with the Docker section](#with-docker). + +First, make sure to install the requirements by running: +```bash +pip install -r requirements.txt +``` +Then, create an `.env` file in this directory and put your API key with `OPENAI_API_KEY=sk-...`, or add the `api_key` argument to `OpenAIChat` and `OpenAIEmbedder`. + +Then, simply run with `python app.py` in this directory. + +### With Docker + +In order to let the pipeline get updated with each change in local files, you need to mount the `data` folder inside the docker. The following commands show how to do that. + +Following commands will: +- mount the `data` folder inside the Docker +- build the image +- run the app and expose the port `8000`. + +You can omit the ```-v `pwd`/data:/app/data``` part if you are not using local files as a data source. + +```bash +# Make sure you are in the right directory. +cd examples/pipelines/gpt_4o_multimodal_rag/ + +# Build the image in this folder +docker build -t rag . + +# Run the image, mount the `data` folder into image and expose the port `8000` +docker run -v `pwd`/data:/app/data -p 8000:8000 rag +``` + +## Using the app + +After running the app, you will see the logs about the files being processed, after the logs stop streaming, app is ready to receive requests. + +First, let's check the files that are currently indexed: +```bash +curl -X 'POST' 'http://0.0.0.0:8000/v1/pw_list_documents' -H 'accept: */*' -H 'Content-Type: application/json' +``` + +This will return the list of files as follows: +> `[{"modified_at": 1715765613, "owner": "berke", "path": "data/20230203_alphabet_10K.pdf", "seen_at": 1715768762}]` + +Now, let's ask a question from one of the tables inside the report. In our tests, regular RAG applications struggled with the tables and couldn't answer to this question correctly. + +```bash +curl -X 'POST' 'http://0.0.0.0:8000/v1/pw_ai_answer' -H 'accept: */*' -H 'Content-Type: application/json' -d '{ + "prompt": "How much was Operating lease cost in 2021?" +}' +``` +> `$2,699 million` + +This response was correct thanks to the initial LLM parsing step. +When we check the context that is sent to the LLM, we see that Pathway included the table in the context where as other RAG applications failed to include the table. + +Following GIF shows a snippet from our experiments: + +![Regular RAG vs Pathway Multimodal comparison](gpt4o_with_pathway_comparison.gif) + +Let's try another one, + +```bash +curl -X 'POST' 'http://0.0.0.0:8000/v1/pw_ai_answer' -H 'accept: */*' -H 'Content-Type: application/json' -d '{ + "prompt": "What is the operating income for the fiscal year of 2022?" +}' +``` +> `$74,842 million` + +Another example, let's ask a question that can be answered from the table on the 48th page of the PDF. 
+ +```bash +curl -X 'POST' 'http://0.0.0.0:8000/v1/pw_ai_answer' -H 'accept: */*' -H 'Content-Type: application/json' -d '{ + "prompt": "How much was Marketable securities worth in 2021 in the consolidated balance sheets?" +}' +``` +> `$118,704 million` + +## Conclusion + +This showcase demonstrates setting up a powerful RAG pipeline with advanced table parsing capabilities, unlocking new finance use cases. While we've only scratched the surface, there's more to explore: + +- Re-ranking: Prioritize the most relevant results for your specific query. +- Knowledge graphs: Leverage relationships between entities to improve understanding. +- Hybrid indexing: Combine different indexing strategies for optimal retrieval. +- Adaptive reranking: Iteratively enlarge the context for optimal accuracy, see [our example](../adaptive-rag/README.md). +Stay tuned for future examples exploring these advanced techniques with Pathway! + +RAG applications are most effective when tailored to your specific use case. Here's how you can customize yours: + +- Document parsers and splitters: Fine-tune how documents are processed and broken down for analysis. +- Indexing and retrieval strategies: Choose the most efficient approach for your data and search needs. +- User Interface (UI): Design a user-friendly interface that caters to your end users' workflows. + +Ready to Get Started? + +Let's discuss how we can help you build a powerful, customized RAG application. [Reach us here!](https://pathway.com/solutions/enterprise-generative-ai?modal=requestdemo) diff --git a/examples/pipelines/gpt_4o_multimodal_rag/app.py b/examples/pipelines/gpt_4o_multimodal_rag/app.py new file mode 100644 index 0000000..cbfafd6 --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/app.py @@ -0,0 +1,93 @@ +import logging +import os + +# flake8: noqa +os.environ["TESSDATA_PREFIX"] = ( + "/usr/share/tesseract/tessdata/" # fix for tesseract ocr +) + +import pathway as pw +from dotenv import load_dotenv +from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy +from pathway.xpacks.llm import embedders, llms # , parsers, splitters +from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer +from pathway.xpacks.llm.vector_store import VectorStoreServer +from src.ext_parsers import OpenParse + +load_dotenv() + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + + +class RAGApp(BaseRAGQuestionAnswerer): + @pw.table_transformer + def pw_ai_query(self, pw_ai_queries: pw.Table) -> pw.Table: + """Main function for RAG applications that answer questions + based on available information.""" + + pw_ai_results = pw_ai_queries + self.indexer.retrieve_query( + pw_ai_queries.select( + metadata_filter=pw.this.filters, + filepath_globpattern=pw.cast(str | None, None), + query=pw.this.prompt, + k=6, + ) + ).select( + docs=pw.this.result, + ) + + pw_ai_results += pw_ai_results.select( + rag_prompt=self.long_prompt_template(pw.this.prompt, pw.this.docs), + ) + + pw_ai_results += pw_ai_results.select( + result=self.llm(llms.prompt_chat_single_qa(pw.this.rag_prompt)) + ) + return pw_ai_results + + +if __name__ == "__main__": + path = "./data/20230203_alphabet_10K.pdf" + + folder = pw.io.fs.read( + path=path, + format="binary", + with_metadata=True, + ) + + sources = [ + folder, + ] # define the inputs (local folders & files, google drive, sharepoint, ...) 
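+
+    # (Illustrative, commented out) additional connectors can be appended to `sources`,
+    # e.g. a Google Drive folder — argument names here are indicative, double-check the
+    # Pathway connector docs before enabling:
+    # sources.append(
+    #     pw.io.gdrive.read(
+    #         object_id="<FILE_OR_DIRECTORY_ID>",
+    #         service_user_credentials_file="secrets.json",
+    #         with_metadata=True,
+    #     )
+    # )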
+ + chat = llms.OpenAIChat( + model="gpt-4o", + retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6), + cache_strategy=DiskCache(), + temperature=0.0, + ) + + app_host = "0.0.0.0" + app_port = 8000 + + parser = OpenParse() + embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache()) + + doc_store = VectorStoreServer( + *sources, + embedder=embedder, + splitter=None, # OpenParse parser handles the chunking + parser=parser, + ) + + app = RAGApp( + llm=chat, + indexer=doc_store, + ) + + app.build_server(host=app_host, port=app_port) + + app.run_server(with_cache=True, terminate_on_error=False) diff --git a/examples/pipelines/gpt_4o_multimodal_rag/data/20230203_alphabet_10K.pdf b/examples/pipelines/gpt_4o_multimodal_rag/data/20230203_alphabet_10K.pdf new file mode 100644 index 0000000..9a1ec66 Binary files /dev/null and b/examples/pipelines/gpt_4o_multimodal_rag/data/20230203_alphabet_10K.pdf differ diff --git a/examples/pipelines/gpt_4o_multimodal_rag/gpt4o.gif b/examples/pipelines/gpt_4o_multimodal_rag/gpt4o.gif new file mode 100644 index 0000000..db55314 Binary files /dev/null and b/examples/pipelines/gpt_4o_multimodal_rag/gpt4o.gif differ diff --git a/examples/pipelines/gpt_4o_multimodal_rag/gpt4o_with_pathway_comparison.gif b/examples/pipelines/gpt_4o_multimodal_rag/gpt4o_with_pathway_comparison.gif new file mode 100644 index 0000000..a96e0bd Binary files /dev/null and b/examples/pipelines/gpt_4o_multimodal_rag/gpt4o_with_pathway_comparison.gif differ diff --git a/examples/pipelines/gpt_4o_multimodal_rag/requirements.txt b/examples/pipelines/gpt_4o_multimodal_rag/requirements.txt new file mode 100644 index 0000000..0f64f9c --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/requirements.txt @@ -0,0 +1,5 @@ +pathway[xpack-llm] +openparse==0.5.6 +python-dotenv==1.0.1 +unstructured[all-docs]==0.10.28 +mpmath==1.3.0 diff --git a/examples/pipelines/gpt_4o_multimodal_rag/src/_parser_utils.py b/examples/pipelines/gpt_4o_multimodal_rag/src/_parser_utils.py new file mode 100644 index 0000000..a0b23bb --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/src/_parser_utils.py @@ -0,0 +1,245 @@ +# Copyright © 2024 Pathway + +""" +A library for document parsers: functions that take raw bytes and return a list of text +chunks along with their metadata. +""" + +import asyncio +import base64 +import concurrent.futures +import io +import logging +from typing import List, Literal, Union + +import PIL +from openparse import DocumentParser, consts, tables, text +from openparse.pdf import Pdf +from openparse.schemas import ParsedDocument, TableElement +from openparse.tables.parse import ( + Bbox, + PyMuPDFArgs, + TableTransformersArgs, + UnitableArgs, + _ingest_with_pymupdf, + _ingest_with_table_transformers, + _ingest_with_unitable, +) +from openparse.tables.utils import adjust_bbox_with_padding, crop_img_with_padding +from pathway.internals import udfs +from pathway.xpacks.llm._utils import _coerce_sync +from pathway.xpacks.llm.llms import OpenAIChat +from pydantic import BaseModel, ConfigDict, Field + +DEFAULT_TABLE_PARSE_PROMPT = """Explain the given table in JSON format in detail. +Do not skip over details or units/metrics. +Make sure column and row names are understandable. +If it is not a table, return 'No table.' 
.""" + + +logger = logging.getLogger(__name__) + +chat = OpenAIChat( + model="gpt-4o", + cache_strategy=udfs.DiskCache(), + retry_strategy=udfs.ExponentialBackoffRetryStrategy(max_retries=4), +) + + +def llm_parse_table( + image, model="gpt-4o", prompt=DEFAULT_TABLE_PARSE_PROMPT, **kwargs +) -> str: + + content = [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{image}"}, + }, + ] + + messages = [ + { + "role": "user", + "content": content, + } + ] + + logger.info(f"Parsing table, model: {model}\nmessages: {str(content)[:350]}...") + + response = _coerce_sync(chat.__wrapped__)(model=model, messages=messages, **kwargs) + + return response + + +async def a_llm_parse_table(image, model="gpt-4o", prompt=DEFAULT_TABLE_PARSE_PROMPT): + loop = asyncio.get_event_loop() + task = loop.run_in_executor(None, llm_parse_table, image, model, prompt) + result = await task + return result + + +class LLMArgs(BaseModel): + parsing_algorithm: Literal["llm"] = Field(default="llm") + min_table_confidence: float = Field(default=0.9, ge=0.0, le=1.0) + llm_model: str = Field(default="gpt-4o") + prompt: str = Field(default=DEFAULT_TABLE_PARSE_PROMPT) + + model_config = ConfigDict(extra="forbid") + + +def _table_args_dict_to_model(args_dict: dict): + if args_dict["parsing_algorithm"] == "table-transformers": + return tables.TableTransformersArgs(**args_dict) + elif args_dict["parsing_algorithm"] == "pymupdf": + return tables.PyMuPDFArgs(**args_dict) + elif args_dict["parsing_algorithm"] == "unitable": + return tables.UnitableArgs(**args_dict) + elif args_dict["parsing_algorithm"] == "llm": + return LLMArgs(**args_dict) + else: + raise ValueError( + f"Unsupported parsing_algorithm: {args_dict['parsing_algorithm']}" + ) + + +def img_to_b64(img: PIL.Image) -> str: + buffer = io.BytesIO() + img.save(buffer, format="PNG") + buffer.seek(0) + + img_bytes = buffer.read() + + return base64.b64encode(img_bytes).decode("utf-8") + + +def _ingest_with_llm( + doc: Pdf, + args: LLMArgs, + verbose: bool = False, +) -> List[TableElement]: + try: + from openparse.tables.table_transformers.ml import find_table_bboxes + from openparse.tables.utils import doc_to_imgs + + except ImportError as e: + raise ImportError( + "Table detection and extraction requires the `torch`, `torchvision` and `transformers` libraries to be installed.", # noqa: E501 + e, + ) + pdoc = doc.to_pymupdf_doc() + pdf_as_imgs = doc_to_imgs(pdoc) + + pages_with_tables = {} + for page_num, img in enumerate(pdf_as_imgs): + pages_with_tables[page_num] = find_table_bboxes(img, args.min_table_confidence) + + tables = [] + image_ls = [] + for page_num, table_bboxes in pages_with_tables.items(): + page = pdoc[page_num] + for table_bbox in table_bboxes: + padding_pct = 0.05 + padded_bbox = adjust_bbox_with_padding( + bbox=table_bbox.bbox, + page_width=page.rect.width, + page_height=page.rect.height, + padding_pct=padding_pct, + ) + table_img = crop_img_with_padding(pdf_as_imgs[page_num], padded_bbox) + + img = img_to_b64(table_img) + + image_ls.append(img) + + with concurrent.futures.ThreadPoolExecutor() as executor: + task_results = list( + executor.map( + lambda img: llm_parse_table(img, args.llm_model, args.prompt), + image_ls, + ) + ) + + for table_str in task_results: + fy0 = page.rect.height - padded_bbox[3] + fy1 = page.rect.height - padded_bbox[1] + + table_elem = TableElement( + bbox=Bbox( + page=page_num, + x0=padded_bbox[0], + y0=fy0, + x1=padded_bbox[2], + y1=fy1, + page_width=page.rect.width, + 
page_height=page.rect.height, + ), + text=table_str, + ) + + tables.append(table_elem) + + return tables + + +def ingest( + doc: Pdf, + parsing_args: Union[ + TableTransformersArgs, PyMuPDFArgs, UnitableArgs, LLMArgs, None + ] = None, + verbose: bool = False, +) -> List[TableElement]: + if isinstance(parsing_args, TableTransformersArgs): + return _ingest_with_table_transformers(doc, parsing_args, verbose) + elif isinstance(parsing_args, PyMuPDFArgs): + return _ingest_with_pymupdf(doc, parsing_args, verbose) + elif isinstance(parsing_args, UnitableArgs): + return _ingest_with_unitable(doc, parsing_args, verbose) + elif isinstance(parsing_args, LLMArgs): + return _ingest_with_llm(doc, parsing_args, verbose) + else: + raise ValueError("Unsupported parsing_algorithm.") + + +class CustomDocumentParser(DocumentParser): + def parse( + self, + doc, + ) -> ParsedDocument: + """ + Parse a given document with the multi modal LLM. + + Uses pymupdf to parse the document, then runs the LLM on the table images. + + Args: + doc: Document to be parsed. + """ + + text_engine = "pymupdf" + text_elems = text.ingest(doc, parsing_method=text_engine) + text_nodes = self._elems_to_nodes(text_elems) + + table_nodes = [] + table_args_obj = None + if self.table_args: + table_args_obj = _table_args_dict_to_model(self.table_args) + table_elems = ingest(doc, table_args_obj, verbose=self._verbose) + table_nodes = self._elems_to_nodes(table_elems) + + nodes = text_nodes + table_nodes + nodes = self.processing_pipeline.run(nodes) + + parsed_doc = ParsedDocument( + nodes=nodes, + filename="Path(file).name", + num_pages=doc.num_pages, + coordinate_system=consts.COORDINATE_SYSTEM, + table_parsing_kwargs=( + table_args_obj.model_dump() if table_args_obj else None + ), + creation_date=doc.file_metadata.get("creation_date"), + last_modified_date=doc.file_metadata.get("last_modified_date"), + last_accessed_date=doc.file_metadata.get("last_accessed_date"), + file_size=doc.file_metadata.get("file_size"), + ) + return parsed_doc diff --git a/examples/pipelines/gpt_4o_multimodal_rag/src/ext_parsers.py b/examples/pipelines/gpt_4o_multimodal_rag/src/ext_parsers.py new file mode 100644 index 0000000..954402d --- /dev/null +++ b/examples/pipelines/gpt_4o_multimodal_rag/src/ext_parsers.py @@ -0,0 +1,67 @@ +# Copyright © 2024 Pathway + +""" +Sneak peak on what is coming to Pathway on the next release. +""" + +import logging +from io import BytesIO + +import pathway as pw +from pathway.internals import udfs +from pathway.optional_import import optional_imports + +logger = logging.getLogger(__name__) + + +class OpenParse(pw.UDF): + """ + Parse document using `https://github.com/Filimoa/open-parse `_. + + `parsing_algorithm` can be one of `llm`, `unitable`, `pymupdf`, `table-transformers`. + While using in the VectorStoreServer, splitter can be set to `None` as OpenParse already chunks the documents. + + + Args: + - table_args: dict containing the table parser arguments. + - cache_strategy: Defines the caching mechanism. To enable caching, + a valid `CacheStrategy` should be provided. + See `Cache strategy `_ + for more information. Defaults to None. 
+ """ + + def __init__( + self, + table_args: dict = {"parsing_algorithm": "llm"}, + cache_strategy: udfs.CacheStrategy | None = None, + ): + with optional_imports("xpack-llm"): + import openparse # noqa:F401 + from pypdf import PdfReader # noqa:F401 + + from ._parser_utils import CustomDocumentParser + + super().__init__(cache_strategy=cache_strategy) + + self.doc_parser = CustomDocumentParser(table_args=table_args) + + self.kwargs = dict(table_args=table_args) + + def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]: + import openparse + from pypdf import PdfReader + + reader = PdfReader(stream=BytesIO(contents)) + doc = openparse.Pdf(file=reader) + + parsed_content = self.doc_parser.parse(doc) + nodes = [i for i in parsed_content.nodes] + + logger.info( + f"OpenParser completed parsing, total number of nodes: {len(nodes)}" + ) + + metadata: dict = {} + docs = list(map(lambda x: (x.dict()["text"], metadata), nodes)) + + return docs diff --git a/examples/pipelines/local/README.md b/examples/pipelines/local/README.md new file mode 100644 index 0000000..e67d200 --- /dev/null +++ b/examples/pipelines/local/README.md @@ -0,0 +1,42 @@ +# Local Pipeline + +This pipeline is similar to the [contextful pipeline](), but relies on local computations rather than querying an external API. To do so, it uses [HuggingFace](https://huggingface.co/) for the chat model and [Sentence Transformers](https://www.sbert.net/) for the embedding model. + +## How to run the project + +### Setup environment: +Set your environment variables in the .env file placed in this directory or in the root of the repo. + +```bash +PATHWAY_DATA_DIR= # If unset, defaults to ../../data/pathway-docs-small/ +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +Make sure you have installed poetry dependencies with `--extras local`. + +```bash +poetry install --with examples --extras local +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can also use: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "How to connect to Kafka in Pathway?" +}' http://localhost:8080/ | jq +``` diff --git a/examples/pipelines/local/app.py b/examples/pipelines/local/app.py index 7b81bcd..fc2c2a7 100644 --- a/examples/pipelines/local/app.py +++ b/examples/pipelines/local/app.py @@ -20,25 +20,19 @@ Depending on the length of documents and the model you use this may not be necessary or you can use some more refined method of shortening your prompts. -Usage: -In the root of this repository run: -`poetry run ./run_examples.py local` -or, if all dependencies are managed manually rather than using poetry -`python examples/pipelines/local/app.py` - -You can also run this example directly in the environment with llm_app instaslled. - -To call the REST API: -curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq +Please check the README.md in this directory for how-to-run instructions.
""" import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import SentenceTransformerEmbedder from pathway.xpacks.llm.llms import HFPipelineChat, prompt_chat_single_qa +dotenv.load_dotenv() + class DocumentInputSchema(pw.Schema): doc: str @@ -52,10 +46,10 @@ class QueryInputSchema(pw.Schema): def run( *, data_dir: str = os.environ.get( - "PATHWAY_DATA_DIR", "./examples/data/pathway-docs-small/" + "PATHWAY_DATA_DIR", "../../data/pathway-docs-small/" ), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), model_locator: str = os.environ.get("MODEL", "gpt2"), embedder_locator: str = os.environ.get("EMBEDDER", "intfloat/e5-large-v2"), max_tokens: int = 60, diff --git a/examples/pipelines/unstructured/README.md b/examples/pipelines/unstructured/README.md new file mode 100644 index 0000000..7b6ac10 --- /dev/null +++ b/examples/pipelines/unstructured/README.md @@ -0,0 +1,52 @@ +# Unstructured Pipeline + +This example implements a RAG pipeline, similar to the [contextful pipeline](). However, it uses the [Unstructured](https://unstructured.io/) library to parse the documents, which are then split into smaller chunks. + +## How to run the project + +### Setup environment: +Set your environment variables in the .env file placed in this directory or in the root of the repo. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_DATA_DIR= # If unset, defaults to ../../data/finance/ +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### Run the project + +Make sure you have installed poetry dependencies with `--extras unstructured`. + +```bash +poetry install --with examples --extras unstructured +``` + +Run: + +```bash +poetry run python app.py +``` + +If all dependencies are managed manually rather than using poetry, you can also use: + +```bash +python app.py +``` + +To query the pipeline, you can call the REST API: + +```bash +curl --data '{ + "user": "user", + "query": "When does the magic cola campaign start? Alert me if the start date changes."
+}' http://localhost:8080/ | jq +``` + +Or start the Streamlit UI: + +First go to the `ui` directory with `cd ui/` +and run: + +```bash +streamlit run server.py +``` diff --git a/examples/pipelines/unstructured/app.py b/examples/pipelines/unstructured/app.py old mode 100644 new mode 100755 index 4a66f9a..9baad83 --- a/examples/pipelines/unstructured/app.py +++ b/examples/pipelines/unstructured/app.py @@ -27,6 +27,7 @@ import os +import dotenv import pathway as pw from pathway.stdlib.ml.index import KNNIndex from pathway.xpacks.llm.embedders import OpenAIEmbedder @@ -34,6 +35,8 @@ from pathway.xpacks.llm.parsers import ParseUnstructured from pathway.xpacks.llm.splitters import TokenCountSplitter +dotenv.load_dotenv() + class QueryInputSchema(pw.Schema): query: str @@ -42,10 +45,10 @@ class QueryInputSchema(pw.Schema): def run( *, - data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/finance/"), + data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "../../data/finance/"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), embedder_locator: str = "text-embedding-ada-002", embedding_dimension: int = 1536, model_locator: str = "gpt-3.5-turbo", diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/Dockerfile b/examples/pipelines/unstructured_to_sql_on_the_fly/Dockerfile new file mode 100644 index 0000000..46c3606 --- /dev/null +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.11 + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y python3-opencv \ + && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/* + + +COPY requirements.txt . + +RUN pip install --pre -U --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8080 + +CMD ["python", "app.py"] diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/README.md b/examples/pipelines/unstructured_to_sql_on_the_fly/README.md new file mode 100644 index 0000000..fa038c0 --- /dev/null +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/README.md @@ -0,0 +1,109 @@ +# Unstructured to SQL on the fly + +The aim of this pipeline is to extract and structure data out of unstructured inputs (PDFs, queries) +on the fly. + +This example consists of two separate parts that can be used independently. +1 - Pipeline 1: Proactive data pipeline that is always live and tracking file changes; + it reads documents, structures them, and writes the results to PostgreSQL. +2 - Pipeline 2: Query answering pipeline that reads user queries and answers them by + generating SQL queries that are run on the data stored in PostgreSQL. + + +Specifically, Pipeline 1 reads in a collection of financial PDF documents from a local directory +(that can be synchronized with a Dropbox account), tokenizes each document using the tiktoken encoding, +then extracts the desired fields using the OpenAI API. +The values are stored in a Pathway table which is then output to a PostgreSQL instance. + +Pipeline 2 then starts a REST API endpoint serving queries about the financial data stored in PostgreSQL. + +Each query text is converted into a SQL query using the OpenAI API. + +An architecture diagram and description are available at +https://pathway.com/developers/showcases/unstructured-to-structured + + +⚠️ This project requires a running PostgreSQL instance.
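To make Pipeline 2 concrete, the sketch below shows how an LLM-generated SQL query could be executed against that PostgreSQL instance with `psycopg`. This is an illustration only, not code from the example: the connection settings mirror the `POSTGRESQL_HOST`/`POSTGRESQL_PORT` environment variables used by the pipeline and the credentials from `docker-compose.yml`, and `table_name` is a placeholder for the actual table.

```python
# Illustration only (not part of the example's code): run an LLM-generated
# SQL query against the PostgreSQL instance and return the rows.
import os

import psycopg


def run_generated_sql(sql: str) -> list[tuple]:
    # Defaults mirror the docker-compose setup (user / password / STRUCTUREDDB).
    with psycopg.connect(
        host=os.environ.get("POSTGRESQL_HOST", "localhost"),
        port=os.environ.get("POSTGRESQL_PORT", "5432"),
        dbname="STRUCTUREDDB",
        user="user",
        password="password",
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetchall()


# Hypothetical usage with a query of the shape the prompt allows;
# "table_name" stands in for the pipeline's PostgreSQL table.
# print(run_generated_sql(
#     "SELECT company_symbol, year, quarter, net_income_md FROM table_name;"
# ))
```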
+ +🔵 The fields extracted from the PDF documents are the following: +- company_symbol: str +- year: int +- quarter: str +- revenue_md: float +- eps: float +- net_income_md: float + +⚠️ The revenue and net income are expressed in millions of dollars; the eps is in dollars. + +🔵 The script uses a prompt to instruct the language model to generate SQL queries that adhere to the specified format. +The allowed queries follow a particular pattern: +1. The SELECT clause should specify columns or standard aggregator operators (SUM, COUNT, MIN, MAX, AVG). +2. The WHERE clause should include conditions using standard binary operators (<, >, =, etc.), + with support for AND and OR logic. +3. To prevent 'psycopg2.errors.GroupingError', relevant columns from the WHERE clause are included + in the GROUP BY clause. +4. For readability, if no aggregator is used, the company_symbol, year, + and quarter are included in addition to the requested columns. + +Example: +"What is the net income of all companies?" should return: +Response: +'SELECT company_symbol, year, quarter, net_income_md FROM table;' + + +## Project architecture: +``` +. +├── postgresql/ +│ └── init-db.sql +├── ui/ +│ └── server.py +├── __init__.py +├── app.py +└── docker-compose.yml +``` + +## Running the Pipeline + +### Setup environment: + +Set your environment variables in the .env file placed in this directory. + +```bash +OPENAI_API_KEY=sk-... +PATHWAY_PERSISTENT_STORAGE= # Set this variable if you want to use caching +``` + +### With Docker + +To run the Unstructured to SQL on the fly pipeline together with a simple UI, please execute: + +```bash +docker compose up --build +``` + +Then, the UI will run at http://127.0.0.1:8501 by default. You can access it by following this URL in your web browser. + +The `docker-compose.yml` file declares a [volume bind mount](https://docs.docker.com/reference/cli/docker/container/run/#volume) that makes changes to files under `data/` made on your host computer visible inside the Docker container. The files in `data/quarterly_earnings` are indexed by the pipeline; you can paste new files there and they will impact the computations. + +### Manually + +Alternatively, you can run each service separately. To run PostgreSQL with Docker, run: +`docker compose up -d postgres`. + +To install the dependencies, run: +`pip install -r requirements.txt` +Then run: +`python app.py` + +This will run the pipeline, which you can access through the REST API at `localhost:8080`. For example, you can send questions using curl: +``` +curl --data '{ + "user": "user", + "query": "What is the maximum quarterly revenue achieved by Apple?"
+}' http://localhost:8080/ | jq +``` + +You can also run the Streamlit interface: +`streamlit run ui/server.py` + diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/app.py b/examples/pipelines/unstructured_to_sql_on_the_fly/app.py index 83ad6aa..3f7863d 100644 --- a/examples/pipelines/unstructured_to_sql_on_the_fly/app.py +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/app.py @@ -103,6 +103,7 @@ import logging import os +import dotenv import pathway as pw import psycopg import tiktoken @@ -110,6 +111,8 @@ from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa from pathway.xpacks.llm.parsers import ParseUnstructured +dotenv.load_dotenv() + class FinancialStatementSchema(pw.Schema): company_symbol: str @@ -199,7 +202,8 @@ def build_prompt_query(postresql_table: str, query: str) -> str: WHERE year = 2022 AND eps > 1.0 GROUP BY company_symbol;' - Make sure the query adheres to the specified format, + Make sure the query adheres to the specified format, and that it includes GROUP BY clause + if you aggregate results and do not include any other SQL commands or clauses besides the SELECT statement. Thank you!""" return prompt @@ -225,7 +229,7 @@ def structure_on_the_fly( model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), + retry_strategy=pw.asynchronous.ExponentialBackoffRetryStrategy(), cache_strategy=pw.asynchronous.DefaultCache(), ) @@ -268,7 +272,7 @@ def unstructured_query( model=model_locator, temperature=temperature, max_tokens=max_tokens, - retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(), + retry_strategy=pw.asynchronous.ExponentialBackoffRetryStrategy(), cache_strategy=pw.asynchronous.DefaultCache(), ) @@ -307,12 +311,12 @@ def strip_metadata(docs: list[tuple[str, dict]]) -> list[str]: def run( *, - data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/q_earnings/"), + data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./data/quarterly_earnings"), api_key: str = os.environ.get("OPENAI_API_KEY", ""), - host: str = "0.0.0.0", - port: int = 8080, - model_locator: str = "gpt-3.5-turbo-16k", # "gpt-4", # gpt-3.5-turbo-16k - max_tokens: int = 60, + host: str = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0"), + port: int = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080")), + model_locator: str = "gpt-3.5-turbo", # "gpt-4", # gpt-3.5-turbo-16k + max_tokens: int = 120, temperature: float = 0.0, postresql_host: str = os.environ.get("POSTGRESQL_HOST", "localhost"), postresql_port: str = os.environ.get("POSTGRESQL_PORT", "5432"), @@ -345,7 +349,7 @@ def run( unstructured_documents, api_key, model_locator, max_tokens, temperature ) pw.io.postgres.write(structured_table, postgreSQL_settings, postresql_table) - pw.io.csv.write(structured_table, "./examples/data/quarterly_earnings.csv") + pw.io.csv.write(structured_table, "./data/quarterly_earnings.csv") # # # Pipeline 2 - query answering using PostgreSql diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/2023q2-alphabet-earnings-release.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/2023q2-alphabet-earnings-release.pdf new file mode 100644 index 0000000..665d796 Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/2023q2-alphabet-earnings-release.pdf differ diff --git 
a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY22_Q4_Consolidated_Financial_Statements.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY22_Q4_Consolidated_Financial_Statements.pdf new file mode 100644 index 0000000..34122cc Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY22_Q4_Consolidated_Financial_Statements.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q1_Consolidated_Financial_Statements.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q1_Consolidated_Financial_Statements.pdf new file mode 100644 index 0000000..418812d Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q1_Consolidated_Financial_Statements.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q2_Consolidated_Financial_Statements.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q2_Consolidated_Financial_Statements.pdf new file mode 100644 index 0000000..180ad4d Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q2_Consolidated_Financial_Statements.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q3_Consolidated_Financial_Statements.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q3_Consolidated_Financial_Statements.pdf new file mode 100644 index 0000000..593675d Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/FY23_Q3_Consolidated_Financial_Statements.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/Meta-03-31-2023-Exhibit-99-1-FINAL-v2.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/Meta-03-31-2023-Exhibit-99-1-FINAL-v2.pdf new file mode 100644 index 0000000..232275d Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/Meta-03-31-2023-Exhibit-99-1-FINAL-v2.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/goog-exhibit-99-1-q1-2023-19.pdf b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/goog-exhibit-99-1-q1-2023-19.pdf new file mode 100644 index 0000000..4bf8cd0 Binary files /dev/null and b/examples/pipelines/unstructured_to_sql_on_the_fly/data/quarterly_earnings/goog-exhibit-99-1-q1-2023-19.pdf differ diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/docker-compose.yml b/examples/pipelines/unstructured_to_sql_on_the_fly/docker-compose.yml new file mode 100644 index 0000000..9792086 --- /dev/null +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/docker-compose.yml @@ -0,0 +1,51 @@ +version: "3.8" +services: + postgres: + container_name: postgres + image: postgres + ports: + - 5432:5432 + environment: + - POSTGRES_USER=user + - POSTGRES_PASSWORD=password + - POSTGRES_DB=STRUCTUREDDB + - PGPASSWORD=password + volumes: + - ./postgres/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql + expose: + - 5432 + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -h 0.0.0.0 -p 5432 -U user -d STRUCTUREDDB" + ] + interval: 5s + timeout: 5s + retries: 5 + pathway: + depends_on: + postgres: + condition: service_healthy + build: + context: . 
+ ports: + - "8080:8080" + environment: + OPENAI_API_KEY: + PATHWAY_REST_CONNECTOR_HOST: + PATHWAY_REST_CONNECTOR_PORT: + PATHWAY_PERSISTENT_STORAGE: + POSTGRESQL_HOST: "172.17.0.1" + volumes: + - "./data:/app/data" + streamlit_ui: + depends_on: + - pathway + build: + context: ./ui + ports: + - "8501:8501" + environment: + PATHWAY_HOST: "pathway" + PATHWAY_PORT: "8080" diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/postgres/docker-compose.yml b/examples/pipelines/unstructured_to_sql_on_the_fly/postgres/docker-compose.yml deleted file mode 100644 index 0a9a45a..0000000 --- a/examples/pipelines/unstructured_to_sql_on_the_fly/postgres/docker-compose.yml +++ /dev/null @@ -1,16 +0,0 @@ -version: "3.7" -services: - postgres: - container_name: postgres - image: postgres - ports: - - 5432:5432 - environment: - - POSTGRES_USER=user - - POSTGRES_PASSWORD=password - - POSTGRES_DB=STRUCTUREDDB - - PGPASSWORD=password - volumes: - - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql - expose: - - 5432 diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/requirements.txt b/examples/pipelines/unstructured_to_sql_on_the_fly/requirements.txt new file mode 100644 index 0000000..ca8b9f5 --- /dev/null +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/requirements.txt @@ -0,0 +1,7 @@ +pathway==0.8.6 +python-dotenv==1.0.1 +unstructured[all-docs]==0.10.28 +psycopg==3.1.12 +tiktoken==0.5.2 +streamlit==1.31.0 +openai==1.12.0 diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/ui/Dockerfile b/examples/pipelines/unstructured_to_sql_on_the_fly/ui/Dockerfile new file mode 100644 index 0000000..e3544a8 --- /dev/null +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/ui/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.11 + +WORKDIR /app + +RUN pip install streamlit + +COPY . . + +EXPOSE 8501 + +CMD ["streamlit", "run", "server.py", "--server.port", "8501", "--server.address", "0.0.0.0"] diff --git a/examples/pipelines/unstructured_to_sql_on_the_fly/ui/server.py b/examples/pipelines/unstructured_to_sql_on_the_fly/ui/server.py index 4046d4d..32462c2 100644 --- a/examples/pipelines/unstructured_to_sql_on_the_fly/ui/server.py +++ b/examples/pipelines/unstructured_to_sql_on_the_fly/ui/server.py @@ -1,9 +1,11 @@ +import os + import pandas as pd import requests import streamlit as st -api_host = "localhost" -api_port = 8080 +api_host = os.environ.get("PATHWAY_HOST", "localhost") +api_port = os.environ.get("PATHWAY_PORT", 8000) with st.sidebar: st.markdown( diff --git a/poetry.lock b/poetry.lock index 46a1410..148896c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -210,6 +210,17 @@ doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphin test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] trio = ["trio (>=0.23)"] +[[package]] +name = "appdirs" +version = "1.4.4" +description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." 
+optional = false +python-versions = "*" +files = [ + {file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"}, + {file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"}, +] + [[package]] name = "asttokens" version = "2.4.1" @@ -228,6 +239,20 @@ six = ">=1.12.0" astroid = ["astroid (>=1,<2)", "astroid (>=2,<4)"] test = ["astroid (>=1,<2)", "astroid (>=2,<4)", "pytest"] +[[package]] +name = "async-lru" +version = "2.0.4" +description = "Simple LRU cache for asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "async-lru-2.0.4.tar.gz", hash = "sha256:b8a59a5df60805ff63220b2a0c5b5393da5521b113cd5465a44eb037d81a5627"}, + {file = "async_lru-2.0.4-py3-none-any.whl", hash = "sha256:ff02944ce3c288c5be660c42dbcca0742b32c3b279d6dceda655190240b99224"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""} + [[package]] name = "async-timeout" version = "4.0.3" @@ -1206,6 +1231,25 @@ files = [ {file = "frozenlist-1.4.1.tar.gz", hash = "sha256:c037a86e8513059a2613aaba4d817bb90b9d9b6b69aace3ce9c877e8c8ed402b"}, ] +[[package]] +name = "fs" +version = "2.4.16" +description = "Python's filesystem abstraction layer" +optional = false +python-versions = "*" +files = [ + {file = "fs-2.4.16-py2.py3-none-any.whl", hash = "sha256:660064febbccda264ae0b6bace80a8d1be9e089e0a5eb2427b7d517f9a91545c"}, + {file = "fs-2.4.16.tar.gz", hash = "sha256:ae97c7d51213f4b70b6a958292530289090de3a7e15841e108fbe144f069d313"}, +] + +[package.dependencies] +appdirs = ">=1.4.3,<1.5.0" +setuptools = "*" +six = ">=1.10,<2.0" + +[package.extras] +scandir = ["scandir (>=1.5,<2.0)"] + [[package]] name = "fsspec" version = "2024.2.0" @@ -1321,12 +1365,12 @@ files = [ google-auth = ">=2.14.1,<3.0.dev0" googleapis-common-protos = ">=1.56.2,<2.0.dev0" grpcio = [ - {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] grpcio-status = [ - {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, + {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, ] protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0.dev0" requests = ">=2.18.0,<3.0.0.dev0" @@ -2900,7 +2944,7 @@ files = [ name = "networkx" version = "3.2.1" description = "Python package for creating and manipulating graphs and networks" -optional = true +optional = false python-versions = ">=3.9" files = [ {file = "networkx-3.2.1-py3-none-any.whl", hash = "sha256:f18c69adc97877c42332c170849c96cefa91881c99a7cb3e95b7c659ebdc1ec2"}, @@ -3332,10 +3376,10 @@ files = [ [package.dependencies] numpy = [ - {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, - {version = ">=1.23.5", markers = "python_version >= \"3.11\" and python_version < \"3.12\""}, {version = ">=1.21.4", markers = "python_version >= \"3.10\" and platform_system == \"Darwin\" and python_version 
< \"3.11\""}, {version = ">=1.21.2", markers = "platform_system != \"Darwin\" and python_version >= \"3.10\" and python_version < \"3.11\""}, + {version = ">=1.23.5", markers = "python_version >= \"3.11\" and python_version < \"3.12\""}, + {version = ">=1.26.0", markers = "python_version >= \"3.12\""}, ] [[package]] @@ -3510,9 +3554,9 @@ files = [ [package.dependencies] numpy = [ - {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, - {version = ">=1.23.2,<2", markers = "python_version == \"3.11\""}, {version = ">=1.22.4,<2", markers = "python_version < \"3.11\""}, + {version = ">=1.23.2,<2", markers = "python_version == \"3.11\""}, + {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -3629,37 +3673,40 @@ files = [ [[package]] name = "pathway" -version = "0.8.2" +version = "0.8.6" description = "Pathway is a data processing framework which takes care of streaming data updates for you." optional = false python-versions = ">=3.10" files = [ - {file = "pathway-0.8.2-cp310-abi3-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:dc81beecfb396d7a1a40aa1d230617c38c41d0b6572a900ed0349078bc7f2632"}, - {file = "pathway-0.8.2-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ad84e08722a8590893ae80f36248c14e800d332d477516802a71d155da1b2f68"}, - {file = "pathway-0.8.2-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:d32bf13b34ee4c14eee18e6e62f55136446d07ceef62be5343f62477dcfb7800"}, + {file = "pathway-0.8.6-cp310-abi3-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:f816846fdf7c10dd4455ff97d3b24a3e706e3d40b2ed2a65d2b7a507138fbb34"}, + {file = "pathway-0.8.6-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:666a3ad6552fd9826bb2e139479f9faf5cdad6cbb7a715fd3012489bfe9bb8cd"}, + {file = "pathway-0.8.6-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:c007bec4f0008e0a7839b44915c76885f37c5d59dc93a8fadcad090c8c12ffac"}, ] [package.dependencies] aiohttp = ">=3.8.4" -aiohttp_cors = ">=0.7.0" +aiohttp-cors = ">=0.7.0" airbyte-serverless = "0.23" +async-lru = ">=2.0.4" beartype = ">=0.14.0,<0.16.0" boto3 = ">=1.26.76" click = ">=8.1" diskcache = ">=5.2.1" exceptiongroup = {version = ">=1.1.3", markers = "python_version < \"3.11\""} +fs = ">=2.4.16" geopy = ">=2.4.0" google-api-python-client = ">=2.108.0" h3 = ">=3.7.6" jmespath = ">=1.0.1" -jupyter_bokeh = ">=3.0.7" +jupyter-bokeh = ">=3.0.7" +networkx = ">=3.2.1" numba = {version = ">=0.56", markers = "python_version < \"3.12\""} numpy = ">=1.21" -Office365-REST-Python-Client = ">=2.5.3" +office365-rest-python-client = ">=2.5.3" opentelemetry-api = ">=1.22.0" opentelemetry-exporter-otlp-proto-grpc = ">=1.22.0" opentelemetry-sdk = ">=1.22.0" -pandas = ">=1.3" +pandas = ">=2.0" panel = ">=1.3.1" pyarrow = ">=10.0.0" python-sat = ">=0.1.8.dev0" @@ -3668,10 +3715,10 @@ rich = ">=12.6.0" scikit-learn = ">=1.0" shapely = ">=2.0.1" sqlglot = "10.6.1" -typing_extensions = ">=4.8.0" +typing-extensions = ">=4.8.0" [package.extras] -tests = ["litellm (>=1.0)", "llama_index (>=0.9,<0.10)", "networkx", "openai (>=1.6)", "openapi_spec_validator", "pytest (>=8.0.0,<9.0.0)", "pytest-rerunfailures (>=13.0,<14.0)", "pytest-xdist (>=3.3.1,<4.0.0)", "python-louvain", "sentence_transformers", "tiktoken (>=0.5)", "transformers"] +tests = ["cohere (>=5.1.0,<5.2.0)", "kafka-python (>=2.0.2)", "langchain (==0.1.11)", "langchain-core (==0.1.30)", "litellm (>=1.0)", "llama-index-core 
(>=0.10.0,<0.11.0)", "llama-index-readers-pathway (>=0.1.0,<0.2.0)", "llama-index-retrievers-pathway (>=0.1.0,<0.2.0)", "openai (>=1.6)", "openapi-spec-validator", "pytest (>=8.0.0,<9.0.0)", "pytest-rerunfailures (>=13.0,<14.0)", "pytest-xdist (>=3.3.1,<4.0.0)", "python-louvain", "python-magic", "sentence-transformers (==2.4.0)", "tiktoken (>=0.5)", "transformers (==4.38.1)", "unstructured[all-docs] (==0.10.28)"] [[package]] name = "pdf2image" @@ -5426,6 +5473,22 @@ torch = ">=1.11.0" tqdm = "*" transformers = ">=4.32.0,<5.0.0" +[[package]] +name = "setuptools" +version = "69.5.1" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "setuptools-69.5.1-py3-none-any.whl", hash = "sha256:c636ac361bc47580504644275c9ad802c50415c7522212252c033bd15f301f32"}, + {file = "setuptools-69.5.1.tar.gz", hash = "sha256:6c1fccdac05a97e598fb0ae3bbed5904ccb317337a51139dcd51453611bbb987"}, +] + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] + [[package]] name = "shapely" version = "2.0.3" @@ -6035,13 +6098,13 @@ test = ["argcomplete (>=3.0.3)", "mypy (>=1.7.0)", "pre-commit", "pytest (>=7.0, [[package]] name = "transformers" -version = "4.38.2" +version = "4.39.3" description = "State-of-the-art Machine Learning for JAX, PyTorch and TensorFlow" optional = true python-versions = ">=3.8.0" files = [ - {file = "transformers-4.38.2-py3-none-any.whl", hash = "sha256:c4029cb9f01b3dd335e52f364c52d2b37c65b4c78e02e6a08b1919c5c928573e"}, - {file = "transformers-4.38.2.tar.gz", hash = "sha256:c5fc7ad682b8a50a48b2a4c05d4ea2de5567adb1bdd00053619dbe5960857dd5"}, + {file = "transformers-4.39.3-py3-none-any.whl", hash = "sha256:7838034a12cca3168247f9d2d1dba6724c9de3ae0f73a108258c6b8fc5912601"}, + {file = "transformers-4.39.3.tar.gz", hash = "sha256:2586e5ff4150f122716fc40f5530e92871befc051848fbe82600969c535b762d"}, ] [package.dependencies] @@ -6709,11 +6772,11 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.link testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] [extras] -local = ["sentence-transformers", "torch"] +local = ["sentence-transformers", "torch", "transformers"] unstructured = ["tiktoken", "unstructured"] unstructured-to-sql = ["psycopg", "tiktoken", "unstructured"] [metadata] lock-version = "2.0" 
python-versions = ">=3.10,<3.13" -content-hash = "378709c02a2991dc35c34f59f5d0f8b41f70cd7226b23a82fe4f493b7e3a549a" +content-hash = "960c4eb3dfd9f27897c7e7ac9d44642f58c6d82a4ed5e8d1ff2545ef89ac6c7b" diff --git a/pyproject.toml b/pyproject.toml index f9ca6a9..7f1a89f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ classifiers = [ [tool.poetry.dependencies] python = ">=3.10,<3.13" -pathway = "=0.8.2" +pathway = "=0.8.6" openai = "^1.2.4" requests = "^2.31.0" diskcache = "^5.6.1" @@ -42,10 +42,11 @@ unstructured = { extras = ["all-docs"], version = "^0.11.8", optional = true } tiktoken = { version = "^0.6.0", optional = true } psycopg = { version = "^3.1.12", optional = true } litellm = "^1.18.0" +transformers = { version = "^4.39.0", optional = true } [tool.poetry.extras] -local = ["torch", "sentence-transformers"] +local = ["torch", "sentence-transformers", "transformers"] unstructured = ["unstructured", "tiktoken"] unstructured_to_sql = ["tiktoken", "psycopg", "unstructured"] diff --git a/run_examples.py b/run_examples.py deleted file mode 100755 index 27d3e25..0000000 --- a/run_examples.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python - -""" -This is the main entrypoint for running all examples with centralized configuration. -However, all examples are self-contained and can also be run directly. -""" - -import functools - -import click -from dotenv import load_dotenv - -load_dotenv() - - -@click.group -def cli() -> None: - pass - - -def common_options(func): - @click.option( - "--host", - "-h", - envvar="PATHWAY_REST_CONNECTOR_HOST", - type=str, - default="127.0.0.1", - help="Rest input connector host.", - ) - @click.option( - "--port", - "-p", - envvar="PATHWAY_REST_CONNECTOR_PORT", - type=int, - default=8080, - help="Rest input connector port.", - ) - @click.option( - "--data_dir", - envvar="PATHWAY_DATA_DIR", - type=str, - required=False, - ) - @click.option( - "--cache_dir", - "-c", - envvar="PATHWAY_PERSISTENT_STORAGE", - type=str, - default="/tmp/cache", - ) - @click.option( - "--embedder_locator", - "-e", - envvar="EMBEDDER_LOCATOR", - type=str, - required=False, - help="Embedding model locator.", - ) - @click.option( - "--embedding_dimension", - "-d", - envvar="EMBEDDING_DIMENSION", - type=int, - required=False, - help="Embedding model output dimension.", - ) - @click.option( - "--max_tokens", - "-x", - envvar="MAX_OUTPUT_TOKENS", - type=int, - required=False, - help="Maximum output tokens of the LLM.", - ) - @click.option( - "--model_locator", - "-m", - envvar="MODEL_LOCATOR", - type=str, - required=False, - help="LLM locator for text completion/generation.", - ) - @click.option( - "--api_key", - "-k", - envvar="OPENAI_API_KEY", - type=str, - required=False, - help="API Key for OpenAI/HuggingFace Inference APIs.", - ) - @click.option( - "--temperature", - "-t", - envvar="MODEL_TEMPERATURE", - type=float, - required=False, - help="LLM temperature, controls the randomness of the outputs.", - ) - @click.option( - "--device", - envvar="DEVICE", - type=str, - required=False, - help="Device to run models on, e.g. 
'cpu', 'cuda'", - ) - @functools.wraps(func) - def wrapper(**kwargs): - kwargs = {k: v for k, v in kwargs.items() if v is not None} - return func(**kwargs) - - return wrapper - - -@cli.command() -@common_options -def local(**kwargs): - from examples.pipelines.local import run - - return run(**kwargs) - - -@cli.command() -@common_options -def contextful(**kwargs): - from examples.pipelines.contextful import run - - return run(**kwargs) - - -@cli.command() -@common_options -def s3(**kwargs): - from examples.pipelines.contextful_s3 import run - - return run(**kwargs) - - -@cli.command() -@common_options -def contextful_s3(**kwargs): - from examples.pipelines.contextful_s3 import run - - return run(**kwargs) - - -@cli.command() -@common_options -def contextless(**kwargs): - from examples.pipelines.contextless import run - - return run(**kwargs) - - -@cli.command() -@common_options -def unstructured(**kwargs): - from examples.pipelines.unstructured import run - - return run(**kwargs) - - -@cli.command() -@common_options -def unstructuredtosql(**kwargs): - from examples.pipelines.unstructured_to_sql_on_the_fly import run - - return run(**kwargs) - - -@cli.command() -@common_options -def unstructured_to_sql(**kwargs): - from examples.pipelines.unstructured_to_sql_on_the_fly import run - - return run(**kwargs) - - -@cli.command() -@common_options -def alert(**kwargs): - from examples.pipelines.alert import run - - return run(**kwargs) - - -@cli.command() -@common_options -def drivealert(**kwargs): - from examples.pipelines.drive_alert import run - - return run(**kwargs) - - -@cli.command() -@common_options -def drive_alert(**kwargs): - from examples.pipelines.drive_alert import run - - return run(**kwargs) - - -@cli.command() -@common_options -def contextful_geometric(**kwargs): - from examples.pipelines.contextful_geometric import run - - return run(**kwargs) - - -@cli.command() -@common_options -def geometric(**kwargs): - from examples.pipelines.contextful_geometric import run - - return run(**kwargs) - - -def main(): - cli.main() - - -if __name__ == "__main__": - cli.main()
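With `run_examples.py` removed, each example is now self-contained: the updated `app.py` files load a `.env` file and read their settings directly from the environment, as the changes above show. A minimal sketch of that pattern (mirroring the diffs above, not a new file in the repository):

```python
# Minimal sketch of the self-contained configuration pattern used by the
# updated examples: load .env, then read settings from the environment
# with the same defaults the pipelines use.
import os

import dotenv

dotenv.load_dotenv()

host = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "0.0.0.0")
port = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", "8080"))

if __name__ == "__main__":
    print(f"REST connector will listen on {host}:{port}")
```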