Merged
23 changes: 23 additions & 0 deletions apps/web/src/lib/services/agent-service.ts
@@ -19,4 +19,27 @@ export const agentService = {
  async getStatus(agentId: string) {
    return apiClient.get<AgentStatusResponse>(`/api/v1/agents/${agentId}/status`);
  },

  /** Send an A2A inter-agent message. */
  async sendA2AMessage(params: {
    sender?: string;
    recipient: string;
    content: Record<string, unknown>;
    conversation_id?: string;
  }) {
    return apiClient.post<{ conversation_id: string; timestamp: string }>(
      '/api/v1/agents/a2a/send',
      params,
    );
  },

  /** Get A2A message log (optionally filtered by conversation). */
  async getA2ALog(conversationId?: string, limit = 50) {
    const params = new URLSearchParams();
    if (conversationId) params.set('conversation_id', conversationId);
    params.set('limit', String(limit));
    return apiClient.get<{ messages: Record<string, unknown>[]; count: number }>(
      `/api/v1/agents/a2a/log?${params}`,
    );
  },
};
52 changes: 52 additions & 0 deletions src/youtube_extension/backend/api/v1/router.py
@@ -1214,3 +1214,55 @@ async def get_agent_status(agent_id: str):
        error=execution.error,
    ).model_dump()
)


# ============================================================
# A2A Inter-Agent Messaging
# ============================================================


@router.post(
    "/agents/a2a/send",
    response_model=ApiResponse,
    summary="Send an A2A message between agents",
    tags=["Agents"],
)
async def send_a2a_message(
    body: dict[str, Any] = {},
):
Comment on lines +1230 to +1232
Contributor (severity: high)

The function signature uses dict[str, Any] for the request body, which violates the style guide's requirement for "strict type hinting". Using a Pydantic model provides automatic validation, better API documentation, and improved type safety.

You should define an A2ASendMessageRequest model in models.py and use it here. For example:

# In models.py
from pydantic import BaseModel, Field
# ...
class A2ASendMessageRequest(BaseModel):
    sender: Optional[str] = "frontend"
    recipient: str
    content: dict[str, Any] = Field(default_factory=dict)
    conversation_id: Optional[str] = None

Then, you can update the function signature and body to use this model, which also allows you to remove the manual validation for recipient.

Suggested change
-async def send_a2a_message(
-    body: dict[str, Any] = {},
-):
+async def send_a2a_message(
+    body: A2ASendMessageRequest,  # Define this Pydantic model in models.py
+):
References
  1. The function uses dict[str, Any] for the request body instead of a strictly typed Pydantic model, which is required by the coding standards. (link)

    """Send a context-share or tool-request message between agents."""
Comment on lines +1231 to +1233
Copilot AI · Feb 28, 2026

Avoid using a mutable default for the request body (body: dict[str, Any] = {}), as it is shared at function definition time and can lead to surprising behavior. Prefer a required Pydantic model (recommended) or at least body: dict[str, Any] | None = None with body = body or {} inside.

Suggested change
-    body: dict[str, Any] = {},
-):
-    """Send a context-share or tool-request message between agents."""
+    body: dict[str, Any] | None = None,
+):
+    """Send a context-share or tool-request message between agents."""
+    body = body or {}

Copilot uses AI. Check for mistakes.
    sender = body.get("sender", "frontend")
    recipient = body.get("recipient")
    content = body.get("content", {})
    conversation_id = body.get("conversation_id")

    if not recipient:
        raise HTTPException(status_code=400, detail="recipient is required")

    orch = AgentOrchestrator()
Contributor (severity: medium)

AgentOrchestrator is instantiated directly. This endpoint should use the existing dependency injection pattern (Depends(get_agent_orchestrator_service)) to ensure it uses the shared global instance and maintains consistency with other endpoints. Please add orch: AgentOrchestrator = Depends(get_agent_orchestrator_service) to the function signature and remove this line.
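For reference, the accessor that `Depends(get_agent_orchestrator_service)` resolves to is typically a lazily created module-level singleton. A minimal sketch, assuming the repo's helper follows this common pattern (the stub orchestrator and names here are illustrative, not the actual implementation):

```python
from typing import Optional


class AgentOrchestrator:
    """Stub standing in for the real orchestrator."""

    def __init__(self) -> None:
        self._a2a_log: list[dict] = []


_orchestrator: Optional[AgentOrchestrator] = None


def get_agent_orchestrator_service() -> AgentOrchestrator:
    # Lazily create one shared instance; every request that uses the
    # dependency sees the same object (and thus the same _a2a_log).
    global _orchestrator
    if _orchestrator is None:
        _orchestrator = AgentOrchestrator()
    return _orchestrator
```

Instantiating `AgentOrchestrator()` inline, as the diff does, creates a fresh object with an empty log on every call, which is exactly what the shared accessor avoids.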

    msg = await orch.send_a2a_message(
        sender=sender,
        recipient=recipient,
        content=content,
        conversation_id=conversation_id,
    )
Comment on lines +1242 to +1248
Copilot AI · Feb 28, 2026

send_a2a_message/get_a2a_log create a fresh AgentOrchestrator() per request, which bypasses the app’s orchestrator singleton from get_agent_orchestrator_service() (and therefore has no registered agent types and an empty _a2a_log). This makes message delivery/log retrieval effectively non-functional across requests. Use the DI-provided orchestrator (e.g., orch: AgentOrchestrator = Depends(get_agent_orchestrator_service)) instead of instantiating a new one here.

    return ApiResponse.success({
        "conversation_id": msg.conversation_id,
        "timestamp": msg.timestamp,
    })


@router.get(
    "/agents/a2a/log",
    response_model=ApiResponse,
    summary="Get A2A message log",
    tags=["Agents"],
)
async def get_a2a_log(
    conversation_id: Optional[str] = None,
    limit: int = 50,
):
    """Return recent A2A inter-agent messages."""
    orch = AgentOrchestrator()
Contributor (severity: medium)

AgentOrchestrator is instantiated directly. This endpoint should use the existing dependency injection pattern (Depends(get_agent_orchestrator_service)) to ensure it uses the shared global instance and maintains consistency with other endpoints. Please add orch: AgentOrchestrator = Depends(get_agent_orchestrator_service) to the function signature and remove this line.

    log = orch.get_a2a_log(conversation_id=conversation_id, limit=limit)
    return ApiResponse.success({"messages": log, "count": len(log)})
Comment on lines +1224 to +1268
Copilot AI · Feb 28, 2026

New A2A endpoints and orchestrator behavior are introduced here, but there are no accompanying tests. There are already unit tests covering API v1 models and orchestrator behavior in tests/; adding tests for /api/v1/agents/a2a/send, /api/v1/agents/a2a/log, and the expected context-sharing/logging semantics would help prevent regressions and keep coverage in line with repo expectations.
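A sketch of the kind of regression test this comment asks for, exercising the send/log semantics from the diff against a minimal in-memory stand-in (real tests would hit the FastAPI routes via a test client; StubOrchestrator and the test name here are illustrative):

```python
import asyncio
import uuid


class StubOrchestrator:
    """Minimal stand-in mirroring send_a2a_message/get_a2a_log from the diff."""

    def __init__(self) -> None:
        self._a2a_log: list[dict] = []

    async def send_a2a_message(self, sender, recipient, content, conversation_id=None):
        msg = {
            "sender": sender,
            "recipient": recipient,
            "content": content,
            "conversation_id": conversation_id or str(uuid.uuid4()),
        }
        self._a2a_log.append(msg)
        return msg

    def get_a2a_log(self, conversation_id=None, limit=50):
        msgs = self._a2a_log
        if conversation_id:
            msgs = [m for m in msgs if m["conversation_id"] == conversation_id]
        return msgs[-limit:]


def test_a2a_send_and_log_roundtrip():
    orch = StubOrchestrator()
    msg = asyncio.run(orch.send_a2a_message("frontend", "video_master", {"k": "v"}))
    # The sent message is retrievable when filtering by its conversation id,
    # and an unknown conversation id yields an empty log.
    log = orch.get_a2a_log(conversation_id=msg["conversation_id"])
    assert len(log) == 1
    assert log[0]["recipient"] == "video_master"
    assert orch.get_a2a_log(conversation_id="missing") == []


test_a2a_send_and_log_roundtrip()
```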

Copilot generated this review using guidance from repository custom instructions.
src/youtube_extension/services/agents/adapters/agent_orchestrator.py
@@ -9,13 +9,31 @@

import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

from ..base_agent import AgentRequest, AgentResult, BaseAgent


@dataclass
class A2AContextMessage:
    """Lightweight A2A context-share message within the orchestrator."""

    sender: str
    recipient: str
    content: dict[str, Any]
    conversation_id: str = ""
    timestamp: str = ""

    def __post_init__(self):
        if not self.conversation_id:
            self.conversation_id = str(uuid.uuid4())
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()


@dataclass
class OrchestrationResult:
    """Result from agent orchestration"""
@@ -42,6 +60,7 @@ def __init__(self):
        self.logger = logging.getLogger("agent_orchestrator")
        self._agents: dict[str, BaseAgent] = {}
        self._agent_types: dict[str, type[BaseAgent]] = {}
        self._a2a_log: list[A2AContextMessage] = []
Contributor (severity: high)

The _a2a_log is an in-memory list that is appended to but never cleared or capped. In a long-running process, this will lead to unbounded memory growth and eventually cause performance issues or crashes. To prevent this, you should use a capped collection, such as collections.deque with a maxlen.

You'll need to add from collections import deque.

Suggested change
-    self._a2a_log: list[A2AContextMessage] = []
+    self._a2a_log: "deque[A2AContextMessage]" = deque(maxlen=1000)  # Using a capped collection
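The capping behavior suggested here is easy to verify: a deque with maxlen silently evicts the oldest entries as new ones are appended, so the log can never grow past the cap:

```python
from collections import deque

# Capped log: once full, each append drops the oldest entry.
log: deque[str] = deque(maxlen=3)
for i in range(5):
    log.append(f"msg-{i}")

print(list(log))  # → ['msg-2', 'msg-3', 'msg-4']
```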

Comment on lines 60 to +63
Copilot AI · Feb 28, 2026

_a2a_log is an in-memory list with no retention policy, and the broadcast path stores each agent’s full output for every sender/recipient pair. With a long-lived orchestrator singleton this can grow unbounded and retain large payloads (transcripts, etc.). Consider capping the log (e.g., deque with maxlen), truncating/storing summaries, or making logging opt-in via config.

Bug: The _a2a_log list in the singleton AgentOrchestrator is appended to without any cleanup mechanism, causing it to grow indefinitely and risk a memory leak.
Severity: HIGH

Suggested Fix

Implement a retention policy for the _a2a_log. A simple approach is to cap its size using a collections.deque with a maxlen or by manually trimming the list after each append to ensure it does not exceed a predefined limit.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/youtube_extension/services/agents/adapters/agent_orchestrator.py#L63

Potential issue: The `_a2a_log` list within the `AgentOrchestrator` class accumulates
`A2AContextMessage` objects from agent-to-agent communications. Because the
`AgentOrchestrator` is instantiated as a long-lived singleton, this log grows
indefinitely without any clearing or trimming mechanism. In a production environment
with continuous operation, this unbounded growth will lead to excessive memory
consumption, eventually causing the service to crash with an out-of-memory error.


        self._task_mappings: dict[str, list[str]] = {
            "video_analysis": [
                "video_master",
@@ -178,6 +197,25 @@ async def execute_task(
            asyncio.get_event_loop().time() - start_time
        )

        # A2A context sharing: broadcast each agent's output to all others
        if orchestration_result.success and len(orchestration_result.results) > 1:
            conv_id = str(uuid.uuid4())
            for sender_name, sender_result in orchestration_result.results.items():
                for recipient_name in orchestration_result.results:
                    if recipient_name != sender_name:
                        msg = A2AContextMessage(
                            sender=sender_name,
                            recipient=recipient_name,
                            content={"type": "context_share", "output": sender_result.output},
                            conversation_id=conv_id,
                        )
                        self._a2a_log.append(msg)
Comment on lines +200 to +212
Copilot AI · Feb 28, 2026

The “A2A context sharing” block only appends messages to _a2a_log; it doesn’t actually deliver them to recipient agents. Since none of the agents (and not even BaseAgent) define receive_context, this code currently cannot share context as described. Consider either (1) calling send_a2a_message(...) for each sender/recipient pair (and defining a receive_context contract on BaseAgent), or (2) renaming/commenting this as logging-only if that’s the intent.
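Option (1) above can be sketched as a small delivery contract on the base class plus an actual delivery loop. This is a hypothetical sketch of what such a contract could look like, not the repository's BaseAgent:

```python
import asyncio
from typing import Any


class BaseAgent:
    """Hypothetical base class carrying the delivery contract."""

    async def receive_context(self, content: dict[str, Any]) -> None:
        # Default: ignore shared context; subclasses override to consume it.
        pass


class RecordingAgent(BaseAgent):
    """Example agent that records every context payload it receives."""

    def __init__(self) -> None:
        self.received: list[dict[str, Any]] = []

    async def receive_context(self, content: dict[str, Any]) -> None:
        self.received.append(content)


async def broadcast(agents: dict[str, BaseAgent], sender: str, output: Any) -> None:
    # Deliver (not just log) the sender's output to every other agent.
    for name, agent in agents.items():
        if name != sender:
            await agent.receive_context({"type": "context_share", "output": output})


agents = {"video_master": RecordingAgent(), "summarizer": RecordingAgent()}
asyncio.run(broadcast(agents, "video_master", {"summary": "ok"}))
```

With such a hook in place, the broadcast block in execute_task could call the orchestrator's own send_a2a_message per pair, so logging and delivery stay in one code path.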

            self.logger.debug(
                "A2A context shared across %d agents (conv=%s)",
                len(orchestration_result.results),
                conv_id,
            )

        self.logger.info(
            f"Task execution completed: {task_type} "
            f"(success={orchestration_result.success}, "
@@ -262,6 +300,54 @@ def add_task_mapping(self, task_type: str, agent_names: list[str]):
        self._task_mappings[task_type] = agent_names
        self.logger.info(f"Added task mapping: {task_type} -> {agent_names}")

    # --- A2A messaging ---

    async def send_a2a_message(
        self,
        sender: str,
        recipient: str,
        content: dict[str, Any],
        conversation_id: str | None = None,
    ) -> A2AContextMessage:
        """Send an A2A context-share message between agents."""
        msg = A2AContextMessage(
            sender=sender,
            recipient=recipient,
            content=content,
            conversation_id=conversation_id or str(uuid.uuid4()),
        )
        self._a2a_log.append(msg)

        # Deliver to recipient agent if it exists
        agent = self._agents.get(recipient)
        if agent and hasattr(agent, "receive_context"):
            try:
                await agent.receive_context(content)
            except Exception as e:
                self.logger.warning("Agent %s failed to receive context: %s", recipient, e)

        return msg

    def get_a2a_log(
        self,
        conversation_id: str | None = None,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Return recent A2A messages, optionally filtered by conversation."""
        msgs = self._a2a_log
        if conversation_id:
            msgs = [m for m in msgs if m.conversation_id == conversation_id]
        return [
            {
                "sender": m.sender,
                "recipient": m.recipient,
                "content": m.content,
                "conversation_id": m.conversation_id,
                "timestamp": m.timestamp,
            }
            for m in msgs[-limit:]
        ]
Comment on lines +331 to +349
Contributor (severity: medium)

This function can be improved for type safety and robustness:

  1. Return Type: Returning list[dict[str, Any]] is not strictly typed. You can return list[A2AContextMessage] directly, as FastAPI can serialize dataclasses. This aligns better with the style guide's emphasis on strict typing.
  2. Slicing with Deque: If _a2a_log is converted to a deque to prevent memory leaks (as suggested in another comment), the msgs[-limit:] slice will fail when no conversation_id is provided. Deques do not support negative slicing.

Here's a revised implementation that addresses both points and is robust for both list and deque.

    def get_a2a_log(
        self,
        conversation_id: str | None = None,
        limit: int = 50,
    ) -> list[A2AContextMessage]:
        """Return recent A2A messages, optionally filtered by conversation."""
        if conversation_id:
            # Filtering creates a list, so negative slicing is safe here.
            return [m for m in self._a2a_log if m.conversation_id == conversation_id][-limit:]

        # If self._a2a_log is a deque, convert to list for slicing.
        # This is safe but could be optimized for very large deques if performance is critical.
        return list(self._a2a_log)[-limit:]
References
  1. The function returns a list[dict[str, Any]] which is not strictly typed. The suggestion improves this by returning a list of strongly-typed A2AContextMessage objects. (link)
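Point 2 above is easy to reproduce: a deque rejects slice indices with a TypeError, while converting to a list first makes the negative slice safe:

```python
from collections import deque

msgs = deque(range(10), maxlen=100)

try:
    msgs[-3:]  # deques only accept integer indices, not slices
except TypeError:
    pass  # raised: slicing a deque is unsupported

# Converting to a list first makes the negative slice safe.
recent = list(msgs)[-3:]
print(recent)  # → [7, 8, 9]
```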



# Global orchestrator instance
orchestrator = AgentOrchestrator()