|
15 | 15 | integration capabilities while only needing to implement the core _invoke method. |
16 | 16 | """ |
17 | 17 |
|
| 18 | +import asyncio |
18 | 19 | import re |
19 | 20 | import sqlite3 |
20 | 21 | from abc import ABC, abstractmethod |
@@ -591,11 +592,74 @@ def _finalize_graph( |
591 | 592 | """Hook for subclasses to wrap or modify the compiled graph.""" |
592 | 593 | return graph_app |
593 | 594 |
|
| 595 | + def _tool_is_async_only(self, tool: Any) -> bool: |
| 596 | + """Return True for tools that can only be invoked asynchronously. |
| 597 | +
|
| 598 | + MCP tools are commonly exposed as StructuredTool instances with a |
| 599 | + coroutine implementation but no synchronous function implementation. |
| 600 | + Those raise errors like: |
| 601 | +
|
| 602 | + "StructuredTool does not support sync invocation." |
| 603 | +
|
| 604 | + when called via `.invoke()`. |
| 605 | + """ |
| 606 | + |
| 607 | + func = getattr(tool, "func", None) |
| 608 | + coroutine = getattr(tool, "coroutine", None) |
| 609 | + return func is None and coroutine is not None |
| 610 | + |
| 611 | + def _has_async_only_tools(self) -> bool: |
| 612 | + tools_obj = getattr(self, "tools", None) |
| 613 | + if not tools_obj: |
| 614 | + return False |
| 615 | + |
| 616 | + try: |
| 617 | + tool_iter = ( |
| 618 | + tools_obj.values() if isinstance(tools_obj, dict) else tools_obj |
| 619 | + ) |
| 620 | + except Exception: |
| 621 | + return False |
| 622 | + |
| 623 | + return any(self._tool_is_async_only(t) for t in tool_iter) |
| 624 | + |
594 | 625 | def _invoke(self, input, **config): |
595 | 626 | config = self.build_config(**config) |
596 | | - return self.compiled_graph.invoke( |
597 | | - input, config=config, context=self.context |
598 | | - ) |
| 627 | + |
| 628 | + # If we have async-only tools (e.g. MCP StructuredTools), we must run the |
| 629 | + # graph via `ainvoke` so ToolNode dispatches tools asynchronously. |
| 630 | + if self._has_async_only_tools(): |
| 631 | + try: |
| 632 | + asyncio.get_running_loop() |
| 633 | + except RuntimeError: |
| 634 | + return asyncio.run( |
| 635 | + self.compiled_graph.ainvoke( |
| 636 | + input, config=config, context=self.context |
| 637 | + ) |
| 638 | + ) |
| 639 | + |
| 640 | + raise RuntimeError( |
| 641 | + "This agent has async-only tools, but `.invoke()` was called " |
| 642 | + "from an async context (a running event loop was detected). " |
| 643 | + "Use `await agent.ainvoke(...)` instead." |
| 644 | + ) |
| 645 | + |
| 646 | + try: |
| 647 | + return self.compiled_graph.invoke( |
| 648 | + input, config=config, context=self.context |
| 649 | + ) |
| 650 | + except Exception as e: |
| 651 | + # Fallback: if a tool raises the canonical sync-invoke error, retry |
| 652 | + # with ainvoke for backwards compatibility. |
| 653 | + if "does not support sync invocation" in str(e): |
| 654 | + try: |
| 655 | + asyncio.get_running_loop() |
| 656 | + except RuntimeError: |
| 657 | + return asyncio.run( |
| 658 | + self.compiled_graph.ainvoke( |
| 659 | + input, config=config, context=self.context |
| 660 | + ) |
| 661 | + ) |
| 662 | + raise |
599 | 663 |
|
600 | 664 | async def _ainvoke(self, input, **config): |
601 | 665 | config = self.build_config(**config) |
|
0 commit comments