diff --git a/.gitignore b/.gitignore index e6fbdb6..2c9016b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ docs/recordings/* etc/ example_projects/ lib/ +logs/ share/ src/ansari_backend.egg-info/* tmp/ diff --git a/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log b/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log new file mode 100644 index 0000000..93746a9 --- /dev/null +++ b/docs/fastapi/async_await_backgroundtasks_logs_for_tracing.log @@ -0,0 +1,66 @@ +(manually inserted log) ------------------------------------------------------------------------------ FastAPI receives a message from a whatsapp user on /whatsapp/v1 endpoint, so the event loop picks the `main_webhook()` to be executed now ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ Execution in `main_webhook()` starts ------------------------------------------------------------------------------ +2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! Before `background_tasks -> send_typing_indicator_then_start_loop` +2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! After `background_tasks -> send_typing_indicator_then_start_loop` +2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! Before `background_tasks -> handle_text_message` +2025-04-20 07:56:22 | DEBUG | ansari.app.main_whatsapp:main_webhook:112 | ! After `background_tasks -> handle_text_message` +(manually inserted log) ------------------------------------------------------------------------------ Execution in `main_webhook()` ends ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ Now, the event loop picks up (i.e., runs) the 1st task (i.e., async func.) that was in queue, which is "send_typing_indicator_then_start_loop", since it's the 1st task added by FastAPI's "BackgroundTasks" ------------------------------------------------------------------------------ +2025-04-20 07:56:22 | DEBUG | ansari.presenters.whatsapp_presenter:send_typing_indicator_then_start_loop:187 | ! Before `await self._send_whatsapp_typing_indicator()` in (send_typing_indicator_then_start_loop()) +2025-04-20 07:56:22 | DEBUG | ansari.presenters.whatsapp_presenter:_send_whatsapp_typing_indicator:252 | ! Before `await client.post()` in (_send_whatsapp_typing_indicator()) +2025-04-20 07:56:23 | DEBUG | ansari.presenters.whatsapp_presenter:_send_whatsapp_typing_indicator:254 | ! After `await client.post()` in (_send_whatsapp_typing_indicator()) +2025-04-20 07:56:23 | DEBUG | ansari.presenters.whatsapp_presenter:send_typing_indicator_then_start_loop:189 | ! After `await self._send_whatsapp_typing_indicator()` in (send_typing_indicator_then_start_loop()) +2025-04-20 07:56:23 | DEBUG | ansari.presenters.whatsapp_presenter:send_typing_indicator_then_start_loop:192 | ! Before `asyncio.create_task(self._typing_indicator_loop())` in (send_typing_indicator_then_start_loop()) +2025-04-20 07:56:23 | DEBUG | ansari.presenters.whatsapp_presenter:send_typing_indicator_then_start_loop:194 | ! After `asyncio.create_task(self._typing_indicator_loop())` in (send_typing_indicator_then_start_loop()) +(manually inserted log) ------------------------------------------------------------------------------ Now, the "_typing_indicator_loop()" task is created and added to the queue (i.e., event loop) as the 3rd task to be executed ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ i.e., that `create_task` method is basically saying: "I'll `await` the creation of this new 3rd task (inside 1st task), so run other tasks until they await or finish" ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ Therefore, the event loop can now switch tasks by picking up the 2nd task in queue, which is "handle_text_message()", since it's the 2nd task added by FastAPI's "BackgroundTasks" ------------------------------------------------------------------------------ +2025-04-20 07:56:24 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:762 | ! Before first call to `agent.replace_message_history()` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 starts (Remember, we're in "handle_text_message()" right now) ------------------------------------------------------------------------------ +2025-04-20 07:56:25 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) 0>` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 pauses, and the 3rd task (i.e., "_typing_indicator_loop()") starts ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ Why? as the code has reached an `await` line (i.e., await asyncio.sleep(0)), so the event loop checks if there are other tasks to be resumed, and finds 3rd task ready to be continued, so control will go to it ------------------------------------------------------------------------------ +2025-04-20 07:56:25 | DEBUG | ansari.presenters.whatsapp_presenter:_typing_indicator_loop:204 | ! Before `asyncio.sleep(26)` in (_typing_indicator_loop()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 (i.e., 2nd task) continues, as 3rd task is now `await`ing for a long time (26 seconds) ------------------------------------------------------------------------------ +2025-04-20 07:56:25 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) 0>` (in handle_text_message()) +2025-04-20 07:56:26 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:56:27 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:56:52 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) 68>` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 pauses, as 3rd task has finished `await`ing for 26 seconds so code will resume from there ------------------------------------------------------------------------------ +2025-04-20 07:56:52 | DEBUG | ansari.presenters.whatsapp_presenter:_typing_indicator_loop:207 | ! After `asyncio.sleep(26)` in (_typing_indicator_loop()) +2025-04-20 07:56:52 | DEBUG | ansari.presenters.whatsapp_presenter:_typing_indicator_loop:217 | ! Before `await self._send_whatsapp_typing_indicator()` in (_typing_indicator_loop()) +2025-04-20 07:56:52 | DEBUG | ansari.presenters.whatsapp_presenter:_send_whatsapp_typing_indicator:252 | ! Before `await client.post()` in (_send_whatsapp_typing_indicator()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 resumes ------------------------------------------------------------------------------ +2025-04-20 07:56:52 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) 68>` (in handle_text_message()) +2025-04-20 07:56:53 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:56:54 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) 95>` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 pauses ------------------------------------------------------------------------------ +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:_send_whatsapp_typing_indicator:254 | ! After `await client.post()` in (_send_whatsapp_typing_indicator()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 resumes ------------------------------------------------------------------------------ +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) 95>` (in handle_text_message()) +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) 96>` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 pauses ------------------------------------------------------------------------------ +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:_typing_indicator_loop:219 | ! After `await self._send_whatsapp_typing_indicator()` in (_typing_indicator_loop()) +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:_typing_indicator_loop:204 | ! Before `asyncio.sleep(26)` in (_typing_indicator_loop()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 resumes ------------------------------------------------------------------------------ +2025-04-20 07:56:56 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) 96>` (in handle_text_message()) +2025-04-20 07:56:57 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:56:58 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) ...` (in handle_text_message()) +2025-04-20 07:57:12 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:764 | ! Before `await asyncio.sleep(0) 208>` (in handle_text_message()) +2025-04-20 07:57:12 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:766 | ! After `await asyncio.sleep(0) 208>` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 1 finishes ------------------------------------------------------------------------------ +2025-04-20 07:57:12 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:777 | ! Before `await self.send_whatsapp_message()` (in handle_text_message()) +(manually inserted log) ------------------------------------------------------------------------------ Task 3 is still waiting the 26 seconds, so event loop doesn't pick it up ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ loop 2 starts (NOTE: this is a different loop found in "send_whatsapp_message()", not "handle_text_message()") ------------------------------------------------------------------------------ +(manually inserted log) ------------------------------------------------------------------------------ Notice how this loop doesn't get interrupted? This is because we ran "typing_indicator_task.cancel()" first, which cancelled the 3rd task ------------------------------------------------------------------------------ +2025-04-20 07:57:12 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:307 | ! Before `await client.post() 0` (in send_whatsapp_message()) +2025-04-20 07:57:14 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:309 | ! After `await client.post() 0` (in send_whatsapp_message()) +2025-04-20 07:57:15 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:307 | ! Before `await client.post() ...` (in send_whatsapp_message()) +2025-04-20 07:57:16 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:309 | ! After `await client.post() ...` (in send_whatsapp_message()) +2025-04-20 07:57:24 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:307 | ! Before `await client.post() 15` (in send_whatsapp_message()) +2025-04-20 07:57:25 | DEBUG | ansari.presenters.whatsapp_presenter:send_whatsapp_message:309 | ! After `await client.post() 15` (in send_whatsapp_message()) +(manually inserted log) ------------------------------------------------------------------------------ loop 2 finishes ------------------------------------------------------------------------------ +2025-04-20 07:57:25 | DEBUG | ansari.presenters.whatsapp_presenter:handle_text_message:779 | ! After `await self.send_whatsapp_message()` (in handle_text_message()) + + diff --git a/docs/fastapi/async_await_backgroundtasks_visualized.md b/docs/fastapi/async_await_backgroundtasks_visualized.md new file mode 100644 index 0000000..6539cd0 --- /dev/null +++ b/docs/fastapi/async_await_backgroundtasks_visualized.md @@ -0,0 +1,197 @@ +# Understanding Async Flow in WhatsApp Integration + +This document explains the asynchronous execution flow in the WhatsApp integration, focusing on how control flows between `main_whatsapp.py` and `whatsapp_presenter.py`. + +Note 1: to render mermaid diagrams, you can: +* Install this VSCode extension (recommended): [Markdown Preview Mermaid Support](https://marketplace.visualstudio.com/items/?itemName=bierner.markdown-mermaid) +* Alternatively, install this VSCode extension: [Markdown Preview Enhanced](https://marketplace.visualstudio.com/items/?itemName=shd101wyy.markdown-preview-enhanced) + * Caveat, if you're in dark mode, then text in mermaid diagram will also have a dark color as well, so you may need to change the theme to light mode to see the text clearly. + * Or, change the theme within the preview menu (bottom right when previewing) to light mode. +* Copy-paste the mermaid code to a live editor like this one: [Mermaid Live Editor](https://mermaid-js.github.io/mermaid-live-editor/) +* Check this file on GitHub (since [it can render mermaid diagrams](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-diagrams)) + + +Note 2: If you don't understand the diagrams below, then I suggest checking out the following resource(s) for more information on Python's GIL and async programming: + +* Understand Python's GIL: [The Python Global Interpreter Lock (GIL): An (Almost) Love Story](https://medium.com/@amitkhachane.7/the-python-global-interpreter-lock-gil-an-almost-love-story-3acfeff99016) +* Understand async 1 (FastAPI docs): [Concurrency and async / await](https://fastapi.tiangolo.com/async/#async-and-await) (specifically starting from the `async and await` section) +* Understand async 2: [Decoding Asynchronous Programming in Python: Understanding the Basics](https://sunscrapers.com/blog/python-async-programming-basics#:~:text=concurrent%20programming) +* Undertstand async 3: [A Complete Visual Guide to Understanding the Node.js Event Loop](https://www.builder.io/blog/visual-guide-to-nodejs-event-loop#:~:text=within%20a%20callback%20function%20passed%20to) + * Specifically, the video highlighted in the hyperlink above +* Understand async 4 (to understand the problem's root cause mentioned in this file): [mihi's SO answer](https://stackoverflow.com/a/67601373/13626137) +* Understand async/coroutines in depth: [Mastering Python Async IO with FastAPI](https://dev.to/leapcell/mastering-python-async-io-with-fastapi-13e8) + * Highly-recommended, but stop before this section: "I/O Multiplexing Technology" +* Understand sync/async/GIL/BackgroundTasks interactions in FastAPI: FastAPI - [Why does synchronous code do not block the event Loop?](https://stackoverflow.com/a/79382844/13626137) +* Finally, using sync vs async in FastAPI: [Dead Simple: When to Use Async in FastAPI](https://hughesadam87.medium.com/dead-simple-when-to-use-async-in-fastapi-0e3259acea6f) + + +## Initial Concurrency Challenge and Solution - High level Overview + +One of the main challenges was ensuring the typing indicator continues to run while the lengthy API call to Claude is processing: + +(Note: Similar problem statement can be found on SO [here](https://stackoverflow.com/questions/67599119/fastapi-asynchronous-background-tasks-blocks-other-requests)) + +```mermaid +sequenceDiagram + participant P as WhatsAppPresenter + participant A as AnsariClaude Agent + participant E as Event Loop + + Note over P,A: Problem: List comprehension blocks event loop + + P->>+A: [tok for tok in agent.replace_message_history()] + + A->>A: Blocking API call to Claude + + Note over E: Event loop blocked ❌ + Note over E: Typing indicator can't run + + A-->>-P: Eventually returns all tokens at once + + Note over P,A: Solution: Use for loop with await + + P->>+A: for token in agent.replace_message_history() + loop Each token + A-->>P: yield token + P->>E: await asyncio.sleep(0) + Note over E: Event Loop switches tasks ✅ + E->>P: _typing_indicator_loop runs + E->>P: Continue with next token + end + A-->>-P: Complete +``` + +Below is a more detailed explanation of the solution by visualizing the control flow and how the event loop manages tasks: + + +## Overview of Control Flow + +The WhatsApp integration uses FastAPI's background tasks and asyncio to handle message processing and typing indicators concurrently. Here's a high-level overview: + +```mermaid +graph TD + A[Incoming Webhook Request] --> B[main_webhook function] + B --> C[Extract message details] + C --> D[Create user-specific presenter] + D --> E[Start Background Task 1
send_typing_indicator_then_start_loop] + D --> F[Start Background Task 2
handle_text_message] + E --> G[Send initial typing indicator] + G --> H[Create Task
_typing_indicator_loop] + F --> I[Process message with agent] + H --> J[Periodically send
typing indicators] + I --> K[Send final response] + K --> L[Cancel typing indicator task] +``` + + +## Ultra Detailed Code Execution Flow Traced from Logs + +The logs in `async_await_backgroundtasks_logs_for_tracing.log` provide clear evidence of the execution flow described in these diagrams. While the debug logs with "! Before" and "! After" markers may have been removed from the current code, they were strategically placed around critical `await` statements and task creation points to trace how control passes between different parts of the system. + +Therefore, this sequence diagram shows the exact order of execution traced from the debug logs, providing a precise view of how control passes between functions and when the event loop switches between tasks: + +```mermaid +sequenceDiagram + participant F as FastAPI + participant MW as main_webhook + participant T1 as send_typing_indicator_then_start_loop + participant T2 as handle_text_message + participant T3 as _typing_indicator_loop + participant EL as Event Loop + + Note over F,EL: FastAPI receives webhook request + + F->>+MW: Execute main_webhook() + + Note over MW: Create user_presenter + MW->>EL: background_tasks.add_task(send_typing_indicator_then_start_loop) + MW->>EL: background_tasks.add_task(handle_text_message) + MW-->>-F: Return Response(status_code=200) + + Note over F,EL: FastAPI's event loop checks for tasks + + EL->>+T1: Execute Task 1 + Note over T1: Set first_indicator_time + T1->>T1: await _send_whatsapp_typing_indicator() + Note over T1: Send initial typing indicator to WhatsApp + T1->>T1: Create Task 3 (_typing_indicator_loop)
(Added to event loop) + T1-->>-EL: Task 1 completes + + EL->>+T2: Execute Task 2 + + Note over T2: Start token processing loop + T2->>T2: for token in agent.replace_message_history(): + T2->>EL: await asyncio.sleep(0) - first iteration + + EL->>+T3: Switch to Task 3 + T3->>EL: await asyncio.sleep(26) - Start waiting + + EL->>T2: Resume Task 2 + T2->>T2: response_str += token - continue processing + + Note over T2: Continue token processing... + + T2->>EL: await asyncio.sleep(0) - middle of iterations + + Note over T3: 26 seconds elapsed + EL->>T3: Resume Task 3 + T3->>EL: await _send_whatsapp_typing_indicator() + + EL->>T2: Resume Task 2 + T2->>T2: response_str += token - continue processing + + Note over T2: Continue token processing... + + T2->>EL: await asyncio.sleep(0) - later in iterations + + EL->>T3: _send_whatsapp_typing_indicator() completes + T3->>EL: await asyncio.sleep(26) - Start next wait + + EL->>T2: Resume Task 2 + + Note over T2: Final token processing... + + T2->>T2: Processing complete + T2->>EL: await send_whatsapp_message() + + Note over EL: Task 3 is still in the 26-second wait
(so event loop won't resume it yet) + EL->>T2: Resume Task 2 + + T2->>T2: Cancel typing_indicator_task
(Removed from event loop) + T2-->>-EL: Task 2 completes + + Note over EL: All tasks complete +``` + +## Key Insights + +1. **FastAPI Background Tasks**: + - FastAPI's background tasks allow tasks to continue after the HTTP response has been sent + - In our case, two tasks are created: typing indicator and message processing + +2. **Task Creation and Execution Order**: + - Tasks are executed in the order they're added to the event loop + - The typing indicator task is added first, ensuring it starts before message processing + +3. **Proper Yielding Points**: + - `await asyncio.sleep(0)` is strategically placed in the token processing loop + - This allows the typing indicator task to run periodically during message processing + +4. **Task Cancellation**: + - The typing indicator task is explicitly cancelled when message processing completes + - This prevents the typing indicator from continuing after the response is sent + +5. **The Critical Role of `async with`**: + - `async with httpx.AsyncClient()` ensures proper asynchronous resource management + - Both client creation and cleanup happen asynchronously without blocking the event loop + +## Common Pitfalls and Solutions + +| Pitfall | Problem | Solution | +| ---------------------------- | --------------------------------------------------------------- | ----------------------------------------------------------------------------- | +| Blocking operations | List comprehensions, synchronous API calls block the event loop | Replace with `for` loops and add periodic `await asyncio.sleep(0)` statements | +| Missing yield points | Long-running operations prevent other tasks from executing | Add strategic `await` statements to yield control | +| Forgotten task cancellation | Background tasks continue running indefinitely | Explicitly call `task.cancel()` when the task is no longer needed | +| Improper resource management | Using `with` instead of `async with` for async resources | Always use `async with` for asynchronous context managers | +| Task execution order | Critical tasks start too late | Pay attention to the order in which tasks are added to the event loop | + diff --git a/docs/structure_of_api_responses/anthropic_api_structure_of_message_history.json b/docs/structure_of_api_responses/anthropic_api_structure_of_message_history.json new file mode 100644 index 0000000..50725f1 --- /dev/null +++ b/docs/structure_of_api_responses/anthropic_api_structure_of_message_history.json @@ -0,0 +1,56 @@ +[ // This is a dummy example, so don't focus on the content of the messages + { + "role": "user", + "content": "Can you search for information about machine learning?" + }, + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "I'll search for information about machine learning for you." + }, + { + "type": "tool_use", + "id": "tool-123456", + "name": "search_database", + "input": { + "query": "machine learning", + "limit": 3 + } + } + ] + }, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool-123456", + "content": "{\"results\": [{\"title\": \"Introduction to ML\", \"url\": \"https://example.com/ml1\"}, {\"title\": \"Deep Learning Fundamentals\", \"url\": \"https://example.com/ml2\"}]}" + } + ] + }, + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "Based on my search, here are some resources about machine learning:\n\n1. Introduction to ML - This resource covers the basics of machine learning algorithms and techniques.\n2. Deep Learning Fundamentals - This focuses on neural networks and deep learning approaches." + } + ] + }, + { + "role": "user", + "content": "Can you explain neural networks in more detail?" + }, + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "Neural networks are computational models inspired by the human brain. They consist of layers of interconnected nodes or \"neurons\" that process information.\n\nThe basic structure includes:\n\n1. Input layer: Receives initial data\n2. Hidden layers: Perform computations on inputs\n3. Output layer: Produces the final result\n\nEach connection between neurons has a weight that gets adjusted during training[1]." + } + ] + } +] \ No newline at end of file diff --git a/docs/structure_of_api_responses/anthropic_api_structure_of_response.json b/docs/structure_of_api_responses/anthropic_api_structure_of_response.json new file mode 100644 index 0000000..dd46308 --- /dev/null +++ b/docs/structure_of_api_responses/anthropic_api_structure_of_response.json @@ -0,0 +1,20 @@ +{ + "type": "content_block_start | content_block_delta | content_block_stop | message_delta | message_stop", + "content_block": { + "type": "text | tool_use", + "id": "tool-123456", + "name": "search_database" + }, + "delta": { + "text": "Here's information about your query...", + "partial_json": "{ \"query\": \"ma", + "type": "citations_delta", + "citation": { + "start": 23, + "end": 45, + "number": 1, + "text": "according to source X..." + }, + "stop_reason": "end_turn | tool_use" + } +} \ No newline at end of file diff --git a/docs/structure_of_api_responses/openai_api_structure_of_chat_completion_chunk_object.ipynb b/docs/structure_of_api_responses/openai_api_structure_of_chat_completion_chunk_object.ipynb index 627b480..8a1f7e0 100644 --- a/docs/structure_of_api_responses/openai_api_structure_of_chat_completion_chunk_object.ipynb +++ b/docs/structure_of_api_responses/openai_api_structure_of_chat_completion_chunk_object.ipynb @@ -110,7 +110,7 @@ }, "language_info": { "name": "python", - "version": "3.12.5" + "version": "3.13.2" } }, "nbformat": 4, diff --git a/src/ansari/agents/ansari.py b/src/ansari/agents/ansari.py index 20147df..c4be15a 100644 --- a/src/ansari/agents/ansari.py +++ b/src/ansari/agents/ansari.py @@ -95,7 +95,7 @@ def process_input(self, user_input: str): if m: yield m - def replace_message_history(self, message_history: list[dict], use_tool=True, stream=True): + def replace_message_history(self, message_history: list[dict], use_tool=True): """ Replaces the current message history (stored in Ansari) with the given message history, and then processes it to generate a response from Ansari. @@ -106,23 +106,19 @@ def replace_message_history(self, message_history: list[dict], use_tool=True, st {"role": "system", "content": self.sys_msg}, ] + message_history - # Return/Yield Ansari's response to the user - # TODO(odyash) later (good_first_issue): `stream == False` is not implemented yet; so it has to stay `True` - for m in self.process_message_history(use_tool, stream=stream): + # Yield Ansari's response to the user + for m in self.process_message_history(use_tool): if m: yield m def get_completion(self, **kwargs): return litellm.completion(**kwargs) - def process_message_history(self, use_tool=True, stream=True): - """ - TODO(odyash) later (good_first_issue): `stream == False` is not implemented yet; so it has to stay `True` - """ + def process_message_history(self, use_tool=True): common_params = { "model": self.model, "messages": self.message_history, - "stream": stream, + "stream": True, "stream_options": {"include_usage": True}, "timeout": 30.0, "temperature": 0.0, @@ -225,14 +221,11 @@ def process_message_history(self, use_tool=True, stream=True): else: raise Exception("Invalid response mode: " + response_mode) - def process_one_round(self, use_tool=True, stream=True): - """ - TODO(odyash) later (good_first_issue): `stream == False` is not implemented yet; so it has to stay `True` - """ + def process_one_round(self, use_tool=True): common_params = { "model": self.model, "messages": self.message_history, - "stream": stream, + "stream": True, "stream_options": {"include_usage": True}, "timeout": 30.0, "temperature": 0.0, diff --git a/src/ansari/agents/ansari_claude.py b/src/ansari/agents/ansari_claude.py index 5d29c6d..27b659b 100644 --- a/src/ansari/agents/ansari_claude.py +++ b/src/ansari/agents/ansari_claude.py @@ -166,9 +166,14 @@ def _log_message(self, message): This ensures that the messages logged to the database match what's in the message_history. The database will store this in a flattened format which will be reconstructed during retrieval. """ + role = message.get("role", "Unknown") + tool_name = message.get("tool_name") + tool_name_log = " (tool_name=" + tool_name + ")" if tool_name else "" + content = message.get("content") + logger.debug( - f"_log_message called with message role: {message.get('role')}, " - f"content type: {type(message.get('content'))}, " + f"_log_message called with message role: {role}{tool_name_log}, " + f"content type: {type(content)}, " f"message_history length: {len(self.message_history)}" ) @@ -189,7 +194,7 @@ def _log_message(self, message): logger.error(f"Error logging message: {str(e)}") logger.error(f"Message that failed to log: {message}") - def replace_message_history(self, message_history: list[dict], use_tool=True, stream=True): + def replace_message_history(self, message_history: list[dict], use_tool=True): """ Replaces the current message history (stored in Ansari) with the given message history, and then processes it to generate a response from Ansari. @@ -205,7 +210,8 @@ def replace_message_history(self, message_history: list[dict], use_tool=True, st self.message_history = cleaned_history - for m in self.process_message_history(use_tool, stream): + # Yield Ansari's response to the user + for m in self.process_message_history(use_tool): if m: yield m @@ -267,6 +273,7 @@ def process_tool_call(self, tool_name: str, tool_args: dict, tool_id: str): reference_list = tool_instance.format_as_ref_list(results) if not reference_list: + logger.warning(f"No references found for tool call: {tool_name}") return (tool_result, []) logger.info(f"Got {len(reference_list)} results from {tool_name}") @@ -274,6 +281,109 @@ def process_tool_call(self, tool_name: str, tool_args: dict, tool_id: str): # Return results return (tool_result, reference_list) + def _separate_tool_result_from_preceding_text(self): + """ + Corner case: if we our current history is like this: + + ```json + [ + + { + "role": "user", + "content": [ + { + "type": "text", + "text": "QUESTION THAT WILL MAKE LLM USE A TOOL" + } + ] + }, + ] + ``` + + Then, when we enter process_one_round(), the following will happen (logs): + * Processing chunk #1 of type: message_start + * Processing chunk #2 of type: content_block_start + * Content block #1 start: text + * Content block start but not a tool use: ... + * Processing chunk #3 of type: content_block_delta + * Adding text delta: 'START OF LLM RESPONSE' (truncated) + * Processing chunk #... + * Adding text delta: ... + * ... + * Adding text delta: 'TOOL RESULT:' + * Processing chunk #8 of type: content_block_stop + * Content block stop received + * Processing chunk #9 of type: content_block_start + * Content block #2 start: tool_use + * ... + + so then, after the output of tool call is added to self.message_history, it will be like this: + ```json + [ + + { + ... + "text": "QUESTION THAT WILL MAKE LLM USE A TOOL" + ... + }, + + { + "role": "assistant", + "content": [ + { + "type": "tool_use", + "id": "toolu_01CQJoWaPFZYNjrzjdLsxEeZ", + "name": "search_mawsuah", + "input": { + "query": "\u062d\u0643\u0645 ..." + } + } + ] + }, + + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "toolu_01CQJoWaPFZYNjrzjdLsxEeZ", + "content": "Please see the references below." + }, + { + "type": "document", + ... + } + ] + } + ] + ``` + + Then, when we enter process_one_round(), the following will happen (logs): + * Processing chunk #1 of type: message_start + * Processing chunk #2 of type: content_block_start + * Content block #1 start: text + * Content block start but not a tool use: ... + * Processing chunk #3 of type: content_block_delta + * Adding text delta: 'START OF LLM RESPONSE (PARAPHRASED FROM TOOL RESULT)' (truncated) + + So, "TOOL RESULT:" (at the top) will be directly concatenated to "START OF ...". + But we want to leave `\n\n` between them, so that's what this function does. + + Therefore, this function should prefix the start of an assistant response IFF: + * The last message in the history is a "tool_result" + * Content block's type is "start" (that's where this function will be called) + """ + + if ( + (msg := self.message_history[-1]) + and (content := msg.get("content")) + and len(content) > 0 + and (content[0].get("type", "") == "tool_result") + ): + return "\n\n" + else: + return "" + def process_one_round(self) -> Generator[str, None, None]: """Process one round of conversation. @@ -290,7 +400,16 @@ def process_one_round(self) -> Generator[str, None, None]: prompt_mgr = PromptMgr() system_prompt = prompt_mgr.bind("system_msg_claude").render() - logger.info(f"Sending messages to Claude: {json.dumps(self.message_history, indent=2)}") + # Log the `self.message_history` + if get_settings().DEV_MODE: + # In DEV_MODE, log the full message history to a file for inspection + json_file_path = "./logs/last_msg_hist.json" + with open(json_file_path, "w") as f: + json.dump(self.message_history, f, indent=4) + logger.debug(f"Dumped full message history to {json_file_path}") + else: + # In production, log the message history in the terminal + logger.info(f"Sending messages to Claude: {json.dumps(self.message_history, indent=2)}") # Limit documents in message history to prevent Claude from crashing # This creates a copy of the message history, preserving the original @@ -350,6 +469,13 @@ def process_one_round(self) -> Generator[str, None, None]: if hasattr(e, "__dict__"): logger.error(f"Error details: {e.__dict__}") + # If in DEV_MODE and it's the first failure, dump message history to file + if get_settings().DEV_MODE and failures == 1: + json_file_path = "./logs/last_err_msg_hist.json" + with open(json_file_path, "w") as f: + json.dump(self.message_history, f, indent=4) + logger.info(f"Dumped message history to {json_file_path}") + if failures >= self.settings.MAX_FAILURES: logger.error("Max retries exceeded") raise @@ -381,8 +507,8 @@ def process_one_round(self) -> Generator[str, None, None]: - If it's a content block start and it's a tool call, capture the key parameters of the tool call. - If it's a content block delta that is text, add the text to the assistant's message. - If it's a content block delta that is a citation, add the citation to the citations list and - yield a string that represents the citation. - - If it's tool parameters, accumulate the tool paramters into the current tool. + yield a string that represents the citation. + - If it's tool parameters, accumulate the tool paramters into the current tool. """ logger.debug("Starting to process response stream") @@ -422,6 +548,12 @@ def process_one_round(self) -> Generator[str, None, None]: logger.debug(f"Starting tool call: {current_tool}") else: logger.debug(f"Content block start but not a tool use: {chunk}") + if (newline := self._separate_tool_result_from_preceding_text()) and assistant_text == "": + # If we have a newline to separate, add it to the assistant text + assistant_text += newline + logger.debug( + f"Adding `{newline}` to start of assistant text (to separate it from previous content block)" + ) elif chunk.type == "content_block_delta": if hasattr(chunk.delta, "text"): @@ -624,6 +756,7 @@ def _process_tool_calls(self, tool_calls): logger.warning(f"Failed to parse source data to JSON for ref: {doc}") # Add tool result message + logger.debug("Adding a 'tool_result' message to history") self.message_history.append( { "role": "user", @@ -902,7 +1035,7 @@ def limit_documents_in_message_history(self, max_documents=100): return limited_history - def process_message_history(self, use_tool=True, stream=True): + def process_message_history(self, use_tool=True): """ This is the main loop that processes the message history. It yields from the process_one_round method until the last message is an assistant message. diff --git a/src/ansari/ansari_logger.py b/src/ansari/ansari_logger.py index 05d34a4..55ece15 100644 --- a/src/ansari/ansari_logger.py +++ b/src/ansari/ansari_logger.py @@ -1,5 +1,6 @@ # This file provides a standard Python logging instance for the caller file (e.g., main_api.py, etc.). +import os import logging import sys @@ -43,4 +44,22 @@ def get_logger(name: str) -> logging.Logger: # Add handler to logger logger.addHandler(console_handler) + # Add file handler if DEV_MODE is enabled + if get_settings().DEV_MODE: + # Ensure logs directory exists + log_dir = os.path.join(os.getcwd(), "logs") + os.makedirs(log_dir, exist_ok=True) + + log_file = os.path.join(log_dir, f"{name}.log") + # Using standard FileHandler instead of TimedRotatingFileHandler + # Add encoding='utf-8' to handle Unicode characters like emojis + file_handler = logging.FileHandler( + filename=log_file, + mode="a", # Append mode + encoding="utf-8", # Use UTF-8 encoding to support Unicode characters + ) + file_handler.setLevel(logging_level) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + return logger diff --git a/src/ansari/app/main_whatsapp.py b/src/ansari/app/main_whatsapp.py index dd7aae1..cd54371 100644 --- a/src/ansari/app/main_whatsapp.py +++ b/src/ansari/app/main_whatsapp.py @@ -1,4 +1,7 @@ # This file aims to extend `main_api.py` with FastAPI endpoints which handle incoming WhatsApp webhook messages. +# NOTE: the `BackgroundTasks` logic is inspired by this issue and chat (respectively): +# https://stackoverflow.com/questions/72894209/whatsapp-cloud-api-sending-old-message-inbound-notification-multiple-time-on-my +# https://www.perplexity.ai/search/explain-fastapi-s-backgroundta-rnpU7D19QpSxp2ZOBzNUyg # Steps: # 1. Import necessary modules and configure logging. # 2. Create a FastAPI router to extend the main FastAPI app found in `main_api.py`. @@ -10,8 +13,8 @@ # 5. Define a GET endpoint to handle WhatsApp webhook verification. # 6. Define a POST endpoint to handle incoming WhatsApp messages. -from fastapi import APIRouter, HTTPException, Request -from fastapi.responses import HTMLResponse +from fastapi import APIRouter, HTTPException, Request, BackgroundTasks +from fastapi.responses import HTMLResponse, Response from ansari.agents import Ansari, AnsariClaude from ansari.ansari_logger import get_logger @@ -73,94 +76,109 @@ async def verification_webhook(request: Request) -> str | None: @router.post("/whatsapp/v1") -async def main_webhook(request: Request) -> None: +async def main_webhook(request: Request, background_tasks: BackgroundTasks) -> Response: """Handles the incoming WhatsApp webhook message. Args: request (Request): The incoming HTTP request. + background_tasks (BackgroundTasks): The background tasks to be executed. Returns: - None + Response: HTTP response with status code 200. """ - # Wait for the incoming webhook message to be received as JSON - data = await request.json() # # Logging the origin (host) of the incoming webhook message # logger.debug(f"ORIGIN of the incoming webhook message: {json.dumps(request, indent=4)}") - # Terminate if incoming webhook message is empty/invalid/msg-status-update(sent,delivered,read) + # Wait for the incoming webhook message to be received as JSON + data = await request.json() + + # Extract all relevant data in one go using the general presenter try: - result = await presenter.extract_relevant_whatsapp_message_details(data) - except Exception: - return + ( + is_status, + from_whatsapp_number, + incoming_msg_type, + incoming_msg_body, + message_id, + ) = await presenter.extract_relevant_whatsapp_message_details(data) + except Exception as e: + logger.error(f"Error extracting message details: {e}") + return Response(status_code=200) + + # Terminate if the incoming message is a status message (e.g., "delivered") + if not is_status: + logger.debug(f"Incoming whatsapp webhook message from {from_whatsapp_number}") else: - if isinstance(result, str): - return - - # Get relevant info from Meta's API - ( - from_whatsapp_number, - incoming_msg_type, - incoming_msg_body, - ) = result + # NOTE: This is a status message (e.g., "delivered"), not a user message, so doesn't need processing + return Response(status_code=200) + # Terminate if whatsapp is not enabled (i.e., via .env configurations, etc) if not whatsapp_enabled: - await presenter.send_whatsapp_message( - from_whatsapp_number, + # Create a temporary user-specific presenter just to send the message + temp_presenter = WhatsAppPresenter.create_user_specific_presenter(presenter, from_whatsapp_number, None, None, None) + background_tasks.add_task( + temp_presenter.send_whatsapp_message, "Ansari for WhatsApp is down for maintenance, please try again later or visit our website at https://ansari.chat.", ) - return + return Response(status_code=200) + + # Workaround while locally developing: + # don't process other dev's whatsapp recepient phone nums coming from staging env. + # (as both stage Meta app / local-.env-file have same testing number) + dev_num_sub_str = "YOUR_DEV_PHONE_NUM" + if get_settings().DEV_MODE and dev_num_sub_str not in from_whatsapp_number: + logger.debug( + f"Incoming message from {from_whatsapp_number} (doesn't have this sub-str: {dev_num_sub_str}). \ + Therefore, will not process it as it's not cur. dev." + ) + return Response(status_code=200) - # Check if the user's phone number is stored in users_whatsapp table and register if not - # Returns false if user's not found and thier registration fails - user_found: bool = await presenter.check_and_register_user( + # Create a user-specific presenter for this message + user_presenter = WhatsAppPresenter.create_user_specific_presenter( + presenter, from_whatsapp_number, incoming_msg_type, incoming_msg_body, + message_id, ) + + # Start the typing indicator loop that will continue until message is processed + background_tasks.add_task( + user_presenter.send_typing_indicator_then_start_loop, + ) + + # Check if the user's phone number is stored in users_whatsapp table and register if not + # Returns false if user's not found and their registration fails + user_found: bool = await user_presenter.check_and_register_user() if not user_found: - await presenter.send_whatsapp_message( - from_whatsapp_number, + background_tasks.add_task( + user_presenter.send_whatsapp_message, "Sorry, we couldn't register you to our Database. Please try again later.", ) - return + return Response(status_code=200) # Check if the incoming message is a location if incoming_msg_type == "location": # NOTE: Currently, will not handle location messages - await presenter.handle_unsupported_message( - from_whatsapp_number, - incoming_msg_type, + background_tasks.add_task( + user_presenter.handle_unsupported_message, ) - return + return Response(status_code=200) # Check if the incoming message is a media type other than text if incoming_msg_type != "text": - await presenter.handle_unsupported_message( - from_whatsapp_number, - incoming_msg_type, + background_tasks.add_task( + user_presenter.handle_unsupported_message, ) - return + return Response(status_code=200) # Rest of the code below is for processing text messages sent by the whatsapp user - incoming_msg_text = incoming_msg_body["body"] - - # # Send acknowledgment message (only when DEV_MODE) - # # and if dev. doesn't need it, comment it out :] - # if get_settings().DEV_MODE: - # await presenter.send_whatsapp_message( - # from_whatsapp_number, - # f"Ack: {incoming_msg_text}", - # ) - - # Send a typing indicator to the sender - # Side note: As of 2024-12-21, Meta's WhatsApp API does not support typing indicators - # Source: Search "typing indicator whatsapp api" on Google - await presenter.send_whatsapp_message(from_whatsapp_number, "...") # Actual code to process the incoming message using Ansari agent then reply to the sender - await presenter.handle_text_message( - from_whatsapp_number, - incoming_msg_text, + background_tasks.add_task( + user_presenter.handle_text_message, ) + + return Response(status_code=200) diff --git a/src/ansari/config.py b/src/ansari/config.py index d86f74a..2359cde 100644 --- a/src/ansari/config.py +++ b/src/ansari/config.py @@ -176,7 +176,7 @@ def get_resource_path(filename): SENDGRID_API_KEY: SecretStr | None = Field(default=None) QURAN_DOT_COM_API_KEY: SecretStr = Field(alias="QURAN_DOT_COM_API_KEY") WHATSAPP_ENABLED: bool = Field(default=True) - WHATSAPP_API_VERSION: str | None = Field(default="v21.0") + WHATSAPP_API_VERSION: str | None = Field(default="v22.0") WHATSAPP_BUSINESS_PHONE_NUMBER_ID: SecretStr | None = Field(default=None) WHATSAPP_ACCESS_TOKEN_FROM_SYS_USER: SecretStr | None = Field(default=None) WHATSAPP_VERIFY_TOKEN_FOR_WEBHOOK: SecretStr | None = Field(default=None) @@ -194,14 +194,14 @@ def get_resource_path(filename): ANTHROPIC_MODEL: str = Field(default="claude-3-7-sonnet-latest") LOGGING_LEVEL: str = Field(default="INFO") DEV_MODE: bool = Field(default=False) - + # Application settings MAINTENANCE_MODE: bool = Field(default=False) - + # iOS app build versions IOS_MINIMUM_BUILD_VERSION: int = Field(default=1) IOS_LATEST_BUILD_VERSION: int = Field(default=1) - + # Android app build versions ANDROID_MINIMUM_BUILD_VERSION: int = Field(default=1) ANDROID_LATEST_BUILD_VERSION: int = Field(default=1) diff --git a/src/ansari/presenters/whatsapp_presenter.py b/src/ansari/presenters/whatsapp_presenter.py index acb33ba..07fdbdd 100644 --- a/src/ansari/presenters/whatsapp_presenter.py +++ b/src/ansari/presenters/whatsapp_presenter.py @@ -1,6 +1,8 @@ # Unlike other files, the presenter's role here is just to provide functions for handling WhatsApp interactions import re +import asyncio +import time from datetime import datetime, timezone from typing import Any, Literal, Optional @@ -16,37 +18,74 @@ logger = get_logger(__name__) # Initialize the DB and agent -# TODO(odyash): A question for others: should I refer `db` of this file and `main_api.py` to a single instance of AnsariDB? -# instead of duplicating `db` instances? Will this cost more resources? db = AnsariDB(get_settings()) class WhatsAppPresenter: def __init__( self, - agent: Ansari, - access_token, - business_phone_number_id, - api_version="v21.0", + agent: Ansari | None = None, + access_token: str | None = None, + business_phone_number_id: str | None = None, + api_version: str = "v22.0", + user_whatsapp_number: str | None = None, + incoming_msg_type: str | None = None, + incoming_msg_body: dict | None = None, + message_id: str | None = None, ): - self.settings = agent.settings + if agent: + self.settings = agent.settings + else: + self.settings = get_settings() + self.access_token = access_token + self.business_phone_number_id = business_phone_number_id + self.api_version = api_version self.meta_api_url = f"https://graph.facebook.com/{api_version}/{business_phone_number_id}/messages" + # User-specific fields + self.user_whatsapp_number = user_whatsapp_number + self.incoming_msg_type = incoming_msg_type + self.incoming_msg_body = incoming_msg_body + self.message_id = message_id + self.typing_indicator_task = None + self.first_indicator_time = None + + @classmethod + def create_user_specific_presenter( + cls, + general_presenter, + user_whatsapp_number: str, + incoming_msg_type: str, + incoming_msg_body: dict, + message_id: str, + ): + """Creates a user-specific presenter instance from a general presenter.""" + return cls( + access_token=general_presenter.access_token, + business_phone_number_id=general_presenter.business_phone_number_id, + api_version=general_presenter.api_version, + user_whatsapp_number=user_whatsapp_number, + incoming_msg_type=incoming_msg_type, + incoming_msg_body=incoming_msg_body, + message_id=message_id, + ) + async def extract_relevant_whatsapp_message_details( self, body: dict[str, Any], - ) -> tuple[str, str, str] | str | None: + ) -> tuple[bool, str | None, str | None, dict | None, str | None]: """Extracts relevant whatsapp message details from the incoming webhook payload. Args: body (Dict[str, Any]): The JSON body of the incoming request. Returns: - Optional[Tuple[str, str, str]]: A tuple containing the business phone number ID, - the sender's WhatsApp number and the their message (if the extraction is successful). - Returns None if the extraction fails. + tuple[bool, Optional[str], Optional[str], Optional[dict], Optional[str]]: + A tuple of (is_status, user_whatsapp_number, incoming_msg_type, incoming_msg_body, message_id) + Raises: + Exception: If the payload structure is invalid or unsupported. """ # logger.debug(f"Received payload from WhatsApp user:\n{body}") @@ -70,8 +109,11 @@ async def extract_relevant_whatsapp_message_details( # logger.debug( # f"WhatsApp status update received:\n({status} at {timestamp}.)", # ) - return "status update" + return True, None, None, None, None + else: + is_status = False + # should never be entered if "messages" not in value: error_msg = f"Unsupported message type received from WhatsApp user:\n{body}" logger.error( @@ -81,6 +123,8 @@ async def extract_relevant_whatsapp_message_details( incoming_msg = value["messages"][0] + # Extract and store the message ID for use in send_whatsapp_typing_indicator + message_id = incoming_msg.get("id") # Extract the phone number of the WhatsApp sender user_whatsapp_number = incoming_msg["from"] # Meta API note: Meta sends "errors" key when receiving unsupported message types @@ -91,37 +135,27 @@ async def extract_relevant_whatsapp_message_details( logger.info(f"Received a supported whatsapp message from {user_whatsapp_number}: {incoming_msg_body}") - return ( - user_whatsapp_number, - incoming_msg_type, - incoming_msg_body, - ) + return (is_status, user_whatsapp_number, incoming_msg_type, incoming_msg_body, message_id) - async def check_and_register_user( - self, - user_whatsapp_number: str, - incoming_msg_type: str, - incoming_msg_body: dict, - ) -> None: + async def check_and_register_user(self) -> bool: """ Checks if the user's phone number is stored in the users table. If not, registers the user with the preferred language. - Args: - user_whatsapp_number (str): The phone number of the WhatsApp sender. - incoming_msg_type (str): The type of the incoming message (e.g., text, location). - incoming_msg_body (dict): The body of the incoming message. - Returns: - None + bool: True if user exists or was successfully registered, False otherwise. """ + if not self.user_whatsapp_number: + logger.error("User WhatsApp number not set in presenter instance") + return False + # Check if the user's phone number exists in users table - if db.account_exists(phone_num=user_whatsapp_number): + if db.account_exists(phone_num=self.user_whatsapp_number): return True # Else, register the user with the detected language - if incoming_msg_type == "text": - incoming_msg_text = incoming_msg_body["body"] + if self.incoming_msg_type == "text": + incoming_msg_text = self.incoming_msg_body["body"] user_lang = get_language_from_text(incoming_msg_text) else: # TODO(odyash, good_first_issue): use lightweight library/solution that gives us language from country code @@ -130,51 +164,136 @@ async def check_and_register_user( status: Literal["success", "failure"] = db.register( source=SourceType.WHATSAPP, - phone_num=user_whatsapp_number, + phone_num=self.user_whatsapp_number, preferred_language=user_lang, )["status"] if status == "success": - logger.info(f"Registered new whatsapp user (lang: {user_lang})!: {user_whatsapp_number}") + logger.info(f"Registered new whatsapp user (lang: {user_lang})!: {self.user_whatsapp_number}") return True else: - logger.error(f"Failed to register new whatsapp user: {user_whatsapp_number}") + logger.error(f"Failed to register new whatsapp user: {self.user_whatsapp_number}") return False - async def send_whatsapp_message( - self, - user_whatsapp_number: str, - msg_body: str, - ) -> None: + async def send_typing_indicator_then_start_loop(self) -> None: + """Sends a typing indicator and starts a loop to periodically send more while processing the message.""" + if not self.user_whatsapp_number or not self.message_id: + logger.error("Cannot start typing indicator loop: missing user_whatsapp_number or message_id") + return + + self.first_indicator_time = time.time() + + # Send the initial typing indicator + await self._send_whatsapp_typing_indicator() + + # Start an async task that will keep sending typing indicators + self.typing_indicator_task = asyncio.create_task(self._typing_indicator_loop()) + + async def _typing_indicator_loop(self) -> None: + """Loop that periodically sends typing indicators while processing a message.""" + MAX_DURATION_SECONDS = 300 # 5 minutes maximum + INDICATOR_INTERVAL_SECONDS = 26 # Send indicator every 26 seconds + + try: + while True: + logger.debug("Currently in typing indicator loop (i.e., Ansari is taking longer than usual to respond)") + # Sleep for the interval + await asyncio.sleep(INDICATOR_INTERVAL_SECONDS) + + # Check if we've exceeded the maximum duration + elapsed_time = time.time() - self.first_indicator_time + if elapsed_time > MAX_DURATION_SECONDS: + logger.warning(f"Typing indicator loop exceeded maximum duration of {MAX_DURATION_SECONDS}s. Stopping.") + break + + # If we're still processing the message, send another typing indicator + logger.debug(f"Sending follow-up typing indicator after {elapsed_time:.1f}s") + await self._send_whatsapp_typing_indicator() + + except asyncio.CancelledError: + logger.debug("cancelling asyncio task...") + except Exception as e: + logger.error(f"Error in typing indicator loop: {e}") + logger.exception(e) + + async def _send_whatsapp_typing_indicator(self) -> None: + """Sends a typing indicator to the WhatsApp sender.""" + if not self.user_whatsapp_number or not self.message_id: + logger.error("Cannot send typing indicator: missing user_whatsapp_number or message_id") + return + + url = self.meta_api_url + headers = { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + } + + try: + async with httpx.AsyncClient() as client: + logger.debug(f"SENDING TYPING INDICATOR REQUEST TO: {url}") + + json_data = { + "messaging_product": "whatsapp", + "status": "read", + "message_id": self.message_id, + "typing_indicator": {"type": "text"}, + } + + response = await client.post(url, headers=headers, json=json_data) + response.raise_for_status() # Raise an exception for HTTP errors + + logger.debug(f"Sent typing indicator to WhatsApp user {self.user_whatsapp_number}") + + except Exception as e: + logger.error(f"Error sending typing indicator: {e}. Details are in next log.") + logger.exception(e) + + async def send_whatsapp_message(self, msg_body: str) -> None: """Sends a message to the WhatsApp sender. Args: - user_whatsapp_number (str): The sender's WhatsApp number. msg_body (str): The message body to be sent. - """ + if not self.user_whatsapp_number: + logger.error("Cannot send message: missing user_whatsapp_number") + return + url = self.meta_api_url - # Temp fix to work around the the maximum length for Whatsapp text messages (4096 characters) - msg_body = msg_body[:4000] headers = { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", } - json_data = { - "messaging_product": "whatsapp", - "to": user_whatsapp_number, - "text": {"body": msg_body}, - } + # Split the message if it exceeds WhatsApp's character limit + message_parts = self._split_long_messages(msg_body) + + # Stop the typing indicator before sending the actual message + if self.typing_indicator_task and not self.typing_indicator_task.done(): + logger.debug("Typing indicator loop was cancelled (as Ansari will respond now)") + self.typing_indicator_task.cancel() + + # Send the message(s) to the user try: async with httpx.AsyncClient() as client: logger.debug(f"SENDING REQUEST TO: {url}") - response = await client.post(url, headers=headers, json=json_data) - response.raise_for_status() # Raise an exception for HTTP errors - if msg_body != "...": - logger.info( - f"Ansari responsded to WhatsApp user: {user_whatsapp_number} with:\n{msg_body}", - ) + + logger.info( + f"Ansari responded to WhatsApp user {self.user_whatsapp_number} with the following message part(s):\n\n" + ) + + # If we have multiple parts, send them sequentially + for part in message_parts: + json_data = { + "messaging_product": "whatsapp", + "to": self.user_whatsapp_number, + "text": {"body": part}, + } + + response = await client.post(url, headers=headers, json=json_data) + response.raise_for_status() # Raise an exception for HTTP errors + + if msg_body != "...": + logger.info("\n".join(f"[Part {i + 1}]: \n{part}" for i, part in enumerate(message_parts))) except Exception as e: logger.error(f"Error sending message: {e}. Details are in next log.") logger.exception(e) @@ -204,71 +323,356 @@ def _get_retention_time_in_seconds(self) -> int: def _get_whatsapp_markdown(self, msg: str) -> str: """Convert conventional markdown syntax to WhatsApp's markdown syntax""" - msg_direction = get_language_direction_from_text(msg) - # Replace text surrounded with single "*" with "_" - # (as WhatsApp doesn't support italic text with "*"; it uses "_" instead) + # Process standard markdown syntax + msg = self._convert_italic_syntax(msg) + msg = self._convert_bold_syntax(msg) + msg = self._convert_headers(msg) + + # Process lists based on text direction + if msg_direction in ["ltr", "rtl"]: + msg = self._format_nested_lists(msg) + + return msg + + def _convert_italic_syntax(self, text: str) -> str: + """Convert markdown italic syntax (*text*) to WhatsApp italic syntax (_text_)""" # Regex details: # (? str: + """Convert markdown bold syntax (**text**) to WhatsApp bold syntax (*text*)""" + return text.replace("**", "*") + + def _convert_headers(self, text: str) -> str: + """Convert markdown headers to WhatsApp's bold+italic format""" + # Process headers with content directly after them + # (?! ) # Ensures there's no space before the hash (avoiding matching in middle of text) + # #+ \**_* # Matches one or more hash symbols and ignores any bold/italic markers already present + # (.*?) # Captures the header text (non-greedy) + # \**_*\n # Matches any trailing formatting markers and the newline + # (?!\n) # Ensures the newline isn't followed by another newline (i.e., not an isolated header) + pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n(?!\n)") + text = pattern.sub(r"*_\1_*\n\n", text) - # Replace "**" (markdown bold) with "*" (whatsapp bold) - msg = msg.replace("**", "*") + # Process headers with empty line after them + pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n\n") + return pattern.sub(r"*_\1_*\n\n", text) - # Match headers (#*) (that doesn't have a space before it (i.e., in the middle of a text)) - # where there's text directly after them - # NOTE: the `\**_*` part is to neglect any */_ in the returned group (.*?) - pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n(?!\n)") + def _format_nested_lists(self, text: str) -> str: + """ + Format only nested lists/bullet points with WhatsApp's special formatting. - # Replace them with bold (*) and italic (_) markdown syntax - # and add extra newline (to leave space between header and content) - msg = pattern.sub(r"*_\1_*\n\n", msg) + This handles: + 1. Nested bullet points within numbered lists + 2. Nested numbered lists within bullet points + 3. Purely nested bullet points + 4. Purely nested numbered lists - # Match headers (#*) (that doesn't have a space before it (i.e., in the middle of a text)) - # where there's another newline directly after them - # NOTE: the `\**_*` part is to neglect any */_ in the returned group (.*?) - pattern = re.compile(r"(?! )#+ \**_*(.*?)\**_*\n\n") + Simple (non-nested) lists retain their original formatting. + """ + lines = text.split("\n") + processed_lines = [] + in_nested_section = False + nested_section_indent = 0 + + for i, line in enumerate(lines): + # Check for indentation to detect nesting + indent_match = re.match(r"^(\s+)", line) if line.strip() else None + current_indent = len(indent_match.group(1)) if indent_match else 0 + + # Check if this is a list item (numbered or bullet) + is_numbered_item = re.match(r"^\s*\d+\.\s", line) + is_bullet_item = re.match(r"^\s*[\*-]\s", line) + + # Determine if we're entering, in, or exiting a nested section + if (is_numbered_item or is_bullet_item) and current_indent > 0: + # This is a nested item + if not in_nested_section: + in_nested_section = True + nested_section_indent = current_indent + + # Format nested items + if is_numbered_item: + # Convert nested numbered list format: " 1. Item" -> " 1 - Item" + line = re.sub(r"(\s*)(\d+)(\.) ", r"\1\2 - ", line) + elif is_bullet_item: + # Convert nested bullet format: " - Item" or " * Item" -> " -- Item" + line = re.sub(r"(\s*)[\*-] ", r"\1-- ", line) + + elif in_nested_section and current_indent < nested_section_indent: + # We're exiting the nested section + in_nested_section = False + + # For non-nested items, leave them as they are + processed_lines.append(line) + + return "\n".join(processed_lines) + + def _split_long_messages(self, msg_body: str) -> list[str]: + """Split long messages into smaller chunks based on formatted headers or other patterns. + + This method implements a multi-level splitting strategy for messages that exceed + WhatsApp's character limit (4000): + 1. First tries to split by header pattern (*_HEADER_*) + 2. If that's not possible, tries to split by bold text (*BOLD*) + 3. Finally falls back to paragraph-based splitting - # Replace them with bold (*) and italic (_) markdown syntax - msg = pattern.sub(r"*_\1_*\n\n", msg) + Args: + msg_body (str): The message body to split if necessary - # As nested text always appears in left side, even if text is RTL, which could be confusing to the reader, - # we decided to manipulate the nesting symbols (i.e., \d+\. , * , - , etc) so that they appear in right side - # NOTE: added "ltr" for consistency of formatting across different languages - if msg_direction in ["ltr", "rtl"]: - # Replace lines that start with (possibly indented) "- " or "* " with "-- " - msg = re.sub(r"(\s*)[\*-] ", r"\1-- ", msg) + Returns: + list[str]: A list of message chunks that can be sent separately + """ + # WhatsApp character limit + MAX_LENGTH = 4000 - # Replace the dot numbered lists (1. , etc.) with a dash (e.g., 1 - ) - msg = re.sub(r"(\s*)(\d+)(\.) ", r"\1\2 - ", msg, flags=re.MULTILINE) + # If message is already under the limit, return it as is + if len(msg_body) <= MAX_LENGTH: + return [msg_body] - return msg + # Strategy 1: Try to split by formatted headers (*_HEADER_*) + header_chunks = self._split_by_headers(msg_body, MAX_LENGTH) + if len(header_chunks) > 1: + return header_chunks - async def handle_text_message( - self, - user_whatsapp_number: str, - incoming_txt_msg: str, - ) -> None: - """Processes the incoming text message and sends a response to the WhatsApp sender. + # Strategy 2: Try to split by bold formatting (*BOLD*) + bold_chunks = self._split_by_bold_text(msg_body, MAX_LENGTH) + if len(bold_chunks) > 1: + return bold_chunks + + # Strategy 3: Fall back to paragraph-based splitting + return self._split_by_paragraphs(msg_body, MAX_LENGTH) + + def _split_by_headers(self, text: str, max_length: int) -> list[str]: + """Split text by formatted header pattern (*_HEADER_*). + + Args: + text (str): Text to split + max_length (int): Maximum allowed length of each chunk + + Returns: + list[str]: List of text chunks split by headers + + Example: + >>> text = "Text before header\n*_First Header_*\nText\n\n*_Second Header_*\nMore text" + >>> _split_by_headers(text, 1000) + ['Text before header', '*_First Header_*\nText', '*_Second Header_*\nMore text'] + """ + # Look for *_HEADER_* pattern + header_pattern = re.compile(r"\*_[^*_]+_\*") + headers = list(header_pattern.finditer(text)) + + # If we don't have multiple headers, we can't split effectively + if not headers or len(headers) <= 1: + return [text] + + chunks = [] + + # Process each header as a potential chunk boundary + for i, match in enumerate(headers): + # For the first header, handle any text that comes before it + if i == 0 and match.start() > 0: + prefix = text[: match.start()] + + # Always include the text before the first header in its own message(s) + # If it's too long, recursively split it + if len(prefix) <= max_length: + chunks.append(prefix) + else: + # If prefix is too long, split it using paragraph-based splitting + prefix_chunks = self._split_by_paragraphs(prefix, max_length) + chunks.extend(prefix_chunks) + + # Determine the end position for the chunk containing this header + end_pos = headers[i + 1].start() if i < len(headers) - 1 else len(text) + chunk = text[match.start() : end_pos] + + # If chunk fits within limit, add it directly + if len(chunk) <= max_length: + chunks.append(chunk) + else: + # Otherwise, try more aggressive splitting for this chunk + # First try bold formatting, then paragraphs + sub_chunks = self._split_by_bold_text(chunk, max_length) + chunks.extend(sub_chunks) + + return chunks + + def _split_by_bold_text(self, text: str, max_length: int) -> list[str]: + """Split text by looking for bold formatting (*TEXT*) patterns. + + This function splits text at bold formatting markers (*TEXT*) when the text + exceeds the maximum length. It treats each bold pattern as a potential + break point, always keeping the bold text with the content that follows it. + + Args: + text (str): Text to split + max_length (int): Maximum allowed length of each chunk + + Returns: + list[str]: List of text chunks split by bold formatting + + Example: + >>> text = "Some intro text\n*First bold section*\nMiddle content\n*Second bold*\nMore text" + >>> _split_by_bold_text(text, 30) + ['Some intro text', '*First bold section*\nMiddle content', '*Second bold*\nMore text'] + """ + if len(text) <= max_length: + return [text] + + # Find *TEXT* patterns + bold_pattern = re.compile(r"\*[^*]+\*") + bold_matches = list(bold_pattern.finditer(text)) + + # If we don't have enough bold patterns for effective splitting + if not bold_matches or len(bold_matches) <= 1: + return self._split_by_paragraphs(text, max_length) + + chunks = [] + + # Process each bold pattern as a potential chunk boundary + for i, match in enumerate(bold_matches): + # For the first bold pattern, handle any text that comes before it + if i == 0 and match.start() > 0: + prefix = text[: match.start()] + + # Always include the text before the first bold pattern in its own message(s) + # If it's too long, recursively split it + if len(prefix) <= max_length: + chunks.append(prefix) + else: + # If prefix is too long, split it using paragraph-based splitting + prefix_chunks = self._split_by_paragraphs(prefix, max_length) + chunks.extend(prefix_chunks) + + # Determine the end position for the chunk containing this bold pattern + end_pos = bold_matches[i + 1].start() if i < len(bold_matches) - 1 else len(text) + chunk = text[match.start() : end_pos] + + # If chunk fits within limit, add it directly + if len(chunk) <= max_length: + chunks.append(chunk) + else: + # Otherwise, fall back to paragraph splitting for this chunk + sub_chunks = self._split_by_paragraphs(chunk, max_length) + chunks.extend(sub_chunks) + + return chunks + + def _split_by_paragraphs(self, text: str, max_length: int) -> list[str]: + """Split text by paragraphs or fall back to fixed-size chunks if needed. + + This method attempts to split text at natural paragraph breaks (double newlines). If paragraphs themselves are + too long, it uses fixed-size chunk splitting as a fallback. Args: - user_whatsapp_number (str): The sender's WhatsApp number. - incoming_txt_msg (str): The incoming text message from the sender. + text (str): Text to split + max_length (int): Maximum allowed length of each chunk + Returns: + list[str]: List of text chunks split by paragraphs or fixed chunks + + Example: + >>> text = "This is paragraph 1.\\n\\nThis is paragraph 2.\\n\\nThis is a very long paragraph 3 that exceeds" + >>> text += " the maximum length and will need to be split." + >>> _split_by_paragraphs(text, 50) + ['This is paragraph 1.', 'This is paragraph 2.', 'This is a very long paragraph 3 that exceeds the', + ' maximum length and will need to be split.'] """ + if len(text) <= max_length: + return [text] + + chunks = [] + + # Try splitting by paragraphs first (double newlines) + paragraphs = re.split(r"\n\n+", text) + + if len(paragraphs) > 1: + current = "" + + for para in paragraphs: + # If adding this paragraph would exceed the limit + if current and len(current) + len(para) + 2 > max_length: + chunks.append(current) + current = "" + + # If paragraph itself is too long, split it using fixed chunks + if len(para) > max_length: + # Add any accumulated text first + if current: + chunks.append(current) + current = "" + + # Use fixed-size chunk splitting for long paragraphs + para_chunks = self._split_by_fixed_chunks(para, max_length) + chunks.extend(para_chunks) + else: + # Add paragraph to current chunk with proper separator + if current: + current += "\n\n" + para + else: + current = para + + # Don't forget the last chunk + if current: + chunks.append(current) + + return chunks + else: + # If text doesn't have paragraphs, use fixed-size chunk splitting + return self._split_by_fixed_chunks(text, max_length) + + def _split_by_fixed_chunks(self, text: str, max_length: int) -> list[str]: + """Split text into fixed-size chunks of maximum length. + + This is the simplest fallback approach, which just takes chunks of + max_length characters until the entire text is processed. + + Args: + text (str): Text to split + max_length (int): Maximum allowed length of each chunk + + Returns: + list[str]: List of text chunks of maximum length + + Example: + >>> text = "This is a very long text that exceeds the maximum allowed length" + >>> _split_by_fixed_chunks(text, 20) + ['This is a very long ', 'text that exceeds the', ' maximum allowed len', 'gth'] + """ + # If text is already under the limit, return it as is + if len(text) <= max_length: + return [text] + + chunks = [] + + # Simply take max_length characters at a time + for i in range(0, len(text), max_length): + chunks.append(text[i : i + max_length]) + + return chunks + + async def handle_text_message(self) -> None: + """Processes the incoming text message and sends a response to the WhatsApp sender.""" + incoming_txt_msg = self.incoming_msg_body["body"] + try: logger.debug(f"Whatsapp user said: {incoming_txt_msg}") # Get user's ID from users_whatsapp table # NOTE: we're not checking for user's existence here, as we've already done that in `main_webhook()` - user_id_whatsapp = db.retrieve_user_info(source=SourceType.WHATSAPP, phone_num=user_whatsapp_number) + user_id_whatsapp = db.retrieve_user_info(source=SourceType.WHATSAPP, phone_num=self.user_whatsapp_number) # Get details of the thread that the user last interacted with (i.e., max(updated_at)) thread_id, last_msg_time = db.get_last_message_time_whatsapp(user_id_whatsapp) @@ -294,7 +698,6 @@ async def handle_text_message( if "error" in result: logger.error(f"Error creating a new thread for whatsapp user ({user_id_whatsapp}): {result['error']}") await self.send_whatsapp_message( - user_whatsapp_number, "An unexpected error occurred while creating a new chat session. Please try again later.", ) return @@ -311,7 +714,6 @@ async def handle_text_message( if "messages" not in thread_name_and_history: logger.error(f"Error retrieving message history for thread ({thread_id}) of user ({user_id_whatsapp})") await self.send_whatsapp_message( - user_whatsapp_number, "An unexpected error occurred while getting your last chat session. Please try again later.", ) return @@ -336,11 +738,13 @@ async def handle_text_message( # Send the thread's history to the Ansari agent which will # log (i.e., append) the message history's last user message to DB, # process the history, - # log (i.e., append) Ansari's output to DB, - # TODO(odyash, good_first_issue): change `stream` to False (and remove comprehensive loop) - # when `Ansari` is capable of handling it - response = [tok for tok in agent.replace_message_history(msg_history, stream=True) if tok] - response = "".join(response) + # log (i.e., append) Ansari's output to DB + response = "" + for token in agent.replace_message_history(msg_history): + # NOTE: Check the `async_await_backgroundtasks_visualized.md` file + # for details on why we added this `await` line + await asyncio.sleep(0) + response += token # Convert conventional markdown syntax to WhatsApp's markdown syntax logger.debug(f"Response before markdown conversion: \n\n{response}") @@ -349,67 +753,45 @@ async def handle_text_message( # Return the response back to the WhatsApp user if it's not empty # Else, send an error message to the user if response: - await self.send_whatsapp_message(user_whatsapp_number, response) + await self.send_whatsapp_message(response) else: logger.warning("Response was empty. Sending error message.") await self.send_whatsapp_message( - user_whatsapp_number, "Ansari returned an empty response. Please rephrase your question, then try again.", ) except Exception as e: logger.error(f"Error processing message: {e}. Details are in next log.") logger.exception(e) await self.send_whatsapp_message( - user_whatsapp_number, "An unexpected error occurred while processing your message. Please try again later.", ) # NOTE: This function assumes `loc_lat` and `loc_long` columns are in `users` DB table # If alternative columns are used (e.g., city), the function should be updated accordingly - async def handle_location_message( - self, - user_whatsapp_number: str, - incoming_msg_body: dict, - ) -> None: + async def handle_location_message(self) -> None: """ Handles an incoming location message by updating the user's location in the database and sending a confirmation message. - - Args: - user_whatsapp_number (str): The phone number of the WhatsApp sender. - incoming_msg_body (dict): The body of the incoming location message. - - Returns: - None """ - loc = incoming_msg_body - db.update_user_by_phone_num(user_whatsapp_number, {"loc_lat": loc["latitude"], "loc_long": loc["longitude"]}) + + loc = self.incoming_msg_body + db.update_user_by_phone_num(self.user_whatsapp_number, {"loc_lat": loc["latitude"], "loc_long": loc["longitude"]}) # TODO(odyash, good_first_issue): update msg below to also say something like: # 'Type "pt"/"prayer times" to get prayer times', then implement that feature await self.send_whatsapp_message( - user_whatsapp_number, "Stored your location successfully!", # This will help us give you accurate prayer times ISA 🙌. ) async def handle_unsupported_message( self, - user_whatsapp_number: str, - incoming_msg_type: str, ) -> None: """ Handles an incoming unsupported message by sending an appropriate response. - - Args: - user_whatsapp_number (str): The phone number of the WhatsApp sender. - incoming_msg_type (str): The type of the incoming message (e.g., image, video). - - Returns: - None """ - msg_type = incoming_msg_type + "s" if not incoming_msg_type.endswith("s") else incoming_msg_type + + msg_type = self.incoming_msg_type + "s" if not self.incoming_msg_type.endswith("s") else self.incoming_msg_type msg_type = msg_type.replace("unsupporteds", "this media type") await self.send_whatsapp_message( - user_whatsapp_number, f"Sorry, I can't process {msg_type} yet. Please send me a text message.", ) diff --git a/tests/unit/test_ansari_claude_message_sequence.py b/tests/unit/test_ansari_claude_message_sequence.py index 63cd696..945e86e 100644 --- a/tests/unit/test_ansari_claude_message_sequence.py +++ b/tests/unit/test_ansari_claude_message_sequence.py @@ -92,7 +92,7 @@ def mock_process_one_round(*args, **kwargs): original_history = [msg.copy() for msg in claude.message_history] # Process the message history - list(claude.process_message_history(use_tool=False, stream=False)) + list(claude.process_message_history(use_tool=False)) # Check that the message history structure was preserved processed_history = claude.message_history @@ -125,15 +125,15 @@ def mock_process_one_round(*args, **kwargs): orig_result_blocks = [b for b in orig["content"] if b.get("type") == "tool_result"] processed_result_blocks = [b for b in processed["content"] if b.get("type") == "tool_result"] - assert len(orig_result_blocks) == len(processed_result_blocks), ( - f"Tool result blocks count mismatch at message {i}" - ) + assert len(orig_result_blocks) == len( + processed_result_blocks + ), f"Tool result blocks count mismatch at message {i}" # If there are result blocks, check that IDs are preserved if orig_result_blocks: - assert orig_result_blocks[0]["tool_use_id"] == processed_result_blocks[0]["tool_use_id"], ( - f"Tool result ID mismatch at message {i}" - ) + assert ( + orig_result_blocks[0]["tool_use_id"] == processed_result_blocks[0]["tool_use_id"] + ), f"Tool result ID mismatch at message {i}" print("All assertions passed - message sequence with tool use/result is correctly processed!") diff --git a/tests/unit/test_ansari_claude_tool_sequence.py b/tests/unit/test_ansari_claude_tool_sequence.py index 5d9b3f8..f7efd0f 100644 --- a/tests/unit/test_ansari_claude_tool_sequence.py +++ b/tests/unit/test_ansari_claude_tool_sequence.py @@ -66,7 +66,7 @@ def add_assistant_response(*args, **kwargs): claude.process_one_round = MagicMock(side_effect=add_assistant_response) # Run the message processing - list(claude.process_message_history(use_tool=False, stream=False)) + list(claude.process_message_history(use_tool=False)) # Verify the results processed_history = claude.message_history