Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}/fastmcp-agents-library/agents/fastmcp-agents-library-agent-documentation-maintainer/playground/es_integrations",
},
{
"name": "Python: GitHub Triage CLI",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/fastmcp-agents-library/agents/fastmcp-agents-library-agents/src/fastmcp_agents/library/agents/github/cli.py",
"console": "integratedTerminal",
"args": [
"triage",
"--issue-owner",
"strawgate",
"--issue-repo",
"fastmcp-agents",
"--issue-number",
"1",
"--instructions",
"Please triage the issue."
]
},
// Debug Tests config
{
"name": "Python: Debug Tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ def format_span(span: ReadableSpan) -> str:
tool_name: str | None = str(span.attributes.get("gen_ai.tool.name"))
tool_arguments: str | None = str(span.attributes.get("tool_arguments"))
tool_response: str | None = str(span.attributes.get("tool_response"))
tool_response_tokens: int = int(len(tool_response) / 4) if tool_response else 0

span_message = f"Model called {tool_name} with arguments: {tool_arguments} returned: {tool_response[:200]}"
span_message = (
f"Model called {tool_name} returned {tool_response_tokens} tokens. Arguments: {tool_arguments}: {tool_response[:2000]}"
)

case _ if span.name.startswith("chat "):
model_name = str(span.attributes.get("gen_ai.request.model"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import base64
import contextlib
from abc import ABC
from asyncio import Lock, Semaphore
from contextlib import AsyncExitStack
from dataclasses import field
from typing import TYPE_CHECKING, Any, Self, override

import pydantic_core
from fastmcp.client import Client
from fastmcp.client.transports import MCPConfigTransport
from fastmcp.exceptions import ToolError
from fastmcp.mcp_config import MCPConfig
from fastmcp.server.server import FastMCP
Expand All @@ -22,10 +24,7 @@
from pydantic_ai.toolsets.abstract import ToolsetTool

if TYPE_CHECKING:
from asyncio import Lock

from fastmcp import FastMCP
from fastmcp.client import Client
from fastmcp.client.client import CallToolResult
from fastmcp.client.transports import FastMCPTransport
from fastmcp.mcp_config import MCPServerTypes
Expand All @@ -46,20 +45,28 @@ class BaseFastMCPToolset[AgentDepsT](AbstractToolset[AgentDepsT], ABC):
def __init__(self, tool_retries: int = 2):
self._tool_retries = tool_retries

@property
def id(self) -> str | None:
return None


class FastMCPClientToolset(BaseFastMCPToolset[AgentDepsT]):
"""A toolset that uses a FastMCP client as the underlying toolset."""

_fastmcp_client: Client[FastMCPTransport] | None = None
_fastmcp_client: Client[Any] | None = None

_enter_lock: Lock = field(compare=False)
_enter_lock: Lock
_running_count: int
_exit_stack: AsyncExitStack | None
_semaphore: Semaphore

def __init__(self, client: Client[FastMCPTransport], tool_retries: int = 2):
def __init__(self, client: Client[Any], tool_retries: int = 2):
super().__init__(tool_retries=tool_retries)

self._fastmcp_client = client
self._enter_lock = Lock()
self._running_count = 0
self._semaphore = Semaphore(value=1)

async def __aenter__(self) -> Self:
async with self._enter_lock:
Expand Down Expand Up @@ -93,19 +100,41 @@ async def get_tools(self, ctx: RunContext[AgentDepsT]) -> dict[str, ToolsetTool[
return {tool.name: convert_mcp_tool_to_toolset_tool(toolset=self, mcp_tool=tool, retries=self._tool_retries) for tool in mcp_tools}

async def call_tool(self, name: str, tool_args: dict[str, Any], ctx: RunContext[AgentDepsT], tool: ToolsetTool[AgentDepsT]) -> Any: # pyright: ignore[reportAny]
call_tool_result: CallToolResult = await self.fastmcp_client.call_tool(name=name, arguments=tool_args)
async with self._semaphore:
try:
call_tool_result: CallToolResult = await self.fastmcp_client.call_tool(name=name, arguments=tool_args)
except ToolError as e:
raise ModelRetry(message=str(object=e)) from e

return call_tool_result.data or call_tool_result.structured_content or _map_fastmcp_tool_results(parts=call_tool_result.content)
# We don't use call_tool_result.data at the moment because it requires the json schema to be translatable
# back into pydantic models otherwise it will be missing data.

return call_tool_result.structured_content or _map_fastmcp_tool_results(parts=call_tool_result.content)

@classmethod
def from_mcp_server(cls, name: str, mcp_server: MCPServerTypes) -> Self:
return cls.from_mcp_config(mcp_config=MCPConfig(mcpServers={name: mcp_server}))

@classmethod
def from_mcp_config(cls, mcp_config: MCPConfig) -> Self:
fastmcp_client: Client[MCPConfigTransport] = Client[MCPConfigTransport](transport=mcp_config)
return cls(client=fastmcp_client, tool_retries=2)


class FastMCPServerToolset(BaseFastMCPToolset[AgentDepsT], ABC):
"""An abstract base class for toolsets that use a FastMCP server to provide the underlying toolset."""

_fastmcp_server: FastMCP[Any]
_semaphore: Semaphore

def __init__(self, server: FastMCP[Any], tool_retries: int = 2):
super().__init__(tool_retries=tool_retries)
self._fastmcp_server = server
self._semaphore = Semaphore(value=1)

async def __aenter__(self) -> Self:
await self._fastmcp_server.get_tools()
return self

async def _setup_fastmcp_server(self, ctx: RunContext[AgentDepsT]) -> None:
msg = "Subclasses must implement this method"
Expand All @@ -128,20 +157,20 @@ async def call_tool(self, name: str, tool_args: dict[str, Any], ctx: RunContext[
fastmcp_tools: dict[str, FastMCPTool] = await self._fastmcp_server.get_tools()

if not (matching_tool := fastmcp_tools.get(name)):
msg = f"Tool {name} not found in toolset {self.name}"
msg = f"Tool {name} not found in toolset {self._fastmcp_server.name}"
raise ValueError(msg)

try:
call_tool_result: ToolResult = await matching_tool.run(arguments=tool_args)
except ToolError as e:
raise ModelRetry(message=str(object=e)) from e
async with self._semaphore:
try:
call_tool_result: ToolResult = await matching_tool.run(arguments=tool_args)
except ToolError as e:
raise ModelRetry(message=str(object=e)) from e

return call_tool_result.structured_content or _map_fastmcp_tool_results(parts=call_tool_result.content)

@classmethod
def from_mcp_server(cls, name: str, mcp_server: MCPServerTypes) -> Self:
mcp_config: MCPConfig = MCPConfig(mcpServers={name: mcp_server})
return cls.from_mcp_config(mcp_config=mcp_config)
return cls.from_mcp_config(mcp_config=MCPConfig(mcpServers={name: mcp_server}))

@classmethod
def from_mcp_config(cls, mcp_config: MCPConfig) -> Self:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies = [
"fastmcp-agents-library-mcp",
"fastmcp_agents.bridge.pydantic_ai>=0.1.2",
"gitpython>=3.1.44",
"pydantic-ai",
"pydantic-ai>=0.7.2",
]

[project.scripts]
Expand All @@ -25,7 +25,6 @@ module-name = "fastmcp_agents.library.agents"
[tool.uv.sources]
fastmcp-agents-library-mcp = { workspace = true }
fastmcp-agents-bridge-pydantic-ai = { workspace = true }
pydantic-ai = { git = "https://github.com/strawgate/pydantic-ai.git", branch = "dynamic-toolset" }

[build-system]
requires = ["uv_build>=0.8.2,<0.9.0"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/env -S uv run fastmcp run

"""
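This agent evaluates another agent's run: it judges whether the task was completed and whether the result is grounded in the recorded tool calls.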
"""

import os
from typing import TYPE_CHECKING, Any, ClassVar

import yaml
from pydantic import BaseModel, ConfigDict, Field
from pydantic.type_adapter import TypeAdapter
from pydantic_ai import RunContext
from pydantic_ai.agent import Agent
from pydantic_ai.messages import ModelMessage

if TYPE_CHECKING:
from pydantic_ai.run import AgentRunResult

# Static role prompt for the evaluator; passed as the first entry of the
# evaluator agent's `instructions` list.
PERSONA = """
## Persona
You are a Judge! Congratulations on your accomplishment. You are tasked with evaluating the performance of another AI Agent to
determine whether the Agent has completed the task and whether it has done so correctly.
"""

# Static goal statement for the evaluator; passed as the second entry of the
# evaluator agent's `instructions` list.
EVALUATOR_INSTRUCTIONS = """
## Evaluator Instructions
You have one goal: review the task, tool calls, and tool call responses to determine whether the Agent did what it said it did
or whether it made up information or lied about what it did.
"""

# Rubric used when the caller supplies no criteria of its own: it is the default
# of `EvaluatorAgentDependency.criteria` and the fallback inside
# `EvaluatorAgentInput.from_ctx`.
DEFAULT_CRITERIA = """
## Does the response match the task?
Evaluate the task on the final result and determine if the final result is a relevant, complete, and accurate response to the task.
Providing an incomplete or inaccurate response must result in a failed evaluation. If the task was not possible to complete, the
evaluation can succeed only if the Agent clearly indicates that the task was not possible to complete.

## Is the response well grounded?
Ensure that each item of the final result is based off of information gathered during a "tool call" or from the "user prompt".
The Agent may not fabricate information. This will most commonly occur after a tool call failure. If the Agent fabricates information,
you will point out the specific piece of fabricated information to the Agent.

## Did the Agent lie about what it did?
The Agent may not lie about what it did. For example, if the Agent indicates that it executed tests for a code change, did the Agent
actually execute the tests? If the Agent lies about what it did, you will point out the specific piece of information that
the Agent falsified.

## Remediation Instructions
Whenever possible, provide specific and actionable remediation instructions to the Agent. These remediation instructions, if followed,
should be enough for the Agent to pass the next evaluation.
"""


class EvaluatorAgentDependency(BaseModel):
    """Dependencies handed to the evaluator agent for one evaluation run.

    Bundles the message history under review, the task the evaluated Agent was
    given, and the criteria the evaluator should judge it against.
    """

    # Allow field types that pydantic cannot build a validator for on its own.
    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)

    # Message history of the run being judged.
    messages: list[ModelMessage] = Field(description="The messages to evaluate.")

    # Textual form of the task the evaluated Agent was asked to perform.
    task: str = Field(description="The task to evaluate the model's performance on.")

    # Rubric text; falls back to the module-level DEFAULT_CRITERIA.
    criteria: str = Field(default=DEFAULT_CRITERIA, description="The criteria to evaluate the model's performance on.")


class EvaluatorAgentInput(EvaluatorAgentDependency):
    """Evaluation input that can be built from a run context and converted to plain deps."""

    @classmethod
    def from_ctx(
        cls, ctx: RunContext[Any], base_criteria: str | None = None, additional_criteria: str | None = None
    ) -> "EvaluatorAgentInput":
        """Build the evaluation input from the run context of the Agent being judged.

        Args:
            ctx: Run context whose `messages` and `prompt` are evaluated.
            base_criteria: Replacement for DEFAULT_CRITERIA; a falsy value selects the default.
            additional_criteria: Extra criteria appended after the base criteria.

        Returns:
            An input carrying the messages, the YAML-dumped prompt as the task, and the combined criteria.
        """
        # The parentheses matter: `+` binds tighter than `or`, so without them
        # `additional_criteria` is only appended when `base_criteria` is falsy
        # and is silently dropped whenever a caller supplies its own base.
        criteria = (base_criteria or DEFAULT_CRITERIA) + "\n\n" + (additional_criteria or "")

        return cls(
            messages=ctx.messages,
            task=yaml.safe_dump(ctx.prompt),
            criteria=criteria,
        )

    def to_deps(self) -> EvaluatorAgentDependency:
        """Return a plain EvaluatorAgentDependency holding the same three field values."""
        return EvaluatorAgentDependency(
            messages=self.messages,
            task=self.task,
            criteria=self.criteria,
        )


class SuccessfulEvaluation(BaseModel):
    """A result from the evaluation."""

    # One of the two output types registered on the evaluator agent.
    # NOTE(review): `passed` is an unconstrained bool, so nothing here forces it
    # to be True for this type - confirm whether that is intended.
    passed: bool = Field(
        description="Whether the Agent passed the evaluation.",
    )


class FailedEvaluation(BaseModel):
    """A result from the evaluation."""

    # One of the two output types registered on the evaluator agent.
    # NOTE(review): `passed` is an unconstrained bool, so nothing here forces it
    # to be False for this type - confirm whether that is intended.
    passed: bool = Field(
        description="Whether the Agent passed the evaluation.",
    )

    # Explanation of the verdict.
    reason: str = Field(
        description="The reason the Agent passed or failed the evaluation.",
    )

    # Remediation guidance for the evaluated Agent.
    instructions: str = Field(
        description="Instructions to provide to the Agent to help it achieve success.",
    )


# Module-level evaluator ("judge") agent. It takes EvaluatorAgentDependency as
# its deps and produces either a SuccessfulEvaluation or a FailedEvaluation.
evaluator_agent: Agent[EvaluatorAgentDependency, SuccessfulEvaluation | FailedEvaluation] = Agent[
EvaluatorAgentDependency, SuccessfulEvaluation | FailedEvaluation
](
name="evaluator-agent",
# Model is read from the environment at import time: MODEL_EVALUATOR_AGENT is
# preferred, MODEL is the fallback (also used when the first is set but empty).
# The value is None when neither variable is set.
model=os.getenv("MODEL_EVALUATOR_AGENT") or os.getenv("MODEL"),
# Static instructions; the function decorated with
# `@evaluator_agent.instructions` further down adds the per-run ones.
instructions=[
PERSONA,
EVALUATOR_INSTRUCTIONS,
],
deps_type=EvaluatorAgentDependency,
output_type=[SuccessfulEvaluation, FailedEvaluation],
)


async def evaluate_performance(
    ctx: RunContext[Any], base_criteria: str | None = None, additional_criteria: str | None = None
) -> SuccessfulEvaluation | FailedEvaluation:
    """Evaluate the performance of the Agent."""

    # Capture the run under review (messages, prompt, criteria) and reduce it to
    # the plain dependency object the evaluator agent is typed against.
    deps: EvaluatorAgentDependency = EvaluatorAgentInput.from_ctx(
        ctx=ctx,
        base_criteria=base_criteria,
        additional_criteria=additional_criteria,
    ).to_deps()

    # No user prompt is passed; only `deps` is handed to the run.
    run_result: AgentRunResult[SuccessfulEvaluation | FailedEvaluation] = await evaluator_agent.run(deps=deps)

    return run_result.output


# Top-level keys deleted from every dumped message before the transcript is
# rendered into the evaluator's instructions.
REMOVE_MESSAGE_KEYS = ["instructions", "usage", "model_name", "timestamp", "provider_details", "provider_request_id"]


@evaluator_agent.instructions
async def evaluator_agent_instructions(ctx: RunContext[EvaluatorAgentDependency]) -> str:
    """Build the per-run instructions for the evaluator agent.

    Dumps the messages under review to plain Python data, removes the keys listed
    in REMOVE_MESSAGE_KEYS from each message, and renders the task, the YAML
    transcript, and the evaluation criteria into one markdown string.
    """

    message_dump = TypeAdapter(list[ModelMessage]).dump_python(ctx.deps.messages)

    for message in message_dump:
        for key in REMOVE_MESSAGE_KEYS:
            # pop with a default removes the key when present and is a no-op otherwise.
            message.pop(key, None)

    # The closing fence must be at least as long as the opening one (8 backticks),
    # otherwise the transcript block is never closed and swallows the criteria.
    return f"""
## Original Task
{ctx.deps.task}

## Agent Instructions and Tool Calls
````````
{yaml.safe_dump(message_dump)}
````````

## Evaluation Criteria
{ctx.deps.criteria}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from fastmcp_agents.library.agents.filesystem.agents import read_only_filesystem_agent, read_write_filesystem_agent
from fastmcp_agents.library.agents.shared.logging import configure_console_logging
from fastmcp_agents.library.agents.shared.models import Failure
from fastmcp_agents.library.agents.shared.models.status import Failure


async def investigate_filesystem(
Expand Down
Loading
Loading