# feat: support multiple A2AStarletteApplication on a single port #323
base: main
Changes from all commits: `1b8b509`, `0b9ce11`, `7839d4f`, `79b1122`, `2bbe76a`, `582752c`, `db82848`, `ec224cd`
**`README.md`** (new file)

```markdown
# Example: Using a2a-python SDK Without an LLM Framework

This repository demonstrates how to set up and use the [a2a-python SDK](https://github.com/google/a2a-python) to create a simple server and client, without relying on any agent framework.

## Overview

- **A2A (Agent-to-Agent):** A protocol and SDK for communication with AI Agents.
- **This Example:** Shows how to support multiple A2AStarletteApplication instances or AgentExecutor implementations on a single port.

For more information, refer to [a2a-mcp-without-framework](https://github.com/a2aproject/a2a-samples/blob/main/samples/python/agents/a2a-mcp-without-framework/README.md).
```
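The server side of the PR is not shown in this excerpt, but the idea the overview names — several `A2AStarletteApplication` instances, one per `AgentExecutor`, behind one port — can be sketched with plain Starlette mounting. Everything below is an illustrative sketch rather than the PR's code: `serve_agents` and `build_sub_app` are made-up names, the `(AgentCard, AgentExecutor)` pairs must be supplied by the caller, and `DefaultRequestHandler`/`InMemoryTaskStore` follow their usage in other a2a-samples.

```python
import uvicorn

from starlette.applications import Starlette
from starlette.routing import Mount

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore


def build_sub_app(card, executor) -> Starlette:
    """Wrap one AgentExecutor in its own A2A Starlette app."""
    handler = DefaultRequestHandler(
        agent_executor=executor, task_store=InMemoryTaskStore()
    )
    return A2AStarletteApplication(
        agent_card=card, http_handler=handler
    ).build()


def serve_agents(agent_configs: list[tuple]) -> None:
    """Serve every (AgentCard, AgentExecutor) pair on a single port.

    Each agent is mounted under its index, so agent 1 is reachable at
    http://localhost:9999/1 — matching the `agent_index` the client
    uses later in this diff.
    """
    app = Starlette(
        routes=[
            Mount(f'/{i}', app=build_sub_app(card, executor))
            for i, (card, executor) in enumerate(agent_configs)
        ]
    )
    uvicorn.run(app, host='localhost', port=9999)
```

A mount per path prefix keeps each agent's card discoverable at its own base URL, which is what `A2ACardResolver` relies on when the client passes `http://{host}:{port}/{agent_index}` as an agent's base URL.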
**`pyproject.toml`** (new file)

```toml
[project]
name = "no-llm-framework"
version = "0.1.0"
description = "Use A2A without any agent framework"
```
> **Contributor** commented on lines +2 to +4:
>
> The project …
```toml
readme = "README.md"
authors = [{ name = "prem", email = "[email protected]" }]
requires-python = ">=3.13"
dependencies = [
    "a2a-sdk>=0.3.0",
    "asyncclick>=8.1.8",
    "colorama>=0.4.6",
    "fastmcp>=2.3.4",
    "google-genai",
    "jinja2>=3.1.6",
    "rich>=14.0.0",
    "starlette>=0.47.2"
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project.scripts]
a2a-server = "no_llm_framework.server.__main__:main"
a2a-client = "no_llm_framework.client.__main__:main"
```
**`no_llm_framework/client/__main__.py`** (new file)

```python
import asyncio
from typing import Literal

import asyncclick as click
import colorama
from no_llm_framework.client.agent import Agent


@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=9999)
@click.option('--mode', 'mode', default='streaming')
@click.option('--question', 'question', required=True)
async def a_main(
    host: str,
    port: int,
    mode: Literal['completion', 'streaming'],
```
> **Contributor** commented on lines +13 to +18:
>
> The `--mode` option is parsed but never used: `Agent` is always constructed with `mode='stream'` below, so `--mode completion` has no effect. To fix this, you should pass the `mode` value through to the `Agent` constructor.
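A sketch of that fix, assuming the CLI's `'completion'`/`'streaming'` values should map onto `Agent`'s `'complete'`/`'stream'` literals:

```python
# Hypothetical fix: forward the parsed CLI option instead of hardcoding.
agent = Agent(
    mode='stream' if mode == 'streaming' else 'complete',
    token_stream_callback=None,
    agent_urls=[f'http://{host}:{port}/{agent_index}'],
)
```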
```python
    question: str,
):
    """Main function to run the A2A Repo Agent client.

    Args:
        host (str): The host address to run the server on.
        port (int): The port number to run the server on.
        mode (Literal['completion', 'streaming']): The mode to run the server on.
        question (str): The question to ask the Agent.
    """  # noqa: E501
    agent_index = 1
    agent = Agent(
        mode='stream',
        token_stream_callback=None,
        agent_urls=[f'http://{host}:{port}/{agent_index}'],
    )
    async for chunk in agent.stream(question):
        if chunk.startswith('<Agent name="'):
            print(colorama.Fore.CYAN + chunk, end='', flush=True)
        elif chunk.startswith('</Agent>'):
            print(colorama.Fore.RESET + chunk, end='', flush=True)
        else:
            print(chunk, end='', flush=True)


def main() -> None:
    """Main function to run the A2A Repo Agent client."""
    asyncio.run(a_main())


if __name__ == '__main__':
    main()
```
**`no_llm_framework/client/agent.py`** (new file)

```python
import asyncio
import json
import re

from collections.abc import Callable, Generator
from pathlib import Path
from typing import Literal
from uuid import uuid4

import httpx

from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    Message,
    MessageSendParams,
    Part,
    Role,
    SendStreamingMessageRequest,
    SendStreamingMessageSuccessResponse,
    TaskStatusUpdateEvent,
    TextPart,
)
from google import genai
from jinja2 import Template

from no_llm_framework.client.constant import GOOGLE_API_KEY


dir_path = Path(__file__).parent

with Path(dir_path / 'decide.jinja').open('r') as f:
    decide_template = Template(f.read())

with Path(dir_path / 'agents.jinja').open('r') as f:
    agents_template = Template(f.read())

with Path(dir_path / 'agent_answer.jinja').open('r') as f:
    agent_answer_template = Template(f.read())


def stream_llm(prompt: str) -> Generator[str]:
    """Stream LLM response.

    Args:
        prompt (str): The prompt to send to the LLM.

    Returns:
        Generator[str, None, None]: A generator of the LLM response.
    """
    client = genai.Client(vertexai=False, api_key=GOOGLE_API_KEY)
    for chunk in client.models.generate_content_stream(
        model='gemini-2.5-flash-lite',
        contents=prompt,
    ):
        yield chunk.text


class Agent:
    """Agent for interacting with the Google Gemini LLM in different modes."""

    def __init__(
        self,
        mode: Literal['complete', 'stream'] = 'stream',
        token_stream_callback: Callable[[str], None] | None = None,
        agent_urls: list[str] | None = None,
        agent_prompt: str | None = None,
    ):
        self.mode = mode
        self.token_stream_callback = token_stream_callback
        self.agent_urls = agent_urls
        self.agents_registry: dict[str, AgentCard] = {}

    async def get_agents(self) -> tuple[dict[str, AgentCard], str]:
        """Retrieve agent cards from all agent URLs and render the agent prompt.

        Returns:
            tuple[dict[str, AgentCard], str]: A dictionary mapping agent names to AgentCard objects, and the rendered agent prompt string.
        """
        async with httpx.AsyncClient() as httpx_client:
            card_resolvers = [
                A2ACardResolver(httpx_client, url) for url in self.agent_urls
            ]
            agent_cards = await asyncio.gather(
                *[
                    card_resolver.get_agent_card()
                    for card_resolver in card_resolvers
                ]
            )
            agents_registry = {
                agent_card.name: agent_card for agent_card in agent_cards
            }
            agent_prompt = agents_template.render(agent_cards=agent_cards)
            return agents_registry, agent_prompt

    def call_llm(self, prompt: str) -> str:
        """Call the LLM with the given prompt and return the response as a string or generator.

        Args:
            prompt (str): The prompt to send to the LLM.

        Returns:
            str or Generator[str]: The LLM response as a string or generator, depending on mode.
        """
```
> **Contributor** commented on lines +96 to +104:
>
> The logic in `call_llm` looks inverted: when `mode` is `'complete'` it returns the generator from `stream_llm`, and otherwise it accumulates the chunks into a single string — the opposite of what the mode names suggest. Additionally, the return type hint is `str`, which doesn't cover the generator case.
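A minimal corrected version, assuming `'stream'` should hand back the generator and `'complete'` the accumulated string (sketch, not the PR's code):

```python
# Sketch: mode names matched to behavior, with a hint covering both returns.
def call_llm(self, prompt: str) -> str | Generator[str]:
    if self.mode == 'stream':
        return stream_llm(prompt)  # lazy: caller iterates chunks
    return ''.join(stream_llm(prompt))  # eager: one full string
```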
```python
        if self.mode == 'complete':
            return stream_llm(prompt)

        result = ''
        for chunk in stream_llm(prompt):
            result += chunk
        return result

    async def decide(
        self,
        question: str,
        agents_prompt: str,
        called_agents: list[dict] | None = None,
    ) -> Generator[str, None]:
```
> **Contributor** commented on lines +113 to +118:
>
> The `decide` method is declared `async`, but its body never awaits anything — it just returns the result of the synchronous `call_llm`. To fix this, you should remove the `async` keyword (and drop the corresponding `await` at the call site in `stream`).
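Under that reading, the suggested change is mechanical (sketch, not the PR's code):

```python
from collections.abc import Generator

# Sketch: drop `async`, since nothing inside is awaited.
def decide(
    self,
    question: str,
    agents_prompt: str,
    called_agents: list[dict] | None = None,
) -> str | Generator[str]:
    ...  # body unchanged


# The call site in `stream` then loses its `await`:
#     for chunk in self.decide(question, agent_prompt, agent_answers):
```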
| """Decide which agent(s) to use to answer the question. | ||
|
|
||
| Args: | ||
| question (str): The question to answer. | ||
| agents_prompt (str): The prompt describing available agents. | ||
| called_agents (list[dict] | None): Previously called agents and their answers. | ||
|
|
||
| Returns: | ||
| Generator[str, None]: The LLM's response as a generator of strings. | ||
| """ | ||
| if called_agents: | ||
| call_agent_prompt = agent_answer_template.render( | ||
| called_agents=called_agents | ||
| ) | ||
| else: | ||
| call_agent_prompt = '' | ||
| prompt = decide_template.render( | ||
| question=question, | ||
| agent_prompt=agents_prompt, | ||
| call_agent_prompt=call_agent_prompt, | ||
| ) | ||
| return self.call_llm(prompt) | ||
|
|
||
| def extract_agents(self, response: str) -> list[dict]: | ||
| """Extract the agents from the response. | ||
|
|
||
| Args: | ||
| response (str): The response from the LLM. | ||
| """ | ||
| pattern = r'```json\n(.*?)\n```' | ||
| match = re.search(pattern, response, re.DOTALL) | ||
| if match: | ||
| return json.loads(match.group(1)) | ||
| return [] | ||
|
|
||
| async def send_message_to_an_agent( | ||
| self, agent_card: AgentCard, message: str | ||
| ): | ||
| """Send a message to a specific agent and yield the streaming response. | ||
|
|
||
| Args: | ||
| agent_card (AgentCard): The agent to send the message to. | ||
| message (str): The message to send. | ||
|
|
||
| Yields: | ||
| str: The streaming response from the agent. | ||
| """ | ||
| async with httpx.AsyncClient() as httpx_client: | ||
| client = A2AClient(httpx_client, agent_card=agent_card) | ||
| message = MessageSendParams( | ||
| message=Message( | ||
| role=Role.user, | ||
| parts=[Part(TextPart(text=message))], | ||
| message_id=uuid4().hex, | ||
| task_id=uuid4().hex, | ||
| ) | ||
| ) | ||
|
|
||
| streaming_request = SendStreamingMessageRequest( | ||
| id=str(uuid4().hex), params=message | ||
| ) | ||
| async for chunk in client.send_message_streaming(streaming_request): | ||
| if isinstance( | ||
| chunk.root, SendStreamingMessageSuccessResponse | ||
| ) and isinstance(chunk.root.result, TaskStatusUpdateEvent): | ||
| message = chunk.root.result.status.message | ||
| if message: | ||
| yield message.parts[0].root.text | ||
|
|
||
| async def stream(self, question: str): | ||
| """Stream the process of answering a question, possibly involving multiple agents. | ||
|
|
||
| Args: | ||
| question (str): The question to answer. | ||
|
|
||
| Yields: | ||
| str: Streaming output, including agent responses and intermediate steps. | ||
| """ | ||
| agent_answers: list[dict] = [] | ||
| for _ in range(3): | ||
| agents_registry, agent_prompt = await self.get_agents() | ||
| response = '' | ||
| for chunk in await self.decide( | ||
| question, agent_prompt, agent_answers | ||
| ): | ||
| response += chunk | ||
| if self.token_stream_callback: | ||
| self.token_stream_callback(chunk) | ||
| yield chunk | ||
|
|
||
| agents = self.extract_agents(response) | ||
| if agents: | ||
| for agent in agents: | ||
| agent_response = '' | ||
| agent_card = agents_registry[agent['name']] | ||
| yield f'<Agent name="{agent["name"]}">\n' | ||
| async for chunk in self.send_message_to_an_agent( | ||
| agent_card, agent['prompt'] | ||
| ): | ||
| agent_response += chunk | ||
| if self.token_stream_callback: | ||
| self.token_stream_callback(chunk) | ||
| yield chunk | ||
| yield '</Agent>\n' | ||
| match = re.search( | ||
| r'<Answer>(.*?)</Answer>', agent_response, re.DOTALL | ||
| ) | ||
| answer = match.group(1).strip() if match else agent_response | ||
| agent_answers.append( | ||
| { | ||
| 'name': agent['name'], | ||
| 'prompt': agent['prompt'], | ||
| 'answer': answer, | ||
| } | ||
| ) | ||
| else: | ||
| return | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| import asyncio | ||
|
|
||
| import colorama | ||
|
|
||
| async def main(): | ||
| """Main function to run the A2A Repo Agent client.""" | ||
| agent = Agent( | ||
| mode='stream', | ||
| token_stream_callback=None, | ||
| agent_urls=['http://localhost:9999/'], | ||
| ) | ||
|
|
||
| async for chunk in agent.stream('What is A2A protocol?'): | ||
| if chunk.startswith('<Agent name="'): | ||
| print(colorama.Fore.CYAN + chunk, end='', flush=True) | ||
| elif chunk.startswith('</Agent>'): | ||
| print(colorama.Fore.RESET + chunk, end='', flush=True) | ||
| else: | ||
| print(chunk, end='', flush=True) | ||
|
|
||
| asyncio.run(main()) | ||
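As a side note on the flow above: `extract_agents` expects the deciding LLM to wrap its agent selection in a fenced `json` block. An illustrative response follows — the exact keys are inferred from how `agent['name']` and `agent['prompt']` are used in `stream`:

```python
# Illustrative only: the shape `extract_agents` looks for in the LLM output.
fence = '`' * 3  # avoids nesting literal fences inside this markdown block
response = (
    'I will consult the repo agent first.\n'
    f'{fence}json\n'
    '[{"name": "A2A Repo Agent", "prompt": "What is the A2A protocol?"}]\n'
    f'{fence}'
)
# extract_agents(response) would return:
# [{'name': 'A2A Repo Agent', 'prompt': 'What is the A2A protocol?'}]
```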
**Jinja template** (new file; rendered with `called_agents`)

```jinja
Previous agents have been called. {% for agent in called_agents %}
- Agent: {{ agent.name }}
- Prompt: {{ agent.prompt }}
- Answer: {{ agent.answer }}
{% endfor %}
```
**`agents.jinja`** (new file)

```jinja
# Agent contexts

These are some consults from agent(s) that may be useful to answer the question
{% for agent in agent_cards %}
Agent name: {{agent.name}}
Agent description: {{agent.description}}
Agent skills: {% for skill in agent.skills%}
- name: {{skill.name}}
- description: {{skill.description}}
- example: {{skill.examples}}{% endfor %}{% endfor %}
-------------------
```
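A quick, hypothetical render of this template — the `SimpleNamespace` objects stand in for real `AgentCard`s, and the file path is assumed to be resolvable from the working directory:

```python
from types import SimpleNamespace

from jinja2 import Template

# Stand-in data, for illustration only.
skill = SimpleNamespace(
    name='repo-qa',
    description='Answer questions about the A2A repo',
    examples=['What is A2A protocol?'],
)
card = SimpleNamespace(
    name='A2A Repo Agent',
    description='Answers questions about the a2a-python repository',
    skills=[skill],
)

with open('agents.jinja') as f:  # assumed path
    print(Template(f.read()).render(agent_cards=[card]))
```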
**Jinja template** (new file; rendered with `called_agents`, with a separator per agent)

```jinja
Previous agents have been called. {% for agent in called_agents %}
- Agent: {{ agent.name }}
- Prompt: {{ agent.prompt }}
- Answer: {{ agent.answer }}
--------------------------------
{% endfor %}
```
**`no_llm_framework/client/constant.py`** (new file)

```python
import os

GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
```
> **Contributor** commented:
>
> The title and overview of this README seem to be copied from another example and don't accurately reflect the purpose of this sample. The title says "Using a2a-python SDK Without an LLM Framework", but the example is about "support multiple A2AStarletteApplication instances or AgentExecutor implementations on a single port". This is confusing for users. Please update the title and description to match the example's content.
>
> For example, the title could be: "Example: Supporting Multiple A2A Agents on a Single Port"
>
> Also, the file structure references `no_llm_framework`, which is also confusing. Consider renaming the directory to better reflect the example.