Refactoring LLM examples to use Pathway xpack (#5499)
GitOrigin-RevId: 8699786a7449fc9b91f900ebb141f966fe37e3e0
szymondudycz authored and Manul from Pathway committed Jan 30, 2024
1 parent ba9cd71 commit 0dc944d
Showing 12 changed files with 254 additions and 186 deletions.
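The gist of the refactor: the in-repo `llm_app.model_wrappers` classes are replaced by the embedders and chat models from the Pathway LLM xpack, which are configured once and then applied to columns like UDFs. Below is a condensed before/after sketch of the call pattern, distilled from the diffs that follow — not code from any single file in the commit; the `documents` and `prompts` tables and `api_key` are assumed to exist as in the example pipelines, and the model names are illustrative.

```python
import pathway as pw
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa

# Before (removed in this commit): the model was chosen per call via `locator`.
#   from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel
#   embedder = OpenAIEmbeddingModel(api_key=api_key)
#   documents.select(vector=embedder.apply(text=pw.this.doc, locator=embedder_locator))
#   model = OpenAIChatGPTModel(api_key=api_key)
#   prompts.select(result=model.apply(pw.this.prompt, locator=model_locator,
#                                     temperature=temperature, max_tokens=max_tokens))

# After: configure the xpack objects once, then call them directly on column expressions.
embedder = OpenAIEmbedder(
    api_key=api_key,
    model="text-embedding-ada-002",  # illustrative; the examples pass embedder_locator
    retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
    cache_strategy=pw.asynchronous.DefaultCache(),
)
model = OpenAIChat(
    api_key=api_key,
    model="gpt-3.5-turbo",  # illustrative; the examples pass model_locator
    temperature=0.0,
    max_tokens=60,
    retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
    cache_strategy=pw.asynchronous.DefaultCache(),
)

vectors = documents.select(vector=embedder(pw.this.doc))
answers = prompts.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
```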
6 changes: 3 additions & 3 deletions README.md
@@ -83,7 +83,7 @@ Pick one that is closest to your needs.
| Example app (template) | Description |
| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [`contextless`](examples/pipelines/contextless/app.py) | This simple example calls OpenAI ChatGPT API but does not use an index when processing queries. It relies solely on the given user query. We recommend it to start your Pathway LLM journey. |
| [`contextful`](examples/pipelines/contextful/app.py) | This default example of the app will index the jsonlines documents located in the [`data/pathway-docs`](examples/data/pathway-docs) directory. These indexed documents are then taken into account when processing queries. The pathway pipeline running in this mode is located at [`examples/pipelines/contextful/app.py`](examples/pipelines/contextful/app.py). |
| [`contextful`](examples/pipelines/contextful/app.py) | This default example of the app will index the jsonlines documents located in the [`data/pathway-docs`](examples/data/pathway-docs) directory. These indexed documents are then taken into account when processing queries. |
| [`contextful-s3`](examples/pipelines/contextful_s3/app.py) | This example operates similarly to the contextful mode. The main difference is that the documents are stored and indexed from an S3 bucket, allowing the handling of a larger volume of documents. This can be more suitable for production environments. |
| [`unstructured`](examples/pipelines/unstructured/app.py) | Process unstructured documents such as PDF, HTML, DOCX, PPTX, and more. Visit [unstructured-io](https://unstructured-io.github.io/unstructured/) for the full list of supported formats. |
| [`local`](examples/pipelines/local/app.py) | This example runs the application using Huggingface Transformers, which eliminates the need for the data to leave the machine. It provides a convenient way to use state-of-the-art NLP models locally. |
@@ -130,7 +130,7 @@ Create an .env file in the root directory and add the following environment vari
| PATHWAY_REST_CONNECTOR_HOST | Specifies the host IP for the REST connector in Pathway. For the dockerized version, set it to `0.0.0.0` Natively, you can use `127.0.0.1` |
| PATHWAY_REST_CONNECTOR_PORT | Specifies the port number on which the REST connector service of the Pathway should listen. Here, it is set to 8080. |
| OPENAI_API_KEY | The API token for accessing OpenAI services. If you are not running the local version, please remember to replace it with your API token, which you can generate from your account on [openai.com](https:/platform.openai.com/account/api-keys). |
| PATHWAY_CACHE_DIR | Specifies the directory where the cache is stored. You could use /tmpcache. |
| PATHWAY_PERSISTENT_STORAGE | Specifies the directory where the cache is stored. You could use /tmpcache. |

For example:

@@ -139,7 +139,7 @@ APP_VARIANT=contextful
PATHWAY_REST_CONNECTOR_HOST=0.0.0.0
PATHWAY_REST_CONNECTOR_PORT=8080
OPENAI_API_KEY=<Your Token>
PATHWAY_CACHE_DIR=/tmp/cache
PATHWAY_PERSISTENT_STORAGE=/tmp/cache
```

### Step 3: Build and run the app
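For the native (non-Docker) run, these variables just need to be present in the process environment. A small hedged sketch of loading them from the `.env` file with `python-dotenv` — an assumption; the repository may already handle this for you in `run_examples.py`:

```python
# Minimal sketch: load the .env file before starting an example natively.
# Assumes the python-dotenv package is installed; values shown are the README defaults.
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

host = os.environ["PATHWAY_REST_CONNECTOR_HOST"]        # e.g. 127.0.0.1
port = int(os.environ["PATHWAY_REST_CONNECTOR_PORT"])   # e.g. 8080
cache_dir = os.environ["PATHWAY_PERSISTENT_STORAGE"]    # e.g. /tmp/cache
print(host, port, cache_dir)
```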
63 changes: 30 additions & 33 deletions examples/pipelines/alert/app.py
@@ -2,27 +2,27 @@
Microservice for a context-aware alerting ChatGPT assistant.
This demo is very similar to `contextful` example with an additional real time alerting capability.
In the demo, alerts are sent to the Slack (you need `slack_alert_channel_id` and `slack_alert_token`),
In the demo, alerts are sent to Slack (you need `slack_alert_channel_id` and `slack_alert_token`),
you can either put these env variables in .env file under llm-app directory,
or create env variables in terminal (ie. export in bash)
or create env variables in the terminal (ie. export in bash)
If you don't have Slack, you can leave them empty and app will print the notifications to
standard output instead.
Upon starting, a REST API endpoint is opened by the app to serve queries about files inside
the input folder `data_dir`.
We can create notifications by sending query to API stating we want to be modified.
We can create notifications by sending a query to API and stating we want to be notified of the changes.
Alternatively, the provided Streamlit chat app can be used.
One example would be `Tell me and alert about start date of campaign for Magic Cola`
One example would be `Tell me and alert about the start date of the campaign for Magic Cola`
What happens next?
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is build from the relevant documentations pages
for documents in the corpus. A prompt is built from the relevant documentations pages
and sent to the OpenAI GPT3.5 chat service for processing and answering.
Once you run, Pathway looks for any changes in data sources, and efficiently detects changes
Once you run, Pathway looks for any changes in data sources and efficiently detects changes
to the relevant documents. When a change is detected, the LLM is asked to answer the query
again, and if the new answer is sufficiently different, an alert is created.
@@ -45,13 +45,15 @@
run `streamlit run server.py`
"""

import asyncio
import os

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa

from llm_app import send_slack_alerts
from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel


class DocumentInputSchema(pw.Schema):
@@ -151,7 +153,12 @@ def run(
**kwargs,
):
# Part I: Build index
embedder = OpenAIEmbeddingModel(api_key=api_key)
embedder = OpenAIEmbedder(
api_key=api_key,
model=embedder_locator,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

documents = pw.io.jsonlines.read(
data_dir,
@@ -160,9 +167,7 @@ def run(
autocommit_duration_ms=50,
)

enriched_documents = documents + documents.select(
data=embedder.apply(text=pw.this.doc, locator=embedder_locator)
)
enriched_documents = documents + documents.select(data=embedder(pw.this.doc))

index = KNNIndex(
enriched_documents.data, enriched_documents, n_dimensions=embedding_dimension
@@ -178,20 +183,20 @@ def run(
delete_completed_queries=False,
)

model = OpenAIChatGPTModel(api_key=api_key)
model = OpenAIChat(
api_key=api_key,
model=model_locator,
temperature=temperature,
max_tokens=max_tokens,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

query += query.select(
prompt=build_prompt_check_for_alert_request_and_extract_query(query.query)
)
query += query.select(
tupled=split_answer(
model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=100,
)
),
tupled=split_answer(model(prompt_chat_single_qa(pw.this.prompt))),
)
query = query.select(
pw.this.user,
Expand All @@ -200,7 +205,7 @@ def run(
)

query += query.select(
data=embedder.apply(text=pw.this.query, locator=embedder_locator),
data=embedder(pw.this.query),
query_id=pw.apply(make_query_id, pw.this.user, pw.this.query),
)

@@ -221,12 +226,7 @@ def run(
pw.this.query_id,
pw.this.query,
pw.this.alert_enabled,
response=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
response=model(prompt_chat_single_qa(pw.this.prompt)),
)

output = responses.select(
@@ -243,12 +243,9 @@ def acceptor(new: str, old: str) -> bool:
if new == old:
return False

decision = model(
build_prompt_compare_answers(new, old),
locator=model_locator,
max_tokens=20,
)

# TODO: clean after udfs can be used as common functions
prompt = [dict(role="system", content=build_prompt_compare_answers(new, old))]
decision = asyncio.run(model.__wrapped__(prompt, max_tokens=20))
return decision_to_bool(decision)

deduplicated_responses = pw.stateful.deduplicate(
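A usage sketch for the alert example above, not part of the commit: once the pipeline is running, an alert subscription is just a query sent to the REST endpoint. The host, port, and payload shape follow the README defaults and the curl calls in the sibling examples, and the query text is the one suggested in the docstring — adjust host/port to your `.env` values.

```python
# Hedged usage sketch: subscribe to an alert by querying the running alert pipeline.
import requests

answer = requests.post(
    "http://localhost:8080/",
    json={
        "user": "user",
        "query": "Tell me and alert about the start date of the campaign for Magic Cola",
    },
    timeout=120,
)
print(answer.text)  # initial answer; later changes arrive via Slack or stdout alerts
```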
42 changes: 22 additions & 20 deletions examples/pipelines/contextful/app.py
@@ -10,16 +10,16 @@
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is build from the relevant documentations pages
and sent to the OpenAI GPT-4 chat service for processing.
for documents in the corpus. A prompt is built from the relevant documentations pages
and sent to the OpenAI chat service for processing.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py contextful`
or, if all dependencies are managed manually rather than using poetry
`python examples/pipelines/contextful/app.py`
You can also run this example directly in the environment with llm_app instaslled.
You can also run this example directly in the environment with llm_app installed.
To call the REST API:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq
@@ -29,8 +29,8 @@

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex

from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa


class DocumentInputSchema(pw.Schema):
@@ -55,7 +55,12 @@ def run(
temperature: float = 0.0,
**kwargs,
):
embedder = OpenAIEmbeddingModel(api_key=api_key)
embedder = OpenAIEmbedder(
api_key=api_key,
model=embedder_locator,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

documents = pw.io.jsonlines.read(
data_dir,
@@ -64,9 +69,7 @@ def run(
autocommit_duration_ms=50,
)

enriched_documents = documents + documents.select(
vector=embedder.apply(text=pw.this.doc, locator=embedder_locator)
)
enriched_documents = documents + documents.select(vector=embedder(pw.this.doc))

index = KNNIndex(
enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
@@ -80,9 +83,7 @@ def run(
delete_completed_queries=True,
)

query += query.select(
vector=embedder.apply(text=pw.this.query, locator=embedder_locator),
)
query += query.select(vector=embedder(pw.this.query))

query_context = query + index.get_nearest_items(
query.vector, k=3, collapse_rows=True
@@ -98,16 +99,17 @@ def build_prompt(documents, query):
prompt=build_prompt(pw.this.documents_list, pw.this.query)
)

model = OpenAIChatGPTModel(api_key=api_key)
model = OpenAIChat(
api_key=api_key,
model=model_locator,
temperature=temperature,
max_tokens=max_tokens,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

responses = prompt.select(
query_id=pw.this.id,
result=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
query_id=pw.this.id, result=model(prompt_chat_single_qa(pw.this.prompt))
)

response_writer(responses)
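The retrieval step shared by the contextful pipelines is unchanged by this commit apart from the embedder call. Below is a condensed sketch of that step, written here as a helper function around the new xpack embedder — this function does not exist in the repository, and the `documents_list` rename plus the prompt template are reconstructed, since the diff elides them.

```python
import pathway as pw
from pathway.stdlib.ml.index import KNNIndex


@pw.udf
def build_prompt(documents_list, query) -> str:
    # Reconstructed prompt template; the original body is not shown in the diff.
    docs_str = "\n".join(documents_list)
    return f"Given the following documents:\n{docs_str}\nanswer this query: {query}"


def retrieval_prompts(
    documents: pw.Table, query: pw.Table, embedder, embedding_dimension: int
) -> pw.Table:
    """Embed docs and queries, fetch the 3 nearest docs per query, and build prompts."""
    enriched_documents = documents + documents.select(vector=embedder(pw.this.doc))
    query = query + query.select(vector=embedder(pw.this.query))

    # Nearest-neighbour index over the document embeddings.
    index = KNNIndex(
        enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
    )

    # collapse_rows=True returns the matched docs as one tuple-valued column;
    # the rename to `documents_list` is a reconstruction of the elided select.
    query_context = query + index.get_nearest_items(
        query.vector, k=3, collapse_rows=True
    ).select(documents_list=pw.this.doc)

    return query_context.select(
        pw.this.query,
        prompt=build_prompt(pw.this.documents_list, pw.this.query),
    )
```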
46 changes: 24 additions & 22 deletions examples/pipelines/contextful_s3/app.py
@@ -1,5 +1,5 @@
"""
Microservice for a context-aware ChatGPT assistant.
Microservice for a context-aware ChatGPT assistant.
The following program reads in a collection of documents from a public AWS S3 bucket,
embeds each document using the OpenAI document embedding model,
@@ -10,16 +10,16 @@
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is build from the relevant documentations pages
and sent to the OpenAI GPT-4 chat service for processing.
for documents in the corpus. A prompt is built from the relevant documentations pages
and sent to the OpenAI chat service for processing.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py contextful_s3`
`poetry run ./run_examples.py contextful-s3`
or, if all dependencies are managed manually rather than using poetry
`python examples/pipelines/contextful_s3/app.py`
You can also run this example directly in the environment with llm_app instaslled.
You can also run this example directly in the environment with llm_app installed.
To call the REST API:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq
@@ -28,8 +28,8 @@

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex

from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa


class DocumentInputSchema(pw.Schema):
@@ -54,7 +54,12 @@ def run(
temperature: float = 0.0,
**kwargs,
):
embedder = OpenAIEmbeddingModel(api_key=api_key)
embedder = OpenAIEmbedder(
api_key=api_key,
model=embedder_locator,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

documents = pw.io.s3.read(
data_dir,
@@ -67,9 +72,7 @@ def run(
mode="streaming",
)

enriched_documents = documents + documents.select(
vector=embedder.apply(text=pw.this.doc, locator=embedder_locator)
)
enriched_documents = documents + documents.select(vector=embedder(pw.this.doc))

index = KNNIndex(
enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
@@ -83,9 +86,7 @@ def run(
delete_completed_queries=True,
)

query += query.select(
vector=embedder.apply(text=pw.this.query, locator=embedder_locator),
)
query += query.select(vector=embedder(pw.this.query))

query_context = query + index.get_nearest_items(
query.vector, k=3, collapse_rows=True
@@ -101,16 +102,17 @@ def build_prompt(documents, query):
prompt=build_prompt(pw.this.documents_list, pw.this.query)
)

model = OpenAIChatGPTModel(api_key=api_key)
model = OpenAIChat(
api_key=api_key,
model=model_locator,
temperature=temperature,
max_tokens=max_tokens,
retry_strategy=pw.asynchronous.FixedDelayRetryStrategy(),
cache_strategy=pw.asynchronous.DefaultCache(),
)

responses = prompt.select(
query_id=pw.this.id,
result=model.apply(
pw.this.prompt,
locator=model_locator,
temperature=temperature,
max_tokens=max_tokens,
),
query_id=pw.this.id, result=model(prompt_chat_single_qa(pw.this.prompt))
)

response_writer(responses)
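The S3 variant differs from `contextful` only in the input connector, whose settings are elided in this diff. A hedged sketch of what that connector call typically looks like — the bucket, region, schema fields, and `AwsS3Settings` arguments here are assumptions rather than the values used by the example; check the Pathway S3 connector documentation for the exact options.

```python
import pathway as pw


class DocumentInputSchema(pw.Schema):
    doc: str  # assumed field, mirroring the contextful example


# Assumed connector configuration: the diff shows only the path argument and
# mode="streaming"; bucket name, region, format and credential handling below
# are illustrative placeholders.
documents = pw.io.s3.read(
    "path/inside/the/bucket/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="example-bucket",
        region="eu-central-1",
    ),
    format="json",
    schema=DocumentInputSchema,
    mode="streaming",
)
```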