Merge pull request #5620 from pathwaycom/berke/public-rag-demo
add: demo for rag pipeline
GitOrigin-RevId: aecbd855926478b06c2dec66be784956e0db3e7e
berkecanrizai authored and Manul from Pathway committed Feb 7, 2024
1 parent 9f1dbd5 commit b8fb490
Showing 3 changed files with 335 additions and 0 deletions.
66 changes: 66 additions & 0 deletions examples/pipelines/demo-question-answering/README.md
@@ -0,0 +1,66 @@
## How to run the project

To run the project, you will need:
- an OpenAI API key
- (optional) a Google Drive folder and a Google service account

To use the OpenAI API key in the app, create a `.env` file and put `OPENAI_API_KEY=sk-...` in it.
You can also set it in `app.py` when initializing the embedder and chat instances:

```python
chat = llms.OpenAIChat(api_key='sk-...', ...)

embedder = embedders.OpenAIEmbedder(api_key='sk-...', ...)
```

The default version of the app uses the local folder `data` as the source of documents. However, you can use any other Pathway-supported connector. For instance, to add a Google Drive folder as another source, uncomment the following code in `app.py` and follow the steps below to set up your service account.

```python
drive_folder = pw.io.gdrive.read(
object_id="YOUR FOLDER ID",
with_metadata=True,
service_user_credentials_file="secret.json",
refresh_interval=30,
)

data_sources.append(drive_folder)
```

Before running the app, you will need to give it access to the Google Drive folder by following the steps below.

In order to access files on your Google Drive from the Pathway app, you will need a Google Cloud project and a service user.

### Create a new project in the Google API console:

- Go to [https://console.cloud.google.com/projectcreate](https://console.cloud.google.com/projectcreate) and create a new project
- Enable the Google Drive API by going to [https://console.cloud.google.com/apis/library/drive.googleapis.com](https://console.cloud.google.com/apis/library/drive.googleapis.com); make sure the newly created project is selected in the top-left corner
- Configure consent screen:
- Go to [https://console.cloud.google.com/apis/credentials/consent](https://console.cloud.google.com/apis/credentials/consent)
  - If you are using a private Gmail account, select "External" and continue.
  - Fill in the required parameters: application name, user support email, and developer email (your own email is fine)
  - On the next screen, click "Add or remove scopes", search for "drive.readonly", and select this scope
  - Save and click through the remaining steps
- Create service user:

- Go to [https://console.cloud.google.com/apis/credentials](https://console.cloud.google.com/apis/credentials)
- Click "+ Create credentials" and create a service account
- Name the service user and click through the next steps
- Generate service user key:
- Once more go to [https://console.cloud.google.com/apis/credentials](https://console.cloud.google.com/apis/credentials) and click on your newly created user (under Service Accounts)
- Go to "Keys", click "Add key" -> "Create new key" -> "JSON"

A JSON file will be saved to your computer.

Rename this JSON file to `secret.json` and put it under `examples/pipelines/demo-question-answering` next to `app.py` so that it is easily reachable from the app.

You can now share the desired Google Drive resources with the created user.
Note the email address ending with `gserviceaccount.com`; this is the account you will share the folder with.

Once you've done that, you will need the ID of a file or directory. You can obtain it manually by right-clicking on the file -> Share -> Copy link; the ID is part of the URL.

`https://drive.google.com/file/d/[FILE_ID]/view?usp=drive_link`

For folders:
- First, right-click on the folder and click "Share"; the link will be of the format `https://drive.google.com/drive/folders/[FOLDER_ID]?usp=drive_link`. Copy the `FOLDER_ID` from the URL.
- Second, click "Share" again and share the folder with the email ending with `gserviceaccount.com`.
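
### Querying the pipeline

Once the app is running (for example with `python app.py`), it serves a REST API on port 8000. Below is a minimal sketch of a question-answering request against the `/pw_ai_answer` endpoint, assuming the default host and port from `app.py`; the question text is only an example. The other endpoints defined in `app.py` (`/pw_ai_summary`, `/pw_ai_aggregate_responses`, `/v1/retrieve`, `/v1/statistics`, `/pw_list_documents`) take analogous JSON payloads matching their schemas.

```python
# A sketch of querying the running pipeline, assuming the default
# host/port from app.py; the prompt is an example question.
import requests

response = requests.post(
    "http://localhost:8000/pw_ai_answer",
    json={
        "prompt": "What is the sick-leave policy?",  # example question
        "response_type": "short",  # "short" or "long", per the PWAIQuery schema
        "openai_api_key": "sk-...",  # your OpenAI API key
    },
)
print(response.json())
```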
269 changes: 269 additions & 0 deletions examples/pipelines/demo-question-answering/app.py
@@ -0,0 +1,269 @@
import json
from enum import Enum

import pathway as pw
from dotenv import load_dotenv
from pathway.internals.asynchronous import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, prompts
from pathway.xpacks.llm.parsers import ParseUnstructured
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.xpacks.llm.vector_store import VectorStoreServer


class AIResponseType(Enum):
SHORT = "short"
LONG = "long"


load_dotenv()

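# Embedder used to vectorize document chunks; embedding calls are cached on disk.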
embedder = embedders.OpenAIEmbedder(
model="text-embedding-ada-002",
cache_strategy=DiskCache(),
)

host = "0.0.0.0"
port = 8000

data_sources = []

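# Stream files from the local `data` folder as raw bytes, together with file metadata.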
folder = pw.io.fs.read(
"data",
format="binary",
mode="streaming",
with_metadata=True,
)

data_sources.append(folder)

# drive_folder = pw.io.gdrive.read(
# object_id="YOUR FOLDER ID",
# with_metadata=True,
# service_user_credentials_file="secret.json",
# refresh_interval=30,
# )

# data_sources.append(drive_folder)


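# Split parsed documents into chunks of at most 400 tokens.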
text_splitter = TokenCountSplitter(max_tokens=400)


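# Vector store over all data sources: parses, chunks, and embeds documents, and serves retrieval queries.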
vector_server = VectorStoreServer(
*data_sources,
embedder=embedder,
splitter=text_splitter,
parser=ParseUnstructured(),
)


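# Chat model used for answering; transient API errors are retried with exponential backoff and responses are cached on disk.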
chat = llms.OpenAIChat(
model="gpt-3.5-turbo",
retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
cache_strategy=DiskCache(),
temperature=0.05,
)


class PWAIQuery(pw.Schema):
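    # Request schema for the question-answering endpoint (/pw_ai_answer).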
prompt: str
filters: str | None = pw.column_definition(default_value=None)
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")
response_type: str = pw.column_definition(default_value="short") # short | long
openai_api_key: str


pw_ai_endpoint = "/pw_ai_answer"


class SummarizeQuery(pw.Schema):
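    # Request schema for the summarization endpoint (/pw_ai_summary).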
text_list: list[str]
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")
openai_api_key: str


summarize_endpoint = "/pw_ai_summary"


class AggregateQuery(pw.Schema):
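    # Request schema for the response-aggregation endpoint (/pw_ai_aggregate_responses).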
question: str
answers: list[str]
model: str | None = pw.column_definition(default_value="gpt-3.5-turbo")
openai_api_key: str


aggregate_endpoint = "/pw_ai_aggregate_responses"


def _unwrap_udf(func):
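    # Return the plain function wrapped by a Pathway UDF, or `func` itself otherwise.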
if isinstance(func, pw.UDF):
return func.__wrapped__
return func


@pw.udf
def gpt_respond(prompt, docs, filter, response_type) -> str:
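    # Build the RAG prompt; when no metadata filter is given, the prompt is passed through unchanged.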
if filter is None:
return prompt

docs = docs.value

try:
docs = [{"text": doc["text"], "path": doc["metadata"]["path"]} for doc in docs]

except Exception:
print("No context was found.")

if response_type == AIResponseType.SHORT.value:
prompt_func = _unwrap_udf(prompts.prompt_short_qa)
else:
prompt_func = _unwrap_udf(prompts.prompt_citing_qa)
return prompt_func(prompt, docs)


@pw.udf
def prompt_aggregate(question, answers):
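    # Build a prompt asking the model to categorize the collected answers to `question`.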
summary_data = "\n".join(answers)

summaries_str = json.dumps(summary_data, indent=2)

prompt = f"""Given a json with client names and responses
to the question: "{question}".
Categorize clients stance according to their policy and list them separately.
Use the question and answers to separate them with good logic according to question.
Use Markdown formatting starting with header level 2 (##).
Company Policies: ```{summaries_str}```
Answer:"""

return prompt


def run(
with_cache: bool = True,
cache_backend: pw.persistence.Backend
| None = pw.persistence.Backend.filesystem("./Cache"),
):
webserver = pw.io.http.PathwayWebserver(host=host, port=port)
    # Vector store endpoints

def serve(route, schema, handler):
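        # Wire a REST route to a handler: queries come in on `route`, results are written back.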
queries, writer = pw.io.http.rest_connector(
webserver=webserver,
route=route,
schema=schema,
autocommit_duration_ms=50,
delete_completed_queries=True,
)
writer(handler(queries))

serve(
"/v1/retrieve", vector_server.RetrieveQuerySchema, vector_server.retrieve_query
)
serve(
"/v1/statistics",
vector_server.StatisticsQuerySchema,
vector_server.statistics_query,
)
serve(
"/pw_list_documents",
vector_server.InputsQuerySchema,
vector_server.inputs_query,
)

gpt_queries, gpt_response_writer = pw.io.http.rest_connector(
webserver=webserver,
route=pw_ai_endpoint,
schema=PWAIQuery,
autocommit_duration_ms=50,
delete_completed_queries=True,
)

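    # Retrieve the k=6 best-matching chunks for each query and attach them as `docs`.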
gpt_results = gpt_queries + vector_server.retrieve_query(
gpt_queries.select(
metadata_filter=pw.this.filters,
filepath_globpattern=pw.cast(str | None, None),
query=pw.this.prompt,
k=6,
)
).select(
docs=pw.this.result,
)

gpt_results += gpt_results.select(
rag_prompt=gpt_respond(
pw.this.prompt, pw.this.docs, pw.this.filters, pw.this.response_type
)
)
gpt_results += gpt_results.select(
result=chat(
llms.prompt_chat_single_qa(pw.this.rag_prompt),
model=pw.this.model,
api_key=pw.this.openai_api_key,
)
)

summarize_queries, summarize_response_writer = pw.io.http.rest_connector(
webserver=webserver,
route=summarize_endpoint,
schema=SummarizeQuery,
autocommit_duration_ms=50,
delete_completed_queries=True,
)

summarize_results = summarize_queries.select(
pw.this.model,
pw.this.openai_api_key,
prompt=prompts.prompt_summarize(pw.this.text_list),
)
summarize_results += summarize_results.select(
result=chat(
llms.prompt_chat_single_qa(pw.this.prompt),
model=pw.this.model,
api_key=pw.this.openai_api_key,
)
)

aggregate_queries, aggregate_response_writer = pw.io.http.rest_connector(
webserver=webserver,
route=aggregate_endpoint,
schema=AggregateQuery,
autocommit_duration_ms=50,
delete_completed_queries=True,
)

aggregate_results = aggregate_queries.select(
pw.this.model,
pw.this.openai_api_key,
prompt=prompt_aggregate(pw.this.question, pw.this.answers),
)
aggregate_results += aggregate_results.select(
result=chat(
llms.prompt_chat_single_qa(pw.this.prompt),
model=pw.this.model,
api_key=pw.this.openai_api_key,
)
)

gpt_response_writer(gpt_results)
summarize_response_writer(summarize_results)
aggregate_response_writer(aggregate_results)

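    # UDF_CACHING persists LLM and embedding call results in the cache backend.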
if with_cache:
if cache_backend is None:
raise ValueError("Cache usage was requested but the backend is unspecified")
persistence_config = pw.persistence.Config.simple_config(
cache_backend,
persistence_mode=pw.PersistenceMode.UDF_CACHING,
)
else:
persistence_config = None

pw.run(
monitoring_level=pw.MonitoringLevel.NONE,
persistence_config=persistence_config,
)


if __name__ == "__main__":
run(with_cache=True)
Binary file not shown.
