diff --git a/apps/web/src/lib/services/agent-service.ts b/apps/web/src/lib/services/agent-service.ts index dbaf60c5b..d58c96506 100644 --- a/apps/web/src/lib/services/agent-service.ts +++ b/apps/web/src/lib/services/agent-service.ts @@ -19,4 +19,27 @@ export const agentService = { async getStatus(agentId: string) { return apiClient.get(`/api/v1/agents/${agentId}/status`); }, + + /** Send an A2A inter-agent message. */ + async sendA2AMessage(params: { + sender?: string; + recipient: string; + content: Record; + conversation_id?: string; + }) { + return apiClient.post<{ conversation_id: string; timestamp: string }>( + '/api/v1/agents/a2a/send', + params, + ); + }, + + /** Get A2A message log (optionally filtered by conversation). */ + async getA2ALog(conversationId?: string, limit = 50) { + const params = new URLSearchParams(); + if (conversationId) params.set('conversation_id', conversationId); + params.set('limit', String(limit)); + return apiClient.get<{ messages: Record[]; count: number }>( + `/api/v1/agents/a2a/log?${params}`, + ); + }, }; diff --git a/src/youtube_extension/backend/api/v1/router.py b/src/youtube_extension/backend/api/v1/router.py index 234170a8c..3772063fb 100644 --- a/src/youtube_extension/backend/api/v1/router.py +++ b/src/youtube_extension/backend/api/v1/router.py @@ -1214,3 +1214,55 @@ async def get_agent_status(agent_id: str): error=execution.error, ).model_dump() ) + + +# ============================================================ +# A2A Inter-Agent Messaging +# ============================================================ + + +@router.post( + "/agents/a2a/send", + response_model=ApiResponse, + summary="Send an A2A message between agents", + tags=["Agents"], +) +async def send_a2a_message( + body: dict[str, Any] = {}, +): + """Send a context-share or tool-request message between agents.""" + sender = body.get("sender", "frontend") + recipient = body.get("recipient") + content = body.get("content", {}) + conversation_id = body.get("conversation_id") + + if not recipient: + raise HTTPException(status_code=400, detail="recipient is required") + + orch = AgentOrchestrator() + msg = await orch.send_a2a_message( + sender=sender, + recipient=recipient, + content=content, + conversation_id=conversation_id, + ) + return ApiResponse.success({ + "conversation_id": msg.conversation_id, + "timestamp": msg.timestamp, + }) + + +@router.get( + "/agents/a2a/log", + response_model=ApiResponse, + summary="Get A2A message log", + tags=["Agents"], +) +async def get_a2a_log( + conversation_id: Optional[str] = None, + limit: int = 50, +): + """Return recent A2A inter-agent messages.""" + orch = AgentOrchestrator() + log = orch.get_a2a_log(conversation_id=conversation_id, limit=limit) + return ApiResponse.success({"messages": log, "count": len(log)}) diff --git a/src/youtube_extension/services/agents/adapters/agent_orchestrator.py b/src/youtube_extension/services/agents/adapters/agent_orchestrator.py index 62d4682b5..52d1d3f03 100644 --- a/src/youtube_extension/services/agents/adapters/agent_orchestrator.py +++ b/src/youtube_extension/services/agents/adapters/agent_orchestrator.py @@ -9,6 +9,7 @@ import asyncio import logging +import uuid from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional @@ -16,6 +17,23 @@ from ..base_agent import AgentRequest, AgentResult, BaseAgent +@dataclass +class A2AContextMessage: + """Lightweight A2A context-share message within the orchestrator.""" + + sender: str + recipient: str + content: dict[str, Any] + conversation_id: str = "" + timestamp: str = "" + + def __post_init__(self): + if not self.conversation_id: + self.conversation_id = str(uuid.uuid4()) + if not self.timestamp: + self.timestamp = datetime.utcnow().isoformat() + + @dataclass class OrchestrationResult: """Result from agent orchestration""" @@ -42,6 +60,7 @@ def __init__(self): self.logger = logging.getLogger("agent_orchestrator") self._agents: dict[str, BaseAgent] = {} self._agent_types: dict[str, type[BaseAgent]] = {} + self._a2a_log: list[A2AContextMessage] = [] self._task_mappings: dict[str, list[str]] = { "video_analysis": [ "video_master", @@ -178,6 +197,25 @@ async def execute_task( asyncio.get_event_loop().time() - start_time ) + # A2A context sharing: broadcast each agent's output to all others + if orchestration_result.success and len(orchestration_result.results) > 1: + conv_id = str(uuid.uuid4()) + for sender_name, sender_result in orchestration_result.results.items(): + for recipient_name in orchestration_result.results: + if recipient_name != sender_name: + msg = A2AContextMessage( + sender=sender_name, + recipient=recipient_name, + content={"type": "context_share", "output": sender_result.output}, + conversation_id=conv_id, + ) + self._a2a_log.append(msg) + self.logger.debug( + "A2A context shared across %d agents (conv=%s)", + len(orchestration_result.results), + conv_id, + ) + self.logger.info( f"Task execution completed: {task_type} " f"(success={orchestration_result.success}, " @@ -262,6 +300,54 @@ def add_task_mapping(self, task_type: str, agent_names: list[str]): self._task_mappings[task_type] = agent_names self.logger.info(f"Added task mapping: {task_type} -> {agent_names}") + # --- A2A messaging --- + + async def send_a2a_message( + self, + sender: str, + recipient: str, + content: dict[str, Any], + conversation_id: str | None = None, + ) -> A2AContextMessage: + """Send an A2A context-share message between agents.""" + msg = A2AContextMessage( + sender=sender, + recipient=recipient, + content=content, + conversation_id=conversation_id or str(uuid.uuid4()), + ) + self._a2a_log.append(msg) + + # Deliver to recipient agent if it exists + agent = self._agents.get(recipient) + if agent and hasattr(agent, "receive_context"): + try: + await agent.receive_context(content) + except Exception as e: + self.logger.warning("Agent %s failed to receive context: %s", recipient, e) + + return msg + + def get_a2a_log( + self, + conversation_id: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Return recent A2A messages, optionally filtered by conversation.""" + msgs = self._a2a_log + if conversation_id: + msgs = [m for m in msgs if m.conversation_id == conversation_id] + return [ + { + "sender": m.sender, + "recipient": m.recipient, + "content": m.content, + "conversation_id": m.conversation_id, + "timestamp": m.timestamp, + } + for m in msgs[-limit:] + ] + # Global orchestrator instance orchestrator = AgentOrchestrator()