Sampling #386
base: main
Conversation
- Introduced methods for listing and reading resources from MCP servers.
- Enhanced MCPAggregator to handle resource mapping and initialization.
- Updated Agent class to utilize new resource functionalities.
…and OpenAIAugmentedLLM
… to support multiple URIs
…thropicAugmentedLLM
…M and implement resource handling
…er for multipart message conversion
…rsion in GoogleAugmentedLLM
…feat/resources-support
…nstead of resource_uris
…remove unused prompt/resource attachment methods
…add weather report functionality and update main.py to handle resource and prompt messages.
…esources; remove get_resource_messages and get_prompt_messages methods
…etrieval using create_prompt method
…; update tests to validate resource handling.
Looks like lint and test are failing -- can you please look into those?
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/mcp_agent/server/app_server.py (1)
296-347: Functions declare "-> bool" but never return a value. Both workflows-resume and workflows-cancel should return the result boolean.

@@
-        if result:
+        if result:
             logger.debug(
                 f"Signaled workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
             )
         else:
             logger.error(
                 f"Failed to signal workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
             )
+        return result
@@
-        if result:
+        if result:
             logger.debug(f"Cancelled workflow {workflow_name} with ID {run_id}")
         else:
             logger.error(f"Failed to cancel workflow {workflow_name} with ID {run_id}")
+        return result

Also applies to: 348-377
src/mcp_agent/mcp/mcp_connection_manager.py (1)
174-186: Bug: session ID callback set before session exists; it never runs. Set the callback after creating the session.

-    async with transport_context as (read_stream, write_stream, *extras):
-        # If the transport provides a session ID callback (streamable_http does),
-        # store it in the server connection
-        if (
-            len(extras) > 0
-            and callable(extras[0])
-            and isinstance(server_conn.session, MCPAgentClientSession)
-        ):
-            server_conn.session.set_session_id_callback(extras[0])
-
-        # Build a session
-        server_conn.create_session(read_stream, write_stream)
+    async with transport_context as (read_stream, write_stream, *extras):
+        # Build a session
+        server_conn.create_session(read_stream, write_stream)
+        # If the transport provides a session ID callback (streamable_http does), store it
+        if len(extras) > 0 and callable(extras[0]) and isinstance(
+            server_conn.session, MCPAgentClientSession
+        ):
+            server_conn.session.set_session_id_callback(extras[0])

src/mcp_agent/mcp/mcp_server_registry.py (1)
137-141: Use seconds in timedelta to avoid days-scale timeouts. `timedelta(config.read_timeout_seconds)` treats the value as days. Use `seconds=`. Apply this diff:

-    read_timeout_seconds = (
-        timedelta(config.read_timeout_seconds)
+    read_timeout_seconds = (
+        timedelta(seconds=config.read_timeout_seconds)
         if config.read_timeout_seconds
         else None
     )
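A quick illustration of the pitfall (not from the PR, just something you can check in a REPL):

from datetime import timedelta

# The positional argument is interpreted as days, not seconds.
assert timedelta(30) == timedelta(days=30)            # 30 days
assert timedelta(seconds=30).total_seconds() == 30.0  # the intended 30-second timeout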
♻️ Duplicate comments (4)
examples/mcp/mcp_prompts_and_resources/demo_server.py (2)
23-33: Bug: get_users returns a 1-tuple instead of a JSON string. Remove the trailing comma/extra parentheses so the resource returns a plain string.

 @mcp.resource("demo://data/friends")
 def get_users():
     """Provide my friend list."""
-    return (
-        json.dumps(
-            [
-                {"id": 1, "friend": "Alice"},
-            ]
-        )
-    )
+    return json.dumps(
+        [
+            {"id": 1, "friend": "Alice"},
+        ]
+    )

64-67: Handle CreateMessageResult.content as a list of contents. Extract text from TextContent items and join; the current isinstance check will always fail.

-    if isinstance(haiku.content, TextContent):
-        return haiku.content.text
-    else:
-        return "Haiku generation failed, unexpected content type."
+    texts = [c.text for c in haiku.content if isinstance(c, TextContent)]
+    if texts:
+        return "\n".join(texts)
+    return "Haiku generation failed, unexpected content type."

src/mcp_agent/server/app_server.py (1)
404-413: Guard against missing 'properties' when stripping 'self'. Accessing parameters["properties"] directly can KeyError; use get(...) consistently.

-    if "self" in run_fn_tool.parameters["properties"]:
-        del(run_fn_tool.parameters["properties"]["self"])
+    if "self" in run_fn_tool.parameters.get("properties", {}):
+        del run_fn_tool.parameters["properties"]["self"]
     req = run_fn_tool.parameters.get("required", [])
     if "self" in req:
         req.remove("self")
     run_fn_tool.parameters["required"] = req

src/mcp_agent/mcp/mcp_server_registry.py (1)
112-119: Fix factory 4th-arg type (Context vs ServerSession) and align callable signatures across both methods. You pass a Context instance (ctx_with_session) as the 4th arg, but the type hints declare ServerSession | None. Also, initialize_server still requires timedelta/ServerSession (non-optional). This causes type inconsistency and potential runtime confusion. Make both methods accept Context | None and an optional timedelta. Consider defaulting the factory to MCPAgentClientSession since it expects the context.

Apply this diff:

@@
-    async def start_server(
-        self,
-        server_name: str,
-        client_session_factory: Callable[
-            [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
-            ClientSession,
-        ] = ClientSession,
-        session_id: str | None = None,
-        upstream_session: ServerSession | None = None,
+    async def start_server(
+        self,
+        server_name: str,
+        client_session_factory: Callable[
+            [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
+            ClientSession,
+        ] = MCPAgentClientSession,
+        session_id: str | None = None,
+        upstream_session: ServerSession | None = None,
@@
-    async def initialize_server(
-        self,
-        server_name: str,
-        client_session_factory: Callable[
-            [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
-            ClientSession,
-        ] = ClientSession,
-        init_hook: InitHookCallable = None,
-        session_id: str | None = None,
-        upstream_session: ServerSession | None = None,
+    async def initialize_server(
+        self,
+        server_name: str,
+        client_session_factory: Callable[
+            [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
+            ClientSession,
+        ] = MCPAgentClientSession,
+        init_hook: InitHookCallable | None = None,
+        session_id: str | None = None,
+        upstream_session: ServerSession | None = None,

Also applies to: 301-307
🧹 Nitpick comments (5)
examples/mcp/mcp_prompts_and_resources/README.md (1)
24-24: Nit: clarify phrasing. "friends" → "friend list" reads better.

-  - `demo://data/friends`: Example user data (JSON)
+  - `demo://data/friends`: Example friend list (JSON)

src/mcp_agent/server/app_server.py (1)
114-119: Avoid parameter shadowing; return the copied context. Rename the local copy to avoid shadowing the incoming ctx, for readability.

-    mcp_context = ctx
-    # Create a copy of the app context, since we don't want to attach this session to the
-    # global context
-    ctx = copy.deepcopy(app.context)
-    ctx.upstream_session = mcp_context.session
-    return app.workflows, ctx
+    mcp_context = ctx
+    # Create a copy of the app context (avoid mutating global context)
+    app_ctx = copy.deepcopy(app.context)
+    app_ctx.upstream_session = mcp_context.session
+    return app.workflows, app_ctx

src/mcp_agent/mcp/mcp_server_registry.py (3)
126-131: Correct the Returns/Yields section in docstrings. These context managers yield a ClientSession, not StdioServerParameters.

Apply this diff:

-    Returns:
-        StdioServerParameters: The server parameters for stdio transport.
+    Yields:
+        ClientSession: An active MCP client session for the configured transport.

(Apply in both places.)
Also applies to: 317-322
275-278: Remove unnecessary pylint directive. The `# pylint: disable=W0135` on this line seems unrelated. Drop it unless there's a specific warning here.

Apply this diff:

-    async with websocket_client(url=config.url) as (  # pylint: disable=W0135
+    async with websocket_client(url=config.url) as (
340-349: Fix typo: intialization_callback → initialization_callback. Minor readability fix.

Apply this diff:

-    intialization_callback = (
+    initialization_callback = (
         init_hook if init_hook is not None else self.init_hooks.get(server_name)
     )
-    if intialization_callback:
+    if initialization_callback:
         logger.info(f"{server_name}: Executing init hook")
-        intialization_callback(session, config.auth)
+        initialization_callback(session, config.auth)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (9)
- examples/mcp/mcp_prompts_and_resources/README.md (1 hunks)
- examples/mcp/mcp_prompts_and_resources/demo_server.py (2 hunks)
- examples/mcp/mcp_sampling/demo_server.py (1 hunks)
- examples/mcp/mcp_sampling/main.py (1 hunks)
- src/mcp_agent/agents/agent.py (1 hunks)
- src/mcp_agent/mcp/gen_client.py (4 hunks)
- src/mcp_agent/mcp/mcp_connection_manager.py (5 hunks)
- src/mcp_agent/mcp/mcp_server_registry.py (11 hunks)
- src/mcp_agent/server/app_server.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/mcp/mcp_sampling/main.py
- examples/mcp/mcp_sampling/demo_server.py
- src/mcp_agent/agents/agent.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
🧬 Code graph analysis (4)
src/mcp_agent/server/app_server.py (1)
- src/mcp_agent/app.py (4): context (134-139), upstream_session (158-159), upstream_session (162-163), workflows (166-167)

src/mcp_agent/mcp/gen_client.py (2)
- src/mcp_agent/mcp/mcp_agent_client_session.py (1): MCPAgentClientSession (72-412)
- src/mcp_agent/app.py (3): session_id (174-175), upstream_session (158-159), upstream_session (162-163)

examples/mcp/mcp_prompts_and_resources/demo_server.py (1)
- src/mcp_agent/core/context.py (1): mcp (101-102)

src/mcp_agent/mcp/mcp_server_registry.py (2)
- src/mcp_agent/core/context.py (2): mcp (101-102), Context (57-102)
- src/mcp_agent/app.py (4): context (134-139), session_id (174-175), upstream_session (158-159), upstream_session (162-163)
🪛 GitHub Actions: Pull Request Checks
src/mcp_agent/mcp/mcp_server_registry.py
[error] 39-39: Ruff lint error: F401 'mcp_agent.core.context.Context' imported but unused in TYPE_CHECKING block. Remove unused import or run 'ruff check --fix' to auto-fix.
🪛 LanguageTool
examples/mcp/mcp_prompts_and_resources/README.md
[grammar] ~24-~24: There might be a mistake here.
Context: .../data/friends: Example user data (JSON) - **Prompt:** -
echo`: A simple prompt ...
(QB_NEW_EN)
🔇 Additional comments (1)
src/mcp_agent/mcp/mcp_server_registry.py (1)
243-248: Timeout arguments accept seconds, no changes needed. `timeout` and `sse_read_timeout` both expect numeric seconds, so using `config.http_timeout_seconds` and `config.read_timeout_seconds` directly is correct.
src/mcp_agent/mcp/gen_client.py
Outdated
    [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
    ClientSession,
] = MCPAgentClientSession,
session_id: str | None = None,
upstream_session: ServerSession | None = None,
) -> AsyncGenerator[ClientSession, None]:
💡 Verification agent
🧩 Analysis chain
Align factory default and typing with 4-arg shape; allow Optional[ServerSession].
Provide a wrapper default that accepts 4 args; keep upstream_session optional.
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
- ClientSession,
- ] = MCPAgentClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
+ ClientSession,
+ ] = DEFAULT_CLIENT_SESSION_FACTORY,
@@
- upstream_session: ServerSession | None = None,
+ upstream_session: ServerSession | None = None,
@@
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
- ClientSession,
- ] = MCPAgentClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
+ ClientSession,
+ ] = DEFAULT_CLIENT_SESSION_FACTORY,
Add this wrapper (outside the changed ranges):
def DEFAULT_CLIENT_SESSION_FACTORY(
read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream,
read_timeout: timedelta | None,
upstream_session: ServerSession | None, # forwarded/ignored by default
) -> ClientSession:
return MCPAgentClientSession(read_stream, write_stream, read_timeout)
Also applies to: 50-55
🏁 Script executed:
#!/bin/bash
# Confirm gen_client uses the wrapper default and Optional[ServerSession]
rg -nP "DEFAULT_CLIENT_SESSION_FACTORY|Optional\[ServerSession" src/mcp_agent/mcp/gen_client.py
Length of output: 97
🏁 Script executed:
#!/bin/bash
sed -n '1,100p' src/mcp_agent/mcp/gen_client.py
Length of output: 3354
🏁 Script executed:
#!/bin/bash
# Locate and display the beginning of mcp_agent_client_session implementation
file=$(fd mcp_agent_client_session.py -t f)
echo "Located: $file"
sed -n '1,200p' "$file"
Length of output: 7689
Align client_session_factory signature & default; add DEFAULT_CLIENT_SESSION_FACTORY

In src/mcp_agent/mcp/gen_client.py, update the client_session_factory annotation in both gen_client (lines 21–26) and connect (lines 50–55) to:

Callable[
    [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
    ClientSession,
] = DEFAULT_CLIENT_SESSION_FACTORY
and define outside the functions:
def DEFAULT_CLIENT_SESSION_FACTORY(
read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream,
read_timeout: timedelta | None,
upstream_session: ServerSession | None,
) -> ClientSession:
return MCPAgentClientSession(read_stream, write_stream, read_timeout)
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/gen_client.py around lines 21–26 and 50–55, the
client_session_factory annotation and default need to be aligned to accept an
optional ServerSession and a nullable read_timeout and to default to a named
factory; change both annotations to Callable[[MemoryObjectReceiveStream,
MemoryObjectSendStream, timedelta | None, ServerSession | None], ClientSession]
= DEFAULT_CLIENT_SESSION_FACTORY, and add a module-level
DEFAULT_CLIENT_SESSION_FACTORY function (defined outside the functions) with
signature DEFAULT_CLIENT_SESSION_FACTORY(read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream, read_timeout: timedelta | None,
upstream_session: ServerSession | None) -> ClientSession that returns
MCPAgentClientSession(read_stream, write_stream, read_timeout).
    [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
    ClientSession,
],
💡 Verification agent
🧩 Analysis chain
API/Runtime mismatch: 4-arg client_session_factory typing, but only 3 args passed; default factory incompatible.
- create_session still passes 3 args.
- Defaults use MCPAgentClientSession (3-arg ctor), incompatible with 4-arg signature.
Fix by introducing a 4-arg wrapper default and passing a 4th argument (None) from create_session. Consider Optional[ServerSession] in the Callable types.
Apply diffs:
@@ class ServerConnection.__init__
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
ClientSession,
],
@@ def ServerConnection.create_session
- session = self._client_session_factory(read_stream, send_stream, read_timeout)
+ session = self._client_session_factory(read_stream, send_stream, read_timeout, None)
@@ MCPConnectionManager.launch_server signature
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
ClientSession,
],
@@ MCPConnectionManager.get_server signature
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
- ClientSession,
- ] = MCPAgentClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
+ ClientSession,
+ ] = DEFAULT_CLIENT_SESSION_FACTORY,
@@ MCPConnectionManager.get_server_capabilities signature
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
- ClientSession,
- ] = MCPAgentClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, Optional[ServerSession]],
+ ClientSession,
+ ] = DEFAULT_CLIENT_SESSION_FACTORY,
Add this wrapper (outside the changed ranges):
# Place near imports
def DEFAULT_CLIENT_SESSION_FACTORY(
read_stream: MemoryObjectReceiveStream,
write_stream: MemoryObjectSendStream,
read_timeout: timedelta | None,
upstream_session: Optional[ServerSession], # ignored in this default
) -> ClientSession:
return MCPAgentClientSession(read_stream, write_stream, read_timeout)
Also applies to: 289-291, 401-404, 447-449, 154-155
🏁 Script executed:
#!/bin/bash
# Verify no remaining 3-arg factories are used as defaults
rg -nP "=\s*MCPAgentClientSession\b" src/mcp_agent/mcp | sed -n '1,120p'
# Check all client_session_factory annotations are 4-arg Optional[ServerSession]
rg -nP "client_session_factory:\s*Callable\[\[" src/mcp_agent | sed -n '1,200p'
# Ensure all calls pass 4 args
rg -nP "_client_session_factory\([^)]*\)" src/mcp_agent -n -C1
Length of output: 1699
Fix default client_session_factory usage in multiple modules

- Replace all default client_session_factory=MCPAgentClientSession (e.g. in mcp_connection_manager.py lines 403, 449; gen_client.py lines 23, 53; and in mcp_aggregator.py at each client_session_factory=MCPAgentClientSession) with DEFAULT_CLIENT_SESSION_FACTORY.
- In ServerConnection.create_session (mcp_connection_manager.py line 154), call self._client_session_factory(read_stream, send_stream, read_timeout, None) instead of only three arguments.
- Update every client_session_factory: Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession], ClientSession] annotation to use Optional[ServerSession] for the fourth parameter.
- Add the DEFAULT_CLIENT_SESSION_FACTORY wrapper near the imports to delegate to MCPAgentClientSession and ignore the extra upstream_session argument.
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_connection_manager.py around lines 68-70 and throughout
the module and related modules (gen_client.py and mcp_aggregator.py), replace
any default arguments of client_session_factory=MCPAgentClientSession with
client_session_factory=DEFAULT_CLIENT_SESSION_FACTORY; update the
ServerConnection.create_session call (around line 154) to pass four arguments by
calling self._client_session_factory(read_stream, send_stream, read_timeout,
None); change all Callable type annotations for client_session_factory from
Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta,
ServerSession], ClientSession] to use Optional[ServerSession] for the fourth
parameter; and add a DEFAULT_CLIENT_SESSION_FACTORY wrapper near the imports
that delegates to MCPAgentClientSession(read_stream, send_stream, read_timeout)
while accepting and ignoring the extra upstream_session parameter.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/mcp_agent/mcp/mcp_server_registry.py (2)
156-163: Make session construction resilient and DRY; fall back to 3-arg factory when needed. Some factories (e.g., mcp.ClientSession) accept only 3 args. Centralize creation and gracefully fall back to a 3-arg call to avoid TypeError and duplicated code.

Apply this diff:

@@
-    if config.transport == "stdio":
+    # Helper to build a session with a safe fallback to 3-arg signature.
+    def _build_session(rs, ws):
+        try:
+            return client_session_factory(rs, ws, read_timeout_seconds, ctx_with_session)
+        except TypeError:
+            return client_session_factory(rs, ws, read_timeout_seconds)
+
+    if config.transport == "stdio":
@@
-        session = client_session_factory(
-            read_stream,
-            write_stream,
-            read_timeout_seconds,
-            ctx_with_session,
-        )
+        session = _build_session(read_stream, write_stream)
@@
-        session = client_session_factory(
-            read_stream,
-            write_stream,
-            read_timeout_seconds,
-            ctx_with_session
-        )
+        session = _build_session(read_stream, write_stream)
@@
-        session = client_session_factory(
-            read_stream,
-            write_stream,
-            read_timeout_seconds,
-            ctx_with_session
-        )
+        session = _build_session(read_stream, write_stream)
@@
-        session = client_session_factory(
-            read_stream,
-            write_stream,
-            read_timeout_seconds,
-            ctx_with_session
-        )
+        session = _build_session(read_stream, write_stream)

Also applies to: 211-216, 251-256, 276-281
134-139: Bug: timedelta() is using days, not seconds. timedelta(config.read_timeout_seconds) treats the value as days. Use seconds=.

Apply:

-    read_timeout_seconds = (
-        timedelta(config.read_timeout_seconds)
+    read_timeout_seconds = (
+        timedelta(seconds=config.read_timeout_seconds)
         if config.read_timeout_seconds
         else None
     )
🧹 Nitpick comments (2)
src/mcp_agent/mcp/mcp_server_registry.py (2)
140-143: Good: Deferred Context import resolves circular import risk. The local import is the right approach. Pair this with a TYPE_CHECKING-only import for annotations (see earlier diff) to keep linters quiet.
124-125: Docstrings: Returns section mentions StdioServerParameters; these functions yield ClientSession contexts. Update the Returns description to reflect that the context managers yield a ClientSession.

Apply:

-    Returns:
-        StdioServerParameters: The server parameters for stdio transport.
+    Returns:
+        Async generator yielding a ClientSession connected to the server.

Also applies to: 314-316
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- src/mcp_agent/mcp/mcp_server_registry.py (11 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
🧬 Code graph analysis (1)
src/mcp_agent/mcp/mcp_server_registry.py (2)
- src/mcp_agent/core/context.py (2): mcp (101-102), Context (57-102)
- src/mcp_agent/app.py (4): session_id (174-175), upstream_session (158-159), upstream_session (162-163), context (134-139)
🔇 Additional comments (2)
src/mcp_agent/mcp/mcp_server_registry.py (2)
91-92: No functional change. Signature formatting is fine.
240-245: Standardize SSE client timeout parameters. Confirm whether sse_client expects raw ints (seconds) or datetime.timedelta for `timeout` and `sse_read_timeout`, and wrap `config.http_timeout_seconds` and `config.read_timeout_seconds` in `timedelta(seconds=…)` if needed to match other transports and avoid silent mismatches.
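If the check does show that sse_client takes timedelta, a minimal normalization helper could look like the sketch below. This is illustrative only: the config attribute names come from this PR, while the helper itself is hypothetical.

from datetime import timedelta

def _to_timedelta(value: float | None) -> timedelta | None:
    """Normalize a seconds value from config into a timedelta (or None)."""
    if value is None:
        return None
    return timedelta(seconds=value)

# Hypothetical usage when building the SSE transport arguments:
# timeout = _to_timedelta(config.http_timeout_seconds)
# sse_read_timeout = _to_timedelta(config.read_timeout_seconds)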
client_session_factory: Callable[
    [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
    ClientSession,
] = ClientSession,
session_id: str | None = None,
upstream_session: ServerSession | None = None,
) -> AsyncGenerator[ClientSession, None]:
Fix factory signature and default: 4th argument is Context, not ServerSession; align both methods and avoid a runtime TypeError.
- You pass a Context instance as the 4th arg to client_session_factory, but the type hints declare ServerSession | None. This is inconsistent and misleading.
- Defaulting the factory to ClientSession while always passing 4 args will raise TypeError if callers don’t override the factory.
Apply these diffs to align types and defaults across both methods:
@@
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
- ClientSession,
- ] = ClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
+ ClientSession,
+ ] = MCPAgentClientSession,
@@
- client_session_factory: Callable[
- [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
- ClientSession,
- ] = ClientSession,
+ client_session_factory: Callable[
+ [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
+ ClientSession,
+ ] = MCPAgentClientSession,
Additionally (outside the shown hunks), add a typing-only import to keep static checkers happy without reintroducing circular imports:
# near other imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mcp_agent.core.context import Context # noqa: F401
Also applies to: 298-305
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_server_registry.py around lines 110-116 (and similarly
at 298-305), the client_session_factory signature and default are incorrect: the
4th parameter passed is a Context instance but the type hint currently says
ServerSession | None and the default is ClientSession (a concrete class) which
will raise TypeError when called with four args. Update the Callable type hint
to accept Context | None as the 4th parameter (e.g.
Callable[[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None,
Context | None], ClientSession]) and change the default factory to a callable
that accepts four args (or set default to None and handle instantiation in the
function), and ensure both methods use the same corrected signature; also add a
typing-only import near other imports: from typing import TYPE_CHECKING followed
by if TYPE_CHECKING: from mcp_agent.core.context import Context # noqa: F401 to
avoid circular imports for static checkers.
This warning seems legit
@saqadri You're absolutely right! This is indeed a legitimate issue that needs to be fixed. The type signature mismatch between what's declared (ServerSession | None) and what's actually passed (a Context instance) creates both type checking issues and potential runtime errors.
Here's the corrected implementation:
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mcp_agent.core.context import Context # noqa: F401
# ... rest of imports ...
class ServerRegistry:
# ... other methods ...
@asynccontextmanager
async def start_server(
self,
server_name: str,
client_session_factory: Callable[
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
ClientSession,
] = MCPAgentClientSession,
session_id: str | None = None,
upstream_session: ServerSession | None = None,
) -> AsyncGenerator[ClientSession, None]:
# ... method implementation ...
@asynccontextmanager
async def initialize_server(
self,
server_name: str,
client_session_factory: Callable[
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None],
ClientSession,
] = MCPAgentClientSession,
init_hook: InitHookCallable = None,
session_id: str | None = None,
upstream_session: ServerSession | None = None,
) -> AsyncGenerator[ClientSession, None]:
# ... method implementation ...
The key fixes are:
- Type signature: Changed from ServerSession | None to "Context" | None for the 4th parameter
- Default factory: Changed from ClientSession to MCPAgentClientSession (which accepts 4 parameters)
- Typing import: Added proper TYPE_CHECKING import to avoid circular imports
- Consistency: Made both methods use the same corrected signature

This resolves both the type checking issue and prevents the runtime TypeError that would occur when the default ClientSession constructor is called with 4 arguments.
🐰
╭─────╮
│ ○ ○ │ nods thoughtfully
│ ⌒ │ Good catch on that type mismatch!
╰─────╯
✏️ Learnings added
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:09.951Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Great progress! I have a few fundamental suggestions for the sampling_handler. The original version @StreetLamb wrote was written before I had implemented factory.py and TokenCounter. I think we should utilize those plus ModelSelector (which already works based on the ModelPreferences type) to simplify this.

The main things that prevent this from landing as-is:
- Context being created on the fly (only MCPApp and ContextDependent typically create it)
- Setting active_llm, which seems brittle since the last constructed agent wins.

When the agent is acting as an MCP client (i.e. handle_sampling_with_human_approval) we need to decide if:
- we allow dynamic creation of an AugmentedLLM via factory.py to meet the sampling request
- we filter to just the agents explicitly created

I would suggest keeping it simple and documenting the expected behavior. It could be that by default we just try to use ModelSelector to find the best available LLM that the user requested and try to use that (i.e. use factory.py). Perhaps if a caller specified hints in the sampling request, we could use that to filter to an available LLM that was configured by an agent (i.e. replace active_llm with available_llms). A rough sketch of this flow follows below.

And for any token tracking etc., we should rely on the new TokenCounter class.
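To make the suggestion concrete, here is a rough sketch of how the sampling handler could resolve a model from the request's ModelPreferences and build an LLM through the factory. This is not the PR's implementation: the create_llm signature is taken from factory.py as quoted elsewhere in this review, while the ModelSelector import path, its select_best_model method, and the model_info fields are assumptions.

from mcp.types import CreateMessageRequestParams

from mcp_agent.workflows.factory import create_llm
from mcp_agent.workflows.llm.llm_selector import ModelSelector  # import path assumed


async def resolve_llm_for_sampling(context, params: CreateMessageRequestParams):
    """Pick the best available model for a sampling request and build an AugmentedLLM."""
    selector = ModelSelector(context=context)  # constructor args assumed

    # Let ModelSelector rank models using the caller's ModelPreferences (hints, priorities).
    model_info = selector.select_best_model(model_preferences=params.modelPreferences)  # method name assumed

    # Build the AugmentedLLM via the factory so config, tracing, and TokenCounter all flow
    # from the shared Context instead of a Context created on the fly.
    return create_llm(
        agent_name="sampling",
        server_names=[],
        instruction=None,
        provider=model_info.provider,
        model=model_info.name,
        request_params=None,
        context=context,
    )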
client_session_factory: Callable[ | ||
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None], | ||
ClientSession, | ||
] = ClientSession, | ||
session_id: str | None = None, | ||
upstream_session: ServerSession | None = None, | ||
) -> AsyncGenerator[ClientSession, None]: |
This warning seems legit
# Record the upstream session (if any) in the context
from mcp_agent.core.context import Context

ctx_with_session = Context(upstream_session=upstream_session)
I'm just thinking through the consequences of creating our own Context object here, vs. either making ServerRegistry a ContextDependent, or simply passing the upstream_session directly into the client_session_factory.

Any thoughts? The idea for Context is that there is one Context object passed through the lifetime of the MCPApp to anything that needs Context -- it doesn't get created by anyone other than MCPApp, or, if unspecified, loaded as a global by ContextDependent.
Been pondering both options this morning, and I don't think making the ServerRegistry ContextDependent makes sense. Different workflows will have unique upstream_sessions, so storing it in a global context doesn't work.
"Different workflows will have unique upstream_sessions"

Are you thinking of the case where the client reconnects? The ServerRegistry is 1:1 with the MCPApp, so in theory it should always have a single upstream_session, but I haven't thought about it in too much detail, so would love to get your thoughts! I think keeping it to passing in the upstream_session is also pretty clean, so based on your thinking here I'm good with either approach. A tiny sketch of what the pass-through alternative could look like follows below.
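For illustration only, a minimal sketch of the "pass upstream_session directly into client_session_factory" alternative. The parameter names follow the signatures quoted in this review; the helper itself and the surrounding ServerRegistry code are hypothetical.

from datetime import timedelta


def build_session(client_session_factory, read_stream, write_stream,
                  read_timeout_seconds: timedelta | None, upstream_session):
    """Thread upstream_session straight into the factory instead of constructing
    a throwaway Context inside ServerRegistry.start_server."""
    return client_session_factory(
        read_stream,
        write_stream,
        read_timeout_seconds,
        upstream_session,  # MCPAgentClientSession could record this itself
    )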
src/mcp_agent/agents/agent.py
Outdated
# Ensure a context exists before updating active LLM
if self.context is None:
    # Fall back to global context for convenience; callers can also set agent.context explicitly
    from mcp_agent.core.context import get_current_context

    self.context = get_current_context()

# Prefer a helper if/when Context switches to task-local binding
if hasattr(self.context, "set_active_llm"):
    self.context.set_active_llm(self.llm)
else:
    self.context.active_llm = self.llm

self.context.active_llm = self.llm
Hmmm, this is a bit concerning because Context is global to the MCPApp, so any agent that is instantiated will set the "active_llm", which means the last agent that was instantiated is set as the "active_llm".

I actually think we can remove the concept of active_llm entirely and instead use the ModelSelector class to get the LLM that best matches the user's preferences. Then when a sampling request comes in, sampling_handler can use factory.py to construct an AugmentedLLM with the right configuration to handle the sampling request.

If we want to limit sampling to JUST the agents that have been created, we can have context keep track of a Set/List of available_llms, which can be used to filter to the closest one (a small sketch of that registration idea is below).
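A minimal sketch of the available_llms idea (purely illustrative; the field name, registration point, and matching heuristic are assumptions, not part of this PR):

def register_llm(context, name: str, llm) -> None:
    """Called when an agent attaches an LLM, so sampling can only use explicitly created LLMs."""
    # available_llms is an assumed dict field on Context, e.g. {model_name: AugmentedLLM}.
    context.available_llms[name] = llm


def pick_llm_for_hints(context, hints: list[str]):
    """Return the first registered LLM whose name matches a caller-supplied ModelHint."""
    for name, llm in context.available_llms.items():
        if any(hint in name for hint in hints):
            return llm
    return None  # fall back to ModelSelector / factory.py when nothing matches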
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
src/mcp_agent/executor/workflow.py (1)
209-211: Use asyncio.CancelledError and not concurrent.futures.CancelledError. Mixing the two leads to incorrect cancellation semantics.

-    import asyncio
-    from concurrent.futures import CancelledError
+    import asyncio
@@
-    raise CancelledError("Workflow was cancelled")
+    raise asyncio.CancelledError("Workflow was cancelled")
@@
-    except CancelledError:
+    except asyncio.CancelledError:
         # Handle cancellation gracefully
         self._logger.info(
             f"Workflow {self.name} (ID: {self._run_id}) was cancelled"
         )
         self.update_status("cancelled")
         raise

Also applies to: 311-317, 295-298
src/mcp_agent/server/app_server.py (2)
336-356: Bug: resume/cancel tools don't return a boolean. Both functions promise a bool but return None. Return the operation result.

Apply:

@@
-        if result:
+        if result:
             logger.debug(
                 f"Signaled workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
             )
         else:
             logger.error(
                 f"Failed to signal workflow {workflow_name} with ID {run_id} with signal '{signal_name}' and payload '{payload}'"
             )
+        return bool(result)
@@
-        if result:
+        if result:
             logger.debug(f"Cancelled workflow {workflow_name} with ID {run_id}")
         else:
             logger.error(f"Failed to cancel workflow {workflow_name} with ID {run_id}")
+        return bool(result)

Also applies to: 377-385
221-235: Schema accuracy: exclude "self" from workflows-list run_parameters too. list_workflows exposes run_fn_tool.parameters with "self" still present for class methods, which confuses clients. Mirror the stripping you do in per-workflow tools.

-    run_fn_tool = FastTool.from_function(workflow_cls.run)
+    run_fn_tool = FastTool.from_function(workflow_cls.run)
+    if "self" in run_fn_tool.parameters.get("properties", {}):
+        del run_fn_tool.parameters["properties"]["self"]
+    req = run_fn_tool.parameters.get("required", [])
+    if "self" in req:
+        req.remove("self")
+    run_fn_tool.parameters["required"] = req

src/mcp_agent/mcp/mcp_server_registry.py (2)
134-138
: Bug: read_timeout_seconds uses days instead of secondstimedelta(x) interprets x as days. This will blow up timeouts. Use seconds keyword.
- read_timeout_seconds = ( - timedelta(config.read_timeout_seconds) + read_timeout_seconds = ( + timedelta(seconds=config.read_timeout_seconds) if config.read_timeout_seconds else None )
141-145: Fix validation and message for stdio transport. Only command is strictly required; the current check/message imply both command and args.

-    if not config.command and not config.args:
-        raise ValueError(
-            f"Command and args are required for stdio transport: {server_name}"
-        )
+    if not config.command:
+        raise ValueError(
+            f"Command is required for stdio transport: {server_name}"
+        )
♻️ Duplicate comments (3)
examples/mcp/mcp_sampling/demo_server.py (1)
27-33: Fix NameError/KeyError when secrets are absent (initialize and use .get). openai_secret is undefined if no secrets file is found, and yaml_secrets["openai"] can KeyError. Initialize to None and use .get().

-secrets_file = Settings.find_secrets()
-if secrets_file and secrets_file.exists():
-    with open(secrets_file, "r", encoding="utf-8") as f:
-        yaml_secrets = yaml.safe_load(f) or {}
-    openai_secret = yaml_secrets["openai"]
+secrets_file = Settings.find_secrets()
+openai_secret = None
+if secrets_file and secrets_file.exists():
+    with open(secrets_file, "r", encoding="utf-8") as f:
+        yaml_secrets = yaml.safe_load(f) or {}
+    openai_secret = yaml_secrets.get("openai")
@@
-    openai=openai_secret
+    openai=openai_secret

Also applies to: 45-46
src/mcp_agent/server/app_server.py (1)
421-429: Harden removal of "self" from tool schema. Indexing parameters["properties"] can KeyError. Use .get(...) as discussed previously.

Apply:

-    if "self" in run_fn_tool.parameters["properties"]:
-        del(run_fn_tool.parameters["properties"]["self"])
+    if "self" in run_fn_tool.parameters.get("properties", {}):
+        del run_fn_tool.parameters["properties"]["self"]
     req = run_fn_tool.parameters.get("required", [])
     if "self" in req:
         req.remove("self")
     run_fn_tool.parameters["required"] = req

src/mcp_agent/mcp/mcp_server_registry.py (1)
108-116: Align client_session_factory signature and default; avoid runtime TypeError.
- Defaulting to ClientSession while passing upstream_session will likely error.
- start_server and initialize_server have divergent types (timedelta | None vs timedelta; ServerSession | None vs ServerSession).
- Make both consistent and default to MCPAgentClientSession, which accepts the extra arg.

@@
-    client_session_factory: Callable[
-        [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
-        ClientSession,
-    ] = ClientSession,
+    client_session_factory: Callable[
+        [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
+        ClientSession,
+    ] = MCPAgentClientSession,
@@
-    client_session_factory: Callable[
-        [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession],
-        ClientSession,
-    ] = ClientSession,
+    client_session_factory: Callable[
+        [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None],
+        ClientSession,
+    ] = MCPAgentClientSession,
+    init_hook: InitHookCallable | None = None,

If the design decision is to pass Context instead, update both signatures to use "Context | None" and adjust call sites accordingly.
Also applies to: 292-300
🧹 Nitpick comments (11)
examples/mcp/mcp_sampling/demo_server.py (2)
82-89: Handle missing OpenAI config and surface clearer errors. If openai_secret is None, generation will fail. Add a guard with an informative log or early error.

 async with haiku_agent:
     llm = await haiku_agent.attach_llm(OpenAIAugmentedLLM)
-
+    if settings.openai is None:
+        logger.warning("OpenAI configuration not found; haiku generation will likely fail.")
+        # Optionally: raise RuntimeError("OpenAI configuration missing")
103-104: Remove redundant dictionary splat. The **({}) adds no value.

-    mcp_server = create_mcp_server_for_app(agent_app, **({}))
+    mcp_server = create_mcp_server_for_app(agent_app)

tests/test_app.py (1)
18-33: Upstream session mock: ensure compatibility with ContextVar-based design. If any tests start exercising upstream_session, switch the mock to emulate a ContextVar with .set/.get/.reset to avoid attribute errors.
If needed, I can provide a small helper stub for a ContextVar-like mock.
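For reference, a minimal ContextVar-like stub of the sort mentioned above might look like this (a sketch only, not code from the PR; the class name is made up):

class FakeContextVar:
    """Mimics contextvars.ContextVar just enough for tests: set/get/reset."""

    def __init__(self, default=None):
        self._default = default
        self._stack = []

    def set(self, value):
        self._stack.append(value)
        return len(self._stack) - 1  # the token is simply the stack index here

    def get(self):
        return self._stack[-1] if self._stack else self._default

    def reset(self, token):
        del self._stack[token:]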
src/mcp_agent/server/app_server.py (1)
513-545: Upstream session injection: confirm downstream handling. You inject "__mcp_agent_upstream_session" into run_parameters. Verify that:
- workflow.run_async ultimately drops/consumes this kwarg and doesn’t pass it into user-defined run(...) (to avoid TypeError).
- Sensitive objects aren’t accidentally logged/serialized.
If available, prefer setting app_context.upstream_session.set(upstream_session) before the run and avoiding kwargs altogether.
-    if upstream_session:
-        run_parameters["__mcp_agent_upstream_session"] = upstream_session
+    if upstream_session:
+        # Prefer per-context propagation to avoid leaking kwargs to user code
+        app_context.upstream_session.set(upstream_session)

src/mcp_agent/mcp/mcp_agent_client_session.py (2)
96-100: Be careful setting a ContextVar at init-time. Until Context uses a per-instance ContextVar (see my context.py comment), this set() can bleed across sessions sharing the same Context. After fixing Context, this is fine. If multiple client sessions share a Context concurrently, consider keeping self._upstream_session and reading from it directly.

-    self.context.upstream_session.set(upstream_session)
+    self._upstream_session = upstream_session
+    if upstream_session is not None:
+        self.context.upstream_session.set(upstream_session)
349-361: Log upstream failures for observability. Currently exceptions only return ErrorData. Log the exception for diagnostics.

 except Exception as e:
-    return ErrorData(code=-32603, message=str(e))
+    logger.error(f"Upstream sampling failed: {e}")
+    return ErrorData(code=-32603, message=str(e))
199-203: Prefer structured exception logging over printing tracebacks at info. Use logger.error/exception with the caught exception rather than dumping the traceback at info level.

-    import traceback
-    logger.info(traceback.format_exc())
-    logger.error(f"Unexpected error calling LLM: {e}")
+    logger.error(f"Unexpected error calling LLM: {e}")
205-214: Dead code: _create_provider_instance is unused. Remove to reduce noise.

-    def _create_provider_instance(
-        self, provider_class: type[AugmentedLLM]
-    ) -> AugmentedLLM | None:
-        """Create a minimal LLM instance for direct calls"""
-        try:
-            return provider_class(context=self.context)
-        except Exception as e:
-            logger.error(f"Failed to create provider instance: {e}")
-            return None

src/mcp_agent/mcp/mcp_server_registry.py (3)
333-341: Typo: intialization_callback → initialization_callback. Rename for clarity; no behavior change.

-    intialization_callback = (
+    initialization_callback = (
         init_hook if init_hook is not None else self.init_hooks.get(server_name)
     )
-    if intialization_callback:
+    if initialization_callback:
         logger.info(f"{server_name}: Executing init hook")
-        intialization_callback(session, config.auth)
+        initialization_callback(session, config.auth)
306-315: Docstrings: incorrect Returns section. These context managers yield a ClientSession; they don't return StdioServerParameters.

@@
-    Returns:
-        StdioServerParameters: The server parameters for stdio transport.
+    Yields:
+        ClientSession: Connected client session context manager.

Apply similarly to initialize_server's docstring.
Also applies to: 120-128
340-344: Consider handling init hook failures. The init hook returns bool per the type alias, but the result is ignored. Optionally log/abort on False.

-    if initialization_callback:
+    if initialization_callback:
         logger.info(f"{server_name}: Executing init hook")
-        initialization_callback(session, config.auth)
+        ok = initialization_callback(session, config.auth)
+        if ok is False:
+            logger.warning(f"{server_name}: Init hook reported failure")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
- examples/mcp/mcp_sampling/demo_server.py (1 hunks)
- src/mcp_agent/agents/agent.py (1 hunks)
- src/mcp_agent/app.py (0 hunks)
- src/mcp_agent/core/context.py (2 hunks)
- src/mcp_agent/executor/workflow.py (3 hunks)
- src/mcp_agent/mcp/mcp_agent_client_session.py (5 hunks)
- src/mcp_agent/mcp/mcp_server_registry.py (10 hunks)
- src/mcp_agent/mcp/sampling_handler.py (1 hunks)
- src/mcp_agent/server/app_server.py (6 hunks)
- tests/test_app.py (23 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/app.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/mcp_agent/agents/agent.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-28T15:07:09.951Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:09.951Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/core/context.py
src/mcp_agent/mcp/mcp_server_registry.py
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/mcp_agent_client_session.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/mcp/sampling_handler.py
src/mcp_agent/mcp/mcp_server_registry.py
src/mcp_agent/server/app_server.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
🧬 Code graph analysis (4)
src/mcp_agent/executor/workflow.py (1)
- src/mcp_agent/app.py (1): context (130-135)

src/mcp_agent/mcp/sampling_handler.py (5)
- src/mcp_agent/core/context.py (2): mcp (103-104), Context (58-104)
- src/mcp_agent/core/context_dependent.py (1): ContextDependent (9-48)
- src/mcp_agent/workflows/llm/augmented_llm.py (4): AugmentedLLM (224-812), RequestParams (124-169), get_request_params (376-397), select_model (330-374)
- src/mcp_agent/human_input/types.py (1): HumanInputRequest (7-29)
- src/mcp_agent/workflows/factory.py (1): create_llm (68-86)

examples/mcp/mcp_sampling/demo_server.py (4)
- src/mcp_agent/server/app_server.py (4): app (79-81), create_mcp_server_for_app (146-389), workflows (84-86), run (445-449)
- src/mcp_agent/app.py (7): MCPApp (39-632), config (138-139), workflows (154-155), workflow (375-408), logger (166-170), workflow_run (460-500), run (350-373)
- src/mcp_agent/config.py (2): MCPServerSettings (51-108), find_secrets (637-639)
- src/mcp_agent/agents/agent.py (1): attach_llm (156-199)

src/mcp_agent/mcp/mcp_agent_client_session.py (2)
- src/mcp_agent/mcp/sampling_handler.py (2): SamplingHandler (35-337), handle_sampling_with_human_approval (41-73)
- src/mcp_agent/core/context_dependent.py (1): ContextDependent (9-48)
🔇 Additional comments (8)
tests/test_app.py (2)
131-132: Formatting and fixture adjustments look good. Minor indent/patch cleanups are fine and keep tests readable.
Also applies to: 196-197, 208-209, 224-225, 306-307, 323-324, 333-334, 343-344, 353-354, 368-373, 406-407, 430-431, 459-460, 492-493, 551-552, 580-581, 606-611, 634-635, 646-647, 657-658, 701-703
1-10: Update the sanity check script to install the package and dev dependencies before lint and tests. Ensure the project itself and its development dependencies are installed (e.g. an editable install with extras or via a requirements-dev file), then run lint and tests:

python -m pip install -e .[dev]
ruff check .
python -m pytest -q
src/mcp_agent/core/context.py (1)
14-14: Import path update looks correct. Switching to mcp.server.session.ServerSession aligns with the upstream API.
src/mcp_agent/server/app_server.py (2)
283-283: Good: Upstream session propagation is threaded into workflow runs. Passing ctx.session through to _workflow_run enables nested calls. Ensure MCPContext reliably exposes session across MCP versions.

449-449: Good: Per-workflow run tool also threads ctx.session. Matches the general workflows-run path.
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
341-346: OK: clear split between local sampling vs. upstream pass-through. The human-in-the-loop path via SamplingHandler is a sensible default.
src/mcp_agent/mcp/mcp_server_registry.py (2)
236-241: Skip converting SSE timeouts to timedelta. The sse_client API expects numeric seconds (float or int) for both `timeout` and `sse_read_timeout`, so passing the raw ints from `config` is correct.

Likely an incorrect or invalid review comment.

152-158: Default factory supports `upstream_session` -- no changes needed.
command="uv",
args=["run", "nested_server.py"],
description="nested server providing a haiku generator"
🛠️ Refactor suggestion
Make nested_server path robust.
Use a path relative to this file to avoid CWD-related failures.
+from pathlib import Path
@@
- args=["run", "nested_server.py"],
+ args=["run", str(Path(__file__).parent / "nested_server.py")],
Also applies to: 8-8
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/demo_server.py around lines 39 to 41, the args
currently pass "nested_server.py" which is CWD-dependent and can fail; change to
compute the nested_server path relative to this file (use
os.path.dirname(__file__) and join with "nested_server.py") and pass that
absolute path in the args (import os at top if missing) so the subprocess uses a
robust file path; apply the same change at the other occurrence on line 8.
src/mcp_agent/core/context.py
Outdated
# Use a ContextVar to store the upstream session, so different invocations do not clash
upstream_session: ContextVar[ServerSession | None] = ContextVar(
    "upstream_session", default=None
)
Bug: ContextVar instance is shared across all Context objects (cross-context leakage).
Defining ContextVar at class definition time makes a single variable shared by every Context instance. A set() in one context can affect others running in the same task. Create a fresh ContextVar per Context via default_factory.
Apply this diff, and add the needed import:
@@
- # Use a ContextVar to store the upstream session, so different invocations do not clash
- upstream_session: ContextVar[ServerSession | None] = ContextVar(
- "upstream_session", default=None
- )
+ # Use a per-instance ContextVar so different Contexts and invocations do not clash
+ upstream_session: ContextVar[ServerSession | None] = Field(
+ default_factory=lambda: ContextVar("mcp_agent.upstream_session", default=None)
+ )
Add import (outside the selected lines):
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Use a ContextVar to store the upstream session, so different invocations do not clash
upstream_session: ContextVar[ServerSession | None] = ContextVar(
    "upstream_session", default=None
)

# Add Field to the imports
from pydantic import BaseModel, ConfigDict, Field

class Context(BaseModel):
-    # Use a ContextVar to store the upstream session, so different invocations do not clash
-    upstream_session: ContextVar[ServerSession | None] = ContextVar(
-        "upstream_session", default=None
+    # Use a per-instance ContextVar so different Contexts and invocations do not clash
+    upstream_session: ContextVar[ServerSession | None] = Field(
+        default_factory=lambda: ContextVar("mcp_agent.upstream_session", default=None)
    )
🤖 Prompt for AI Agents
In src/mcp_agent/core/context.py around lines 92-96, the class-level ContextVar
upstream_session is shared across all Context instances causing cross-context
leakage; remove the class-level declaration and instead create a new ContextVar
per instance in the Context.__init__ (e.g., self.upstream_session =
ContextVar("upstream_session", default=None)), and ensure you have the
ContextVar import at the top of the file (from contextvars import ContextVar).
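To make the leakage concrete, here is a small self-contained sketch. It uses plain classes instead of the real pydantic Context, purely to illustrate why a class-level ContextVar is shared while a per-instance one is isolated:

from contextvars import ContextVar

class SharedVarContext:
    # One ContextVar object is created at class-definition time and shared by every instance.
    upstream_session: ContextVar = ContextVar("shared_upstream_session", default=None)

class PerInstanceContext:
    def __init__(self) -> None:
        # A fresh ContextVar per instance, analogous to the Field(default_factory=...) fix above.
        self.upstream_session: ContextVar = ContextVar("per_instance_upstream_session", default=None)

a, b = SharedVarContext(), SharedVarContext()
a.upstream_session.set("session-A")
print(b.upstream_session.get())  # "session-A" -- the value leaked to an unrelated instance

c, d = PerInstanceContext(), PerInstanceContext()
c.upstream_session.set("session-C")
print(d.upstream_session.get())  # None -- isolated per instance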
src/mcp_agent/executor/workflow.py
Outdated
provided_upstream_session = kwargs.pop("__mcp_agent_upstream_session", None)
🛠️ Refactor suggestion
Don’t overwrite upstream_session when not provided; reset ContextVar after run.
Setting None clobbers inherited context; also ensure reset to avoid leakage within the task.
- provided_upstream_session = kwargs.pop("__mcp_agent_upstream_session", None)
+ provided_upstream_session = kwargs.pop("__mcp_agent_upstream_session", None)
@@
- async def _execute_workflow():
+ async def _execute_workflow():
try:
+ token = None
@@
- else:
- self.context.upstream_session.set(provided_upstream_session)
+ else:
+ if provided_upstream_session is not None:
+ token = self.context.upstream_session.set(provided_upstream_session)
run_task = asyncio.create_task(self.run(*args, **kwargs))
cancel_task = asyncio.create_task(self._cancel_task())
tasks.extend([run_task, cancel_task])
@@
finally:
try:
# Pop token context if we pushed it
+ if token is not None:
+ try:
+ self.context.upstream_session.reset(token)
+ except Exception:
+ pass
if (
pushed_token_context
and self.context
and self.context.token_counter
):
Also applies to: 249-283, 339-346
🤖 Prompt for AI Agents
In src/mcp_agent/executor/workflow.py around lines 218-219 (and similarly for
ranges 249-283 and 339-346): currently
kwargs.pop("__mcp_agent_upstream_session", None) unconditionally assigns None
which clobbers an inherited ContextVar; instead, only extract the special kwarg
if present (use kwargs.pop only when key in kwargs or check for None) and, when
you do set the ContextVar, capture the token returned by ContextVar.set(...) and
ensure you call ContextVar.reset(token) in a finally block after the run to
avoid cross-task leakage; apply this same conditional-set + token/reset pattern
to the other indicated blocks.
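A minimal sketch of the conditional-set plus token/reset pattern being asked for here; the names are illustrative, not the actual workflow code:

import asyncio
from contextvars import ContextVar

upstream_session: ContextVar[object | None] = ContextVar("upstream_session", default=None)

async def run_with_optional_session(provided_session: object | None) -> None:
    token = None
    try:
        if provided_session is not None:
            # Only override the inherited value when a session was actually provided.
            token = upstream_session.set(provided_session)
        print("running with session:", upstream_session.get())
    finally:
        if token is not None:
            # Restore the previous value so nothing leaks into later work on this task.
            upstream_session.reset(token)

asyncio.run(run_with_optional_session(None))         # inherited value stays untouched
asyncio.run(run_with_optional_session("session-X"))  # override applies only inside the call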
llm = create_llm(agent_name="sampling",
                 server_names=[],
                 instruction=None,
                 provider=model_info.provider,
                 model=model_info.name,
                 request_params=None,
                 context=None)
else:
🛠️ Refactor suggestion
Pass the active Context into create_llm.
You’re creating the provider with context=None, losing app settings, tracing, token counters, server access, etc. Use self.context.
- llm = create_llm(agent_name="sampling",
- server_names=[],
- instruction=None,
- provider=model_info.provider,
- model=model_info.name,
- request_params=None,
- context=None)
+ llm = create_llm(
+ agent_name="sampling",
+ server_names=[],
+ instruction=None,
+ provider=model_info.provider,
+ model=model_info.name,
+ request_params=None,
+ context=self.context,
+ )
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/sampling_handler.py around lines 169 to 176, the create_llm
call is passing context=None which loses app settings, tracing, token counters
and server access; replace the None with the active context (self.context) so
the LLM is constructed with the current execution context (i.e., call
create_llm(..., context=self.context)) and ensure any upstream variable names
remain consistent.
if server_session:
    logger.info("Passing sampling request to upstream server session")
    print("XX Passing sampling request to upstream server session")
else:
    logger.info("No upstream server session, handling sampling locally")
|
Logic Issue: There's a redundant condition in the sampling request handler. The code first checks if server_session: and logs a message, then immediately checks if server_session is None: in the next block, repeating the same test.
Consider restructuring to a single conditional block:
if server_session is None:
logger.info("No upstream server session, handling sampling locally")
# Handle locally...
else:
logger.info("Passing sampling request to upstream server session")
# Pass to upstream...
This would make the control flow clearer and eliminate the redundant condition check.
Spotted by Diamond
client_session_factory: Callable[
    [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None],
    ClientSession,
] = ClientSession,
session_id: str | None = None,
context: Optional["Context"] = None,
The type signature for client_session_factory is inconsistent with how it's used throughout the codebase. The signature shows three parameters:
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None]
However, the actual calls pass four parameters, including a ServerSession:
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession]
This mismatch will cause runtime errors when the function is called. The signature should be updated to match the actual implementation:
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession]
Spotted by Diamond
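For illustration, a factory type widened to the four arguments described above might look like the following; this is a sketch under the assumptions stated in the comment, not the exact annotation used in this PR:

from datetime import timedelta
from typing import Callable, Optional

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession
from mcp.server.session import ServerSession

ClientSessionFactory = Callable[
    [
        MemoryObjectReceiveStream,
        MemoryObjectSendStream,
        Optional[timedelta],      # read timeout
        Optional[ServerSession],  # upstream session passed by the call sites
    ],
    ClientSession,
]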
@@ -60,6 +20,53 @@ def echo(message: str) -> str:
     return f"Prompt: {message}"


@mcp.resource("demo://data/friends")
def get_users():
The function name get_users() doesn't match its purpose or registration. It's registered as a resource for demo://data/friends and returns friend data, but the function name suggests it retrieves user data. Consider renaming this function to get_friends() to maintain semantic consistency between the function name, its registration path, and the data it returns.
-def get_users():
+def get_friends():
Spotted by Diamond
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
src/mcp_agent/mcp/mcp_agent_client_session.py (2)
339-374
: Fix broken local-sampling error path; remove prints and stale comment.Local branch swallows exceptions and returns None; also includes debug prints and an obsolete comment. This breaks the JSON-RPC contract and CI logging.
Apply:
- logger.info(f"Handling sampling request: {params}") - server_session = self.context.upstream_session - if server_session: - logger.info("Passing sampling request to upstream server session") - print("XX Passing sampling request to upstream server session") - else: - logger.info("No upstream server session, handling sampling locally") - - if server_session is None: - # Enhanced sampling with human approval workflow - print("XX No upstream server session, handling sampling locally") - try: - # handle_sampling() missing 2 required positional arguments: 'request_context' and 'params' - return await self._sampling_handler.handle_sampling( - context=self.context, - params=params) - except: - import traceback - print(f"XX Error in local sampling handler: {traceback.format_exc()}") - else: - try: - # If a server_session is available, we'll pass-through the sampling request to the upstream client - result = await server_session.send_request( - request=ServerRequest( - CreateMessageRequest( - method="sampling/createMessage", params=params - ) - ), - result_type=CreateMessageResult, - ) - - # Pass the result from the upstream client back to the server. We just act as a pass-through client here. - return result - except Exception as e: - return ErrorData(code=-32603, message=str(e)) + logger.info(f"Handling sampling request: {params}") + server_session = getattr(self.context, "upstream_session", None) + if server_session is None: + try: + return await self._sampling_handler.handle_sampling( + context=self.context, params=params + ) + except Exception as e: + logger.exception("Local sampling failed") + return ErrorData(code=-32603, message=f"Local sampling failed: {e}") + else: + logger.info("Passing sampling request to upstream server session") + try: + result = await server_session.send_request( + request=ServerRequest( + CreateMessageRequest( + method="sampling/createMessage", params=params + ) + ), + result_type=CreateMessageResult, + ) + return result + except Exception as e: + logger.exception("Upstream sampling pass-through failed") + return ErrorData(code=-32603, message=str(e))
415-428
: Guard server_config correctly to avoid AttributeError.
hasattr(self, "server_config") and self.server_config.roots
will access.roots
on None.- if hasattr(self, "server_config") and self.server_config.roots: + if self.server_config and self.server_config.roots:src/mcp_agent/mcp/mcp_server_registry.py (1)
138-142
: Use seconds in timedelta; current code treats seconds as days.This inflates timeouts massively.
- read_timeout_seconds = ( - timedelta(config.read_timeout_seconds) + read_timeout_seconds = ( + timedelta(seconds=config.read_timeout_seconds) if config.read_timeout_seconds else None )src/mcp_agent/server/app_server.py (1)
106-114
: Fix AttributeError: use self.workflows (not self.context.workflows).Context doesn’t expose a workflows dict; this will crash at runtime.
- def register_workflow(self, workflow_name: str, workflow_cls: Type[Workflow]): + def register_workflow(self, workflow_name: str, workflow_cls: Type[Workflow]): """Register a workflow class.""" - if workflow_name not in self.context.workflows: + if workflow_name not in self.workflows: self.workflows[workflow_name] = workflow_cls
♻️ Duplicate comments (5)
src/mcp_agent/mcp/mcp_server_registry.py (2)
110-120
: Fix factory signature/default and pass Context consistently.Current signature advertises 3 args and defaults to
ClientSession
, but code calls withcontext=...
(4th arg), causing TypeError. Align with the constructor that accepts context and update the Callable.- async def start_server( - self, - server_name: str, - client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], - ClientSession, - ] = ClientSession, - session_id: str | None = None, - context: Optional["Context"] = None, + async def start_server( + self, + server_name: str, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None], + ClientSession, + ] = MCPAgentClientSession, + session_id: str | None = None, + context: Optional["Context"] = None,
294-305
: Make initialize_server’s factory signature/default match start_server.Keep both methods consistent to avoid caller confusion and type errors.
- async def initialize_server( - self, - server_name: str, - client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession], - ClientSession, - ] = ClientSession, + async def initialize_server( + self, + server_name: str, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None], + ClientSession, + ] = MCPAgentClientSession, init_hook: InitHookCallable = None, session_id: str | None = None, context: Optional["Context"] = None,src/mcp_agent/mcp/gen_client.py (2)
21-25
: Default factory incompatible with new 4-arg signature (will raise TypeError).MCPAgentClientSession doesn’t accept the upstream_session arg. Provide a wrapper default and make the 3rd/4th params optional to align with call sites.
- client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession], - ClientSession, - ] = MCPAgentClientSession, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None], + ClientSession, + ] = DEFAULT_CLIENT_SESSION_FACTORY,Add this helper at module scope:
def DEFAULT_CLIENT_SESSION_FACTORY( read_stream: MemoryObjectReceiveStream, write_stream: MemoryObjectSendStream, read_timeout: timedelta | None, upstream_session: ServerSession | None, ) -> ClientSession: # Ignore upstream_session by default; advanced factories may use it. return MCPAgentClientSession(read_stream, write_stream, read_timeout)
51-55
: Keep connect() typing consistent with the wrapper default.Mirror the optional timeout/upstream types and the same default.
- client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession], - ClientSession, - ] = MCPAgentClientSession, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, ServerSession | None], + ClientSession, + ] = DEFAULT_CLIENT_SESSION_FACTORY,src/mcp_agent/mcp/sampling_handler.py (1)
209-216
: Pass active Context to create_llm (prevents loss of config/tracing).This was flagged earlier; still passing None here.
- llm = create_llm(agent_name="sampling", - server_names=[], - instruction=None, - provider=model_info.provider, - model=model_info.name, - request_params=None, - context=None) + llm = create_llm( + agent_name="sampling", + server_names=[], + instruction=None, + provider=model_info.provider, + model=model_info.name, + request_params=None, + context=self.context, + )
🧹 Nitpick comments (22)
examples/mcp/mcp_sampling/asyncio/mcp_agent.config.yaml (1)
5-13
: Log file pattern is fine; ensure directory exists at runtime.If logs/ doesn’t exist, file transport may fail depending on logger behavior. Consider documenting or creating the directory on startup.
examples/mcp/mcp_sampling/asyncio/main.py (1)
36-36
: Minor: avoid redundant f-string.definition is already a string.
- logger.info(f"{definition}") + logger.info(definition)examples/mcp/mcp_sampling/temporal/demo_server_sse_worker.py (1)
31-36
: Optional: wrap worker.run with graceful shutdown logging.Improves observability when Ctrl-C or worker exits.
- async with create_temporal_worker_for_app(app) as worker: - await worker.run() + async with create_temporal_worker_for_app(app) as worker: + try: + await worker.run() + except asyncio.CancelledError: + logger.info("Worker cancelled, shutting down...")examples/mcp/mcp_sampling/asyncio/nested_server.py (1)
33-36
: Handle non-text content and errors explicitly.Consider logging unexpected content to aid debugging, similar to the Temporal variant.
- else: - return "Haiku generation failed, unexpected content type." + else: + # Optional: add logging here if a logger is available + return "Haiku generation failed, unexpected content type."examples/mcp/mcp_sampling/temporal/nested_server.py (1)
14-18
: Log path points to asyncio directory in a temporal sample.Cosmetic but confusing; consider using temporal/logs for consistency.
- path_pattern="asyncio/logs/nested_server-{unique_id}.jsonl", + path_pattern="temporal/logs/nested_server-{unique_id}.jsonl",examples/mcp/mcp_sampling/temporal/mcp_agent.config.yaml (2)
3-3
: Engine name vs folder name may confuse readers.execution_engine: "asyncio" inside a temporal sample folder might mislead. If intentional (client only uses SSE), add a brief comment; else switch to "temporal".
-execution_engine: "asyncio" +# Using asyncio here because this is an SSE client for a Temporal-backed server +execution_engine: "asyncio"
16-18
: Confirm SSE URL and port match the server.Examples elsewhere use 0.0.0.0:8000; here it’s localhost:8000. Align to avoid connection surprises.
- url: http://localhost:8000/sse + url: http://0.0.0.0:8000/sseexamples/mcp/mcp_sampling/temporal/client.py (2)
33-41
: Be consistent: log messages say BasicAgentWorkflow but you’re calling HaikuWorkflow.Rename the log strings to match the tool IDs you call.
- # Call the BasicAgentWorkflow + # Call the HaikuWorkflow - logger.info(f"{type(server)}") + logger.info(f"{type(server)}")
116-126
: _tool_result_to_json: broaden robustness.Also handle empty content safely.
- if tool_result.content and len(tool_result.content) > 0: + if getattr(tool_result, "content", None) and len(tool_result.content) > 0: text = tool_result.content[0].text try: # Try to parse the response as JSON if it's a string import json return json.loads(text) except (json.JSONDecodeError, TypeError): # If it's not valid JSON, just use the text return None + return Noneexamples/mcp/mcp_sampling/asyncio/demo_server.py (1)
47-51
: Make nested server path robust across working directories.Optional, but avoids failures when CWD ≠ file dir.
+from pathlib import Path @@ - "haiku_server": MCPServerSettings( - command="uv", - args=["run", "nested_server.py"], + "haiku_server": MCPServerSettings( + command="uv", + args=["run", str(Path(__file__).with_name("nested_server.py"))],src/mcp_agent/mcp/mcp_server_registry.py (3)
156-163
: Remove stray comment and keep explicit context kwarg.Minor clean-up; keeps call aligned with new factory signature.
- session = client_session_factory( + session = client_session_factory( read_stream, write_stream, read_timeout_seconds, - context=context # XX WHICH CONTEXT? + context=context )
36-40
: Drop unnecessary ServerSession import once factory uses Context.Prevents lint F401 and clarifies the API.
-from mcp.server.session import ServerSession - -if TYPE_CHECKING: - from mcp_agent.core.context import Context +if TYPE_CHECKING: + from mcp_agent.core.context import Context # noqa: F401
124-132
: Docstring return type is wrong (mentions StdioServerParameters).These methods yield ClientSession, not server parameters.
Update the “Returns” section accordingly in both methods.
src/mcp_agent/server/app_server.py (4)
5-13
: Clean up duplicate/unused imports to fix lint failures.Duplicate asyncio import and unused pydantic/from_json imports.
-import asyncio +import asyncio @@ -from pydantic import BaseModel, ValidationError -from pydantic_core._pydantic_core import from_json +# (removed unused pydantic imports)
21-23
: Remove unused mcp.types imports (only referenced in commented code).These trigger lint errors.
-from mcp.server.session import ServerSession -from mcp.types import ModelPreferences, SamplingMessage, TextContent +from mcp.server.session import ServerSession
340-341
: Replace debug prints with logger.debug to avoid noisy stdout.Keeps logs consistent and controllable via level.
- print("Installing internal routes") + logger.debug("Installing internal routes") @@ - print(f"XX /internal/session/by-run/{execution_id}/request") - print(f"XX {method} {params}") + logger.debug("Relay request exec_id=%s method=%s params=%s", execution_id, method, params) @@ - print(f"XX sampling/createMessage: {params}") + logger.debug("sampling/createMessage params=%s", params)Also applies to: 452-454, 467-467
460-516
: Restrict relayable methods to a whitelist.Generic passthrough to rpc.request accepts arbitrary method names. Consider restricting to allowed methods to reduce blast radius if the gateway token leaks.
src/mcp_agent/mcp/sampling_handler.py (4)
202-206
: Use context-aware ModelSelector.Ensures selector honors app settings, tracing, etc.
- model_selector = self.context.model_selector or ModelSelector() + model_selector = self.context.model_selector or ModelSelector(context=self.context)
20-21
: Remove unused import.RequestContext isn’t referenced.
-from mcp.shared.context import RequestContext
244-253
: Drop unused provider factory helper.Dead code; keeps lints green.
- def _create_provider_instance( - self, provider_class: type[AugmentedLLM] - ) -> AugmentedLLM | None: - """Create a minimal LLM instance for direct calls""" - try: - return provider_class(context=self.context) - except Exception as e: - logger.error(f"Failed to create provider instance: {e}") - return None
77-80
: Remove noisy debug prints; log exceptions once.Prefer logger.debug/logger.exception without printing stack traces to stdout.
- except Exception as e: - import traceback - logger.error(f"XX Error passing sampling request to upstream server session: {traceback.format_exc()}") - return ErrorData(code=-32603, message=str(e)) + except Exception as e: + logger.exception("Error forwarding sampling request upstream") + return ErrorData(code=-32603, message=str(e))examples/mcp/mcp_sampling/temporal/demo_server_sse.py (1)
61-66
: Make nested_server.py path robust across working directories.Use an absolute path based on this file’s directory.
- "haiku_server": MCPServerSettings( - command="uv", - args=["run", "nested_server.py"], + "haiku_server": MCPServerSettings( + command="uv", + args=["run", str((__file__ and __import__('pathlib').Path(__file__).parent / 'nested_server.py').resolve())], description="nested server providing a haiku generator" )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
examples/mcp/mcp_sampling/asyncio/demo_server.py
(1 hunks)examples/mcp/mcp_sampling/asyncio/main.py
(1 hunks)examples/mcp/mcp_sampling/asyncio/mcp_agent.config.yaml
(1 hunks)examples/mcp/mcp_sampling/asyncio/nested_server.py
(1 hunks)examples/mcp/mcp_sampling/temporal/client.py
(1 hunks)examples/mcp/mcp_sampling/temporal/demo_server_sse.py
(1 hunks)examples/mcp/mcp_sampling/temporal/demo_server_sse_worker.py
(1 hunks)examples/mcp/mcp_sampling/temporal/mcp_agent.config.yaml
(1 hunks)examples/mcp/mcp_sampling/temporal/nested_server.py
(1 hunks)src/mcp_agent/agents/agent.py
(1 hunks)src/mcp_agent/app.py
(0 hunks)src/mcp_agent/core/context.py
(1 hunks)src/mcp_agent/executor/temporal/session_proxy.py
(1 hunks)src/mcp_agent/executor/temporal/system_activities.py
(1 hunks)src/mcp_agent/executor/workflow.py
(2 hunks)src/mcp_agent/mcp/gen_client.py
(4 hunks)src/mcp_agent/mcp/mcp_agent_client_session.py
(4 hunks)src/mcp_agent/mcp/mcp_aggregator.py
(1 hunks)src/mcp_agent/mcp/mcp_connection_manager.py
(5 hunks)src/mcp_agent/mcp/mcp_server_registry.py
(10 hunks)src/mcp_agent/mcp/sampling_handler.py
(1 hunks)src/mcp_agent/server/app_server.py
(11 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/app.py
✅ Files skipped from review due to trivial changes (1)
- src/mcp_agent/executor/temporal/session_proxy.py
🚧 Files skipped from review as they are similar to previous changes (5)
- src/mcp_agent/agents/agent.py
- src/mcp_agent/core/context.py
- src/mcp_agent/executor/workflow.py
- src/mcp_agent/mcp/mcp_connection_manager.py
- src/mcp_agent/mcp/mcp_aggregator.py
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
examples/mcp/mcp_sampling/asyncio/main.py
examples/mcp/mcp_sampling/temporal/mcp_agent.config.yaml
examples/mcp/mcp_sampling/asyncio/mcp_agent.config.yaml
src/mcp_agent/mcp/sampling_handler.py
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/mcp_server_registry.py
src/mcp_agent/mcp/gen_client.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
examples/mcp/mcp_sampling/temporal/mcp_agent.config.yaml
examples/mcp/mcp_sampling/asyncio/mcp_agent.config.yaml
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/mcp_server_registry.py
src/mcp_agent/mcp/gen_client.py
src/mcp_agent/mcp/mcp_agent_client_session.py
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
🧬 Code graph analysis (12)
examples/mcp/mcp_sampling/asyncio/main.py (4)
src/mcp_agent/agents/agent.py (2)
Agent
(62-999)attach_llm
(157-200)src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
OpenAIAugmentedLLM
(83-883)src/mcp_agent/human_input/handler.py (1)
console_input_callback
(68-78)src/mcp_agent/app.py (1)
logger
(179-196)
examples/mcp/mcp_sampling/temporal/client.py (4)
src/mcp_agent/config.py (1)
MCPServerSettings
(52-113)src/mcp_agent/executor/workflow.py (4)
executor
(118-123)WorkflowExecution
(64-71)run
(163-174)run_id
(133-138)src/mcp_agent/mcp/gen_client.py (1)
gen_client
(18-45)src/mcp_agent/human_input/handler.py (1)
console_input_callback
(68-78)
examples/mcp/mcp_sampling/temporal/nested_server.py (4)
src/mcp_agent/app.py (2)
config
(151-152)logger
(179-196)src/mcp_agent/config.py (2)
LoggerSettings
(535-580)LogPathSettings
(507-532)src/mcp_agent/logging/logger.py (2)
info
(271-279)error
(291-299)src/mcp_agent/executor/temporal/session_proxy.py (1)
create_message
(235-273)
examples/mcp/mcp_sampling/temporal/demo_server_sse.py (4)
src/mcp_agent/server/app_server.py (4)
app
(117-119)create_mcp_server_for_app
(305-1003)workflows
(122-124)run
(1384-1389)src/mcp_agent/app.py (3)
config
(151-152)logger
(179-196)workflow_run
(492-532)src/mcp_agent/config.py (6)
LoggerSettings
(535-580)MCPSettings
(116-124)MCPServerSettings
(52-113)LogPathSettings
(507-532)TemporalSettings
(393-413)find_secrets
(647-649)examples/mcp/mcp_sampling/asyncio/demo_server.py (3)
HaikuWorkflow
(66-99)run
(72-99)main
(102-116)
examples/mcp/mcp_sampling/temporal/demo_server_sse_worker.py (2)
src/mcp_agent/executor/temporal/__init__.py (1)
create_temporal_worker_for_app
(479-535)examples/mcp/mcp_sampling/temporal/demo_server_sse.py (2)
main
(117-131)run
(86-114)
examples/mcp/mcp_sampling/asyncio/nested_server.py (3)
src/mcp_agent/config.py (2)
LoggerSettings
(535-580)LogPathSettings
(507-532)examples/mcp/mcp_sampling/temporal/nested_server.py (1)
get_haiku
(27-56)src/mcp_agent/executor/temporal/session_proxy.py (1)
create_message
(235-273)
src/mcp_agent/mcp/sampling_handler.py (5)
src/mcp_agent/core/context.py (2)
mcp
(103-104)Context
(58-104)src/mcp_agent/core/context_dependent.py (1)
ContextDependent
(9-48)src/mcp_agent/workflows/llm/augmented_llm.py (4)
AugmentedLLM
(229-817)RequestParams
(124-174)get_request_params
(381-402)select_model
(335-379)src/mcp_agent/human_input/types.py (1)
HumanInputRequest
(7-29)src/mcp_agent/workflows/factory.py (1)
create_llm
(68-86)
src/mcp_agent/server/app_server.py (1)
src/mcp_agent/core/context.py (2)
mcp
(103-104)Context
(58-104)
src/mcp_agent/mcp/mcp_server_registry.py (2)
src/mcp_agent/core/context.py (2)
mcp
(103-104)Context
(58-104)src/mcp_agent/app.py (2)
context
(143-148)session_id
(175-176)
src/mcp_agent/mcp/gen_client.py (1)
src/mcp_agent/core/context.py (2)
mcp
(103-104)Context
(58-104)
examples/mcp/mcp_sampling/asyncio/demo_server.py (3)
src/mcp_agent/server/app_server.py (4)
app
(117-119)create_mcp_server_for_app
(305-1003)workflows
(122-124)run
(1384-1389)src/mcp_agent/config.py (3)
MCPServerSettings
(52-113)LogPathSettings
(507-532)find_secrets
(647-649)src/mcp_agent/mcp/mcp_aggregator.py (1)
run_stdio_async
(1474-1481)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
src/mcp_agent/mcp/sampling_handler.py (2)
SamplingHandler
(36-376)handle_sampling
(42-79)
🔇 Additional comments (5)
examples/mcp/mcp_sampling/asyncio/mcp_agent.config.yaml (1)
16-19
: Point uv at the repo file or set cwd for demo_serverdemo_server.py exists at examples/mcp/mcp_sampling/asyncio/demo_server.py. "uv run demo_server.py" only works if the process cwd is that directory — either:
- Change args to ["run","examples/mcp/mcp_sampling/asyncio/demo_server.py"], or
- Add a cwd: examples/mcp/mcp_sampling/asyncio entry if the launcher supports it.
examples/mcp/mcp_sampling/temporal/nested_server.py (1)
30-56
: Good sampling flow with structured preferences.The tool’s construction of SamplingMessage and ModelPreferences looks consistent with the session proxy API.
src/mcp_agent/mcp/mcp_server_registry.py (3)
209-216
: Pass Context consistently for Streamable HTTP sessions.LGTM with signature fix above.
248-256
: Pass Context consistently for SSE sessions.LGTM with signature fix above.
273-281
: Pass Context consistently for WebSocket sessions.LGTM with signature fix above.
# Create settings explicitly, as we want to use a different configuration from the main app
secrets_file = Settings.find_secrets()
if secrets_file and secrets_file.exists():
    with open(secrets_file, "r", encoding="utf-8") as f:
        yaml_secrets = yaml.safe_load(f) or {}
        openai_secret = yaml_secrets["openai"]

settings = Settings(
Avoid NameError/KeyError when secrets are absent. openai_secret may be undefined or the key missing.
-secrets_file = Settings.find_secrets()
-if secrets_file and secrets_file.exists():
- with open(secrets_file, "r", encoding="utf-8") as f:
- yaml_secrets = yaml.safe_load(f) or {}
- openai_secret = yaml_secrets["openai"]
+secrets_file = Settings.find_secrets()
+openai_secret = None
+if secrets_file and secrets_file.exists():
+ with open(secrets_file, "r", encoding="utf-8") as f:
+ yaml_secrets = yaml.safe_load(f) or {}
+ openai_secret = yaml_secrets.get("openai")
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/asyncio/demo_server.py around lines 27 to 35,
openai_secret can be left undefined or KeyError raised when the secrets file or
"openai" key is absent; initialize openai_secret = None before the if block and
when loading YAML replace yaml_secrets["openai"] with yaml_secrets.get("openai")
(or use a try/except KeyError) so openai_secret is always defined (None if
missing) before passing into Settings, and optionally log/warn if the secret is
missing.
)

async with agent:
    llm = await agent.attach_llm(OpenAIAugmentedLLM)
attach_llm is likely synchronous—don’t await it.
Agent.attach_llm returns an LLM instance; awaiting a non-awaitable raises TypeError. Use the return value directly.
Apply this diff:
- llm = await agent.attach_llm(OpenAIAugmentedLLM)
+ llm = agent.attach_llm(OpenAIAugmentedLLM)
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/asyncio/main.py around line 28, the code
incorrectly awaits agent.attach_llm which is synchronous and returns an LLM
instance; remove the await and assign the return value directly (llm =
agent.attach_llm(OpenAIAugmentedLLM)). Ensure no async/await is used when
calling attach_llm and that the surrounding code treats llm as a normal object,
not a coroutine.
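If a given version leaves any doubt about whether attach_llm is synchronous or asynchronous, a defensive pattern (illustrative only, not part of this PR) is to await the result only when it is awaitable:

import inspect

async def get_llm(agent, llm_cls):
    result = agent.attach_llm(llm_cls)
    # Works whether attach_llm returns the LLM directly or a coroutine resolving to it.
    if inspect.isawaitable(result):
        result = await result
    return result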
execution = WorkflowExecution(**json.loads(run_result.content[0].text))
run_id = execution.run_id
logger.info(
    f"Started BasicAgentWorkflow-run. workflow ID={execution.workflow_id}, run ID={run_id}"
)
🛠️ Refactor suggestion
Defensive parse of run_result.
If content is empty or non-JSON, this will throw. Reuse _tool_result_to_json().
- execution = WorkflowExecution(**json.loads(run_result.content[0].text))
+ parsed = _tool_result_to_json(run_result)
+ if not parsed:
+ logger.error("Failed to parse run result", data={"run_result": str(run_result)})
+ return
+ execution = WorkflowExecution(**parsed)
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/temporal/client.py around lines 43 to 47, the code
directly json.loads run_result.content[0].text which will throw if content is
empty or not valid JSON; replace this with a defensive parse using the existing
_tool_result_to_json() helper to extract and validate the tool result, check for
None or parsing errors, then construct WorkflowExecution from the returned dict;
if parsing fails log an error and abort early (or raise) instead of calling
json.loads on potentially empty/invalid content so run_id and workflow_id are
accessed only after successful parsing.
# Wait for the workflow to complete
while True:
    get_status_result = await server.call_tool(
        "workflows-HaikuWorkflow-get_status",
        arguments={"run_id": run_id},
    )

    workflow_status = _tool_result_to_json(get_status_result)
    if workflow_status is None:
        logger.error(
            f"Failed to parse workflow status response: {get_status_result}"
        )
        break

    logger.info(
        f"Workflow run {run_id} status:",
        data=workflow_status,
    )
|
🛠️ Refactor suggestion
Guard against missing run_id and add a max wait.
Prevents an infinite loop on bad state.
# Wait for the workflow to complete
- while True:
+ if not run_id:
+ logger.error("Empty run_id; cannot poll status.", data={"execution": execution.model_dump()})
+ return
+ max_polls = 120 # 10 minutes @ 5s
+ polls = 0
+ while polls < max_polls:
get_status_result = await server.call_tool(
"workflows-HaikuWorkflow-get_status",
arguments={"run_id": run_id},
)
@@
- await asyncio.sleep(5)
+ await asyncio.sleep(5)
+ polls += 1
+
+ if polls >= max_polls:
+ logger.error(f"Timed out waiting for workflow run {run_id} to complete.")
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/temporal/client.py around lines 49 to 67, the loop
that polls workflow status does not guard against a missing/invalid run_id and
can run forever on bad state; validate run_id before entering the loop and if
it's falsy log an error and return/raise, and add a max-wait mechanism (e.g.,
max_iterations or timeout with elapsed time) inside the loop so it exits after
the ceiling is reached while logging a timeout error; also include a small await
sleep between iterations to avoid busy-waiting and ensure any final status is
returned or an appropriate exception is raised.
|
from mcp_agent.executor.temporal import create_temporal_worker_for_app

from demo_server_sse import app
🛠️ Refactor suggestion
Import may fail when run from repo root.
from demo_server_sse import app assumes CWD == this directory. Make import robust for module execution from repo root.
Apply this diff:
+import sys
+from pathlib import Path
+# Ensure this directory is on sys.path when executed from repo root
+sys.path.append(str(Path(__file__).resolve().parent))
-from demo_server_sse import app
+from demo_server_sse import app
Alternative: run as a module: python -m examples.mcp.mcp_sampling.temporal.demo_server_sse_worker.
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/temporal/demo_server_sse_worker.py around line 15,
the import "from demo_server_sse import app" assumes the current working
directory is the file's directory and will fail when run from the repo root; fix
by making the import robust: either replace the local import with the
package-qualified import path (e.g., import the module via its package path
under examples.mcp.mcp_sampling.temporal) or programmatically ensure the
module's directory is on sys.path before importing (compute
Path(__file__).resolve().parent and insert it into sys.path), so the module can
be imported both from the file directory and when executed from the repo root.
secrets_file = Settings.find_secrets()
if secrets_file and secrets_file.exists():
    with open(secrets_file, "r", encoding="utf-8") as f:
        yaml_secrets = yaml.safe_load(f) or {}
        openai_secret = yaml_secrets["openai"]

settings = Settings(
openai_secret may be undefined when secrets file is missing.
Initialize a default to avoid NameError.
-secrets_file = Settings.find_secrets()
+secrets_file = Settings.find_secrets()
+openai_secret = None
if secrets_file and secrets_file.exists():
with open(secrets_file, "r", encoding="utf-8") as f:
yaml_secrets = yaml.safe_load(f) or {}
- openai_secret = yaml_secrets["openai"]
+ openai_secret = yaml_secrets.get("openai")
🤖 Prompt for AI Agents
In examples/mcp/mcp_sampling/temporal/demo_server_sse.py around lines 37 to 43,
openai_secret can be undefined when the secrets file is missing; initialize a
default before the conditional and read the secret safely. Add a declaration
like openai_secret = None before calling Settings.find_secrets(), then when
loading YAML use yaml_secrets.get("openai") (or provide a fallback) so
openai_secret is always defined even if the file or key is missing.
@activity.defn(name="mcp_sample") | ||
async def sample( | ||
self, execution_id: str, params: Any | ||
) -> Any: | ||
pass |
💡 Verification agent
🧩 Analysis chain
Implement and wire up mcp_sample (currently a no-op).
As written, the activity returns None and will cause downstream failures. Either remove it and use relay_request, or implement a thin proxy to sampling/createMessage and register it with the worker.
Apply this diff to implement the proxy:
@activity.defn(name="mcp_sample")
async def sample(
- self, execution_id: str, params: Any
+ self, execution_id: str, params: Dict[str, Any] | None
) -> Any:
- pass
+ gateway_url = getattr(self.context, "gateway_url", None)
+ gateway_token = getattr(self.context, "gateway_token", None)
+ # Default to createMessage if method not explicitly provided
+ method = (params or {}).pop("method", "sampling/createMessage")
+ return await request_via_proxy(
+ execution_id=execution_id,
+ method=method,
+ params=params or {},
+ gateway_url=gateway_url,
+ gateway_token=gateway_token,
+ )
Additionally, ensure it’s registered in the worker (create_temporal_worker_for_app):
# In src/mcp_agent/executor/temporal/__init__.py after other system activities:
app.workflow_task(name="mcp_sample")(system_activities.sample)
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "PWD: $(pwd)"
echo "---- List temporal directory (if present) ----"
fd -HI --hidden --exclude .git 'src/mcp_agent/executor/temporal' || true
echo
echo "---- system_activities.py (up to 300 lines) ----"
if [ -f src/mcp_agent/executor/temporal/system_activities.py ]; then
sed -n '1,300p' src/mcp_agent/executor/temporal/system_activities.py || true
else
echo "MISSING: src/mcp_agent/executor/temporal/system_activities.py"
fi
echo
echo "---- __init__.py (up to 300 lines) ----"
if [ -f src/mcp_agent/executor/temporal/__init__.py ]; then
sed -n '1,300p' src/mcp_agent/executor/temporal/__init__.py || true
else
echo "MISSING: src/mcp_agent/executor/temporal/__init__.py"
fi
echo
echo "---- Search repo for related symbols ----"
rg -n --hidden --no-ignore -S "request_via_proxy|relay_request|mcp_sample|create_temporal_worker_for_app|gateway_url|gateway_token" || true
echo
echo "---- Find all @activity.defn usages ----"
rg -n --hidden --no-ignore -S "@activity.defn" || true
echo
echo "---- Find registrations referencing system_activities.sample or mcp_sample ----"
rg -n --hidden --no-ignore -S "system_activities\.sample|mcp_sample|workflow_task\(name=\"mcp_sample\"\)" || true
Length of output: 27218
Implement and register mcp_sample proxy (critical)
mcp_sample is a no-op and will return None; implement a thin proxy to request_via_proxy (or remove it) and register it in the Temporal worker. Apply the diff below and add the workflow_task registration.
@activity.defn(name="mcp_sample")
async def sample(
- self, execution_id: str, params: Any
+ self, execution_id: str, params: Dict[str, Any] | None
) -> Any:
- pass
+ gateway_url = getattr(self.context, "gateway_url", None)
+ gateway_token = getattr(self.context, "gateway_token", None)
+ # Default to createMessage if method not explicitly provided
+ method = (params or {}).pop("method", "sampling/createMessage")
+ return await request_via_proxy(
+ execution_id=execution_id,
+ method=method,
+ params=params or {},
+ gateway_url=gateway_url,
+ gateway_token=gateway_token,
+ )
Register in the worker (src/mcp_agent/executor/temporal/__init__.py) after other system activities:
app.workflow_task(name="mcp_sample")(system_activities.sample)
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/system_activities.py around lines 91 to 95,
the mcp_sample activity is currently a no-op — implement it as a thin proxy that
forwards the call to the existing request_via_proxy function (pass execution_id
and params, await the result, and return it), and ensure proper error
handling/propagation (do not swallow exceptions). Then register this activity as
a workflow task in src/mcp_agent/executor/temporal/__init__.py by adding
app.workflow_task(name="mcp_sample")(system_activities.sample) after the other
system activity registrations.
async def handle_sampling(self,
                          context: "Context",
                          params: CreateMessageRequestParams) -> CreateMessageResult | ErrorData:
    logger.info(f"Handling sampling request: {params}")
    try:
        server_session = context.upstream_session
        logger.info(f"Current upstream server session: {type(server_session)}")
        print(f"XX Current upstream server session: {type(server_session)}")
    except:
        import traceback
        logger.error(f"Error getting upstream server session: {traceback.format_exc()}")
        print(f"XX Error getting upstream server session: {traceback.format_exc()}")

    if server_session is None:
        # Enhanced sampling with human approval workflow
Avoid UnboundLocalError; make context optional and drop prints.
server_session may be undefined after the except; also class already carries context. Use a safe getattr and log at debug level.
- async def handle_sampling(self,
- context: "Context",
- params: CreateMessageRequestParams) -> CreateMessageResult | ErrorData:
+ async def handle_sampling(
+ self,
+ context: "Context" | None,
+ params: CreateMessageRequestParams,
+ ) -> CreateMessageResult | ErrorData:
logger.info(f"Handling sampling request: {params}")
- try:
- server_session = context.upstream_session
- logger.info(f"Current upstream server session: {type(server_session)}")
- print(f"XX Current upstream server session: {type(server_session)}")
- except:
- import traceback
- logger.error(f"Error getting upstream server session: {traceback.format_exc()}")
- print(f"XX Error getting upstream server session: {traceback.format_exc()}")
+ ctx = context or self.context
+ server_session = getattr(ctx, "upstream_session", None)
+ logger.debug("Upstream server session: %s", type(server_session))
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/sampling_handler.py around lines 42 to 56, the current
try/except can leave server_session undefined and uses prints; replace that
logic by deriving the context safely (use the passed-in context if provided else
self.context), obtain server_session with getattr(context, "upstream_session",
None) to avoid UnboundLocalError, remove all print() calls, and log the
retrieved value at debug level (logger.debug) instead of info; if you still need
to handle exceptions, catch Exception explicitly, log the traceback via
logger.error, and ensure server_session is always defined (None when
unavailable) before the subsequent if-check.
@@ -151,6 +158,7 @@ async def start_server(
    read_stream,
    write_stream,
    read_timeout_seconds,
    context=context  # XX WHICH CONTEXT?
There appears to be a type signature mismatch in the client_session_factory parameter. The function signature in several places expects timedelta | None as the third parameter, but the implementation on line 299 shows it should be timedelta, ServerSession.
This inconsistency will likely cause runtime errors when the factory is called with incorrect parameter types. The type annotation should be updated to match the actual implementation:
[MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta, ServerSession]
This change should be applied consistently across all function signatures that use this callback type.
Spotted by Diamond
llm = create_llm(agent_name="sampling",
                 server_names=[],
                 instruction=None,
                 provider=model_info.provider,
                 model=model_info.name,
                 request_params=None,
                 context=None)
The create_llm function is called with context=None, but the sampling handler has a valid context available as self.context. This will cause the LLM to fail initialization or operate incorrectly since it won't have access to necessary configuration and services.
Consider changing context=None to context=self.context.
This ensures the LLM has access to the proper configuration, logging, and other context-dependent services.
  llm = create_llm(agent_name="sampling",
                   server_names=[],
                   instruction=None,
                   provider=model_info.provider,
                   model=model_info.name,
                   request_params=None,
-                  context=None)
+                  context=self.context)
Spotted by Diamond
type="file", | ||
level="debug", | ||
path_settings=LogPathSettings( | ||
path_pattern="asyncio/logs/nested_server-{unique_id}.jsonl", |
The log file path pattern asyncio/logs/nested_server-{unique_id}.jsonl is incorrect for a file in the temporal directory. This will cause file creation errors if the asyncio directory doesn't exist in the expected location.
Consider changing the path to either:
- logs/nested_server-{unique_id}.jsonl (relative to execution directory)
- temporal/logs/nested_server-{unique_id}.jsonl (explicitly in temporal directory)
This ensures logs are written to an accessible location regardless of which example is being run.
path_pattern="asyncio/logs/nested_server-{unique_id}.jsonl", | |
path_pattern="logs/nested_server-{unique_id}.jsonl", |
Spotted by Diamond
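If the goal is to make the example's logging robust to the working directory, one option (an assumption about the example layout, not what this PR does) is to anchor the log directory to the script's own location before building the pattern:

from pathlib import Path

LOG_DIR = Path(__file__).resolve().parent / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
path_pattern = str(LOG_DIR / "nested_server-{unique_id}.jsonl")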
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
407-415: Fix None-deref on server_config.roots

server_config is initialized to None in __init__ (line 118); the current guard can raise when server_config is None. Check the instance first.

File: src/mcp_agent/mcp/mcp_agent_client_session.py:406-415

- if hasattr(self, "server_config") and self.server_config.roots:
+ if self.server_config and self.server_config.roots:

Also replace redundant hasattr(self, "server_config") checks elsewhere (e.g., line 383) with simple truthiness checks.
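A minimal, self-contained illustration of the suggested guard (all names here are assumed for illustration, not taken from the PR):

# Declare the attribute up front and check its value, so no hasattr() is needed.
from dataclasses import dataclass, field

@dataclass
class ServerConfig:
    roots: list[str] = field(default_factory=list)

class ExampleSession:
    def __init__(self, server_config: ServerConfig | None = None):
        self.server_config = server_config  # always assigned, possibly None

    def root_uris(self) -> list[str]:
        # Truthiness covers both "no config" and "config without roots".
        if self.server_config and self.server_config.roots:
            return list(self.server_config.roots)
        return []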
♻️ Duplicate comments (3)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
339-346
: Collapse redundant upstream-session checks.You log on both
if server_session:
and then branch onif server_session is None:
below. Remove the first check and log within each branch to avoid duplication.Apply this diff:
- server_session = self.context.upstream_session
- if server_session:
-     logger.debug("Passing sampling request to upstream server session")
- else:
-     logger.debug("No upstream server session, handling sampling locally")
+ server_session = self.context.upstream_session

And add branch-local logs:
- if server_session is None:
+ if server_session is None:
+     logger.debug("No upstream server session, handling sampling locally")
      # Enhanced sampling with human approval workflow
      return await self._sampling_handler.handle_sampling(
          context=self.context, params=params)
  else:
+     logger.debug("Passing sampling request to upstream server session")
      try:

src/mcp_agent/mcp/sampling_handler.py (2)
41-47
: Make context optional and reduce log noise; avoid UnboundLocalError.Fall back to
self.context
whencontext
isn’t provided and log at debug level.Apply this diff:
- async def handle_sampling(self, - context: "Context", - params: CreateMessageRequestParams) -> CreateMessageResult | ErrorData: - logger.info(f"Handling sampling request: {params}") - server_session = context.upstream_session - logger.info(f"Current upstream server session: {type(server_session)}") + async def handle_sampling( + self, + context: "Context" | None, + params: CreateMessageRequestParams, + ) -> CreateMessageResult | ErrorData: + logger.debug("Handling sampling request", data=params.model_dump()) + ctx = context or self.context + server_session = getattr(ctx, "upstream_session", None) + logger.debug("Upstream server session: %s", type(server_session).__name__)Optionally also change the branch logs to debug:
- logger.info("No upstream server session, handling sampling locally") + logger.debug("No upstream server session; handling locally") ... - logger.info("Passing sampling request to upstream server session") + logger.debug("Passing sampling request to upstream server session")
198-207
: Preserve app settings and tracing: pass self.context to create_llm.Creating the LLM with
context=None
drops config, logging, and upstream access.Apply this diff:
- llm = create_llm(agent_name="sampling", - server_names=[], - instruction=None, - provider=model_info.provider, - model=model_info.name, - request_params=None, - context=None) + llm = create_llm( + agent_name="sampling", + server_names=[], + instruction=None, + provider=model_info.provider, + model=model_info.name, + request_params=None, + context=self.context, + )
🧹 Nitpick comments (4)
src/mcp_agent/server/app_server.py (1)
336-336
: Use logger instead of print for internal route installation.Prefer structured logging to keep output consistent and capturable.
Apply this diff:
- print("Installing internal routes") + logger.debug("Installing internal routes")src/mcp_agent/mcp/sampling_handler.py (3)
192-197
: Pass Context into ModelSelector for better selection fidelity.Selector may rely on context (settings, telemetry). Use the active context.
Apply this diff:
- model_selector = self.context.model_selector or ModelSelector() + model_selector = self.context.model_selector or ModelSelector(context=self.context)
230-233
: Prefer logger.exception over manual traceback logging.It records stack traces without extra imports and avoids info-level stack dumps.
Apply this diff:
- import traceback - logger.info(traceback.format_exc()) - logger.error(f"Unexpected error calling LLM: {e}") + logger.exception("Unexpected error calling LLM") return None
235-244
: Remove unused provider factory or wire it in.
_create_provider_instance
is unused after switching tocreate_llm
. Dead code adds maintenance overhead.Apply this diff to remove it:
- def _create_provider_instance( - self, provider_class: type[AugmentedLLM] - ) -> AugmentedLLM | None: - """Create a minimal LLM instance for direct calls""" - try: - return provider_class(context=self.context) - except Exception as e: - logger.error(f"Failed to create provider instance: {e}") - return None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
examples/mcp/mcp_sampling/asyncio/nested_server.py
(1 hunks)src/mcp_agent/core/context.py
(1 hunks)src/mcp_agent/mcp/mcp_agent_client_session.py
(3 hunks)src/mcp_agent/mcp/sampling_handler.py
(1 hunks)src/mcp_agent/server/app_server.py
(9 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/mcp_agent/core/context.py
- examples/mcp/mcp_sampling/asyncio/nested_server.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/mcp/mcp_agent_client_session.py
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/sampling_handler.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/mcp/mcp_agent_client_session.py
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/sampling_handler.py
🧬 Code graph analysis (3)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
src/mcp_agent/mcp/sampling_handler.py (2)
SamplingHandler
(35-367)handle_sampling
(41-70)
src/mcp_agent/server/app_server.py (2)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/app.py (1)
logger
(179-196)
src/mcp_agent/mcp/sampling_handler.py (7)
src/mcp_agent/core/context.py (2)
mcp
(102-103)Context
(57-103)src/mcp_agent/core/context_dependent.py (1)
ContextDependent
(9-48)src/mcp_agent/workflows/llm/augmented_llm.py (4)
AugmentedLLM
(229-817)RequestParams
(124-174)get_request_params
(381-402)select_model
(335-379)src/mcp_agent/workflows/llm/llm_selector.py (2)
ModelSelector
(98-449)select_best_model
(136-284)src/mcp_agent/mcp/mcp_agent_client_session.py (1)
send_request
(148-230)src/mcp_agent/human_input/types.py (1)
HumanInputRequest
(7-29)src/mcp_agent/workflows/factory.py (1)
create_llm
(68-86)
🔇 Additional comments (3)
src/mcp_agent/mcp/mcp_agent_client_session.py (2)
64-64
: LGTM: Local sampling handler import.Import path looks correct and aligns with the new SamplingHandler module.
118-120
: LGTM: Initialize a reusable SamplingHandler.Good to instantiate once per session for reuse.
src/mcp_agent/server/app_server.py (1)
127-136
: LGTM: Upstream session registry API.Simple map with clear semantics; types look correct.
final_request_params = llm.get_request_params(
    self._build_llm_request_params(params)
)
model_name = await llm.select_model(final_request_params)
Potential Race Condition in Response Metadata
The code retrieves model information after generation has completed, which could lead to inconsistent metadata if the LLM's state changes between generation and metadata collection.
Consider either:
- Capturing model_name and request parameters before the generation call, or
- Ensuring the LLM implementation maintains consistent state throughout the request lifecycle
This would ensure the CreateMessageResult contains metadata that accurately reflects what was used during the actual generation process.
# Example approach
request_params = self._build_llm_request_params(params)
final_request_params = llm.get_request_params(request_params)
model_name = await llm.select_model(final_request_params)
result = await llm.generate_str(
    message=messages, request_params=request_params
)
return CreateMessageResult(
    role="assistant",
    content=TextContent(type="text", text=result),
    model=model_name or "unknown",
)
Suggested change:
- final_request_params = llm.get_request_params(
-     self._build_llm_request_params(params)
- )
- model_name = await llm.select_model(final_request_params)
+ request_params = self._build_llm_request_params(params)
+ final_request_params = llm.get_request_params(request_params)
+ model_name = await llm.select_model(final_request_params)
Spotted by Diamond
async def get_haiku(topic: str) -> str:
    """Use sampling to generate a haiku about the given topic."""

    app.logger.info(f"Generating haiku about topic: {topic} via sampling")
The app.logger access in the get_haiku function may cause an AttributeError if the MCPApp instance hasn't been properly initialized. Consider adding a null check before accessing the logger, or ensuring the app is initialized before this tool is called:
# Safer approach with null check
if hasattr(app, 'logger') and app.logger:
    app.logger.info(f"Generating haiku about topic: {topic} via sampling")
else:
    print(f"Generating haiku about topic: {topic} via sampling")  # Fallback logging
Alternatively, use a module-level logger that doesn't depend on app initialization.
app.logger.info(f"Generating haiku about topic: {topic} via sampling") | |
if hasattr(app, 'logger') and app.logger: | |
app.logger.info(f"Generating haiku about topic: {topic} via sampling") | |
else: | |
print(f"Generating haiku about topic: {topic} via sampling") # Fallback logging |
Spotted by Diamond
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/mcp_agent/mcp/mcp_server_registry.py (1)
137-141
: Timedelta constructed with days, not seconds.
timedelta(config.read_timeout_seconds)
interprets the value as days. Use the namedseconds=
arg.- read_timeout_seconds = ( - timedelta(config.read_timeout_seconds) + read_timeout_seconds = ( + timedelta(seconds=config.read_timeout_seconds) if config.read_timeout_seconds else None )src/mcp_agent/executor/workflow.py (1)
213-216
: Use asyncio.CancelledError; current handler won’t catch task cancellations.Catching/raising
concurrent.futures.CancelledError
won’t interceptasyncio
task cancellations, causing cancelled runs to be marked as “error”.- import asyncio - from concurrent.futures import CancelledError + import asyncio @@ - except CancelledError: + except asyncio.CancelledError: # Handle cancellation gracefully self._logger.info( f"Workflow {self.name} (ID: {self._run_id}) was cancelled" ) self.update_status("cancelled") raise @@ - self.update_status("cancelled") - raise CancelledError("Workflow was cancelled") + self.update_status("cancelled") + raise asyncio.CancelledError("Workflow was cancelled")Also applies to: 315-321, 297-302
src/mcp_agent/server/app_server.py (1)
1309-1327
: Broken call:ServerRegistry
hasget_server_config
, notget_server_context
.This will raise AttributeError at runtime.
- config = server_registry.get_server_context(server_name) + config = server_registry.get_server_config(server_name)
♻️ Duplicate comments (2)
src/mcp_agent/mcp/mcp_server_registry.py (2)
37-39
: Silence Ruff F401 for TYPE_CHECKING-only import, or remove it.The
Context
import under TYPE_CHECKING isn’t referenced (string annotations are used), tripping F401.-if TYPE_CHECKING: - from mcp_agent.core.context import Context +if TYPE_CHECKING: + from mcp_agent.core.context import Context # noqa: F401
111-119
: Fix factory signature + default to avoid runtime TypeError when passingcontext
.You pass
context=...
intoclient_session_factory(...)
but the callable type and default don’t accept it. This will raise at runtime (and confuses type checkers). Align both methods to a 4-arg factory and default toMCPAgentClientSession
which supportscontext
.@@ - async def start_server( - self, - server_name: str, - client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], - ClientSession, - ] = ClientSession, - session_id: str | None = None, - context: Optional["Context"] = None, + async def start_server( + self, + server_name: str, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None], + ClientSession, + ] = MCPAgentClientSession, + session_id: str | None = None, + context: Optional["Context"] = None, @@ - async def initialize_server( - self, - server_name: str, - client_session_factory: Callable[ - [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None], - ClientSession, - ] = ClientSession, - init_hook: InitHookCallable = None, - session_id: str | None = None, - context: Optional["Context"] = None, + async def initialize_server( + self, + server_name: str, + client_session_factory: Callable[ + [MemoryObjectReceiveStream, MemoryObjectSendStream, timedelta | None, "Context" | None], + ClientSession, + ] = MCPAgentClientSession, + init_hook: InitHookCallable = None, + session_id: str | None = None, + context: Optional["Context"] = None,Also applies to: 295-304
🧹 Nitpick comments (5)
src/mcp_agent/mcp/mcp_server_registry.py (2)
126-131
: Docstring return type is incorrect.This context manager yields a
ClientSession
, notStdioServerParameters
.- Returns: - StdioServerParameters: The server parameters for stdio transport. + Returns: + Async context manager yielding a ClientSession for the chosen transport.
231-244
: Normalize timeout units across transports.HTTP/Streamable HTTP use
timedelta
, SSE path passes raw seconds. Ensure both match each client’s signature to prevent subtle timeout bugs.Would you like me to align these after confirming expected types for
sse_client(timeout=..., sse_read_timeout=...)
?Also applies to: 188-205
src/mcp_agent/executor/workflow.py (2)
205-208
: Docstring tweak: clarify memo usage scope.Note that
__mcp_agent_workflow_memo
only affects Temporal engine.- - __mcp_agent_workflow_memo: the memo passed to the temporal workflow + - __mcp_agent_workflow_memo: memo for Temporal workflows (ignored by asyncio engine)
743-779
: Memo retrieval: prefer a single, stable API.
workflow.memo()
may not exist in all Temporal Python versions;workflow.info().memo
is widely supported. Current try/except is defensive, but consider dropping thememo()
probe to reduce noise.I can adjust this after confirming the latest Temporal API you target.
src/mcp_agent/server/app_server.py (1)
324-366
: DRY the gateway auth check.The Bearer/X-MCP-Gateway-Token verification logic is duplicated across three routes; extract a small helper to reduce risk of inconsistent fixes.
I can provide a minimal helper and apply it at call sites if you want.
Also applies to: 498-547, 548-602
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
src/mcp_agent/executor/workflow.py
(1 hunks)src/mcp_agent/mcp/gen_client.py
(3 hunks)src/mcp_agent/mcp/mcp_agent_client_session.py
(3 hunks)src/mcp_agent/mcp/mcp_aggregator.py
(1 hunks)src/mcp_agent/mcp/mcp_server_registry.py
(10 hunks)src/mcp_agent/mcp/sampling_handler.py
(1 hunks)src/mcp_agent/server/app_server.py
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- src/mcp_agent/mcp/gen_client.py
- src/mcp_agent/mcp/mcp_agent_client_session.py
- src/mcp_agent/mcp/mcp_aggregator.py
- src/mcp_agent/mcp/sampling_handler.py
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-08-28T15:07:10.015Z
Learnt from: saqadri
PR: lastmile-ai/mcp-agent#386
File: src/mcp_agent/mcp/mcp_server_registry.py:110-116
Timestamp: 2025-08-28T15:07:10.015Z
Learning: In MCP server registry methods, when client_session_factory parameters are updated to accept additional context parameters, ensure the type hints match what is actually passed (Context instance vs ServerSession) and that the default factory (MCPAgentClientSession) can handle the number of arguments being passed to avoid TypeError at runtime.
Applied to files:
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/server/app_server.py
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-09-05T14:31:48.115Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.115Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
src/mcp_agent/mcp/mcp_server_registry.py
🧬 Code graph analysis (1)
src/mcp_agent/mcp/mcp_server_registry.py (2)
src/mcp_agent/app.py (2)
context
(147-152)session_id
(187-188)src/mcp_agent/core/context.py (1)
Context
(57-103)
🔇 Additional comments (3)
src/mcp_agent/server/app_server.py (3)
671-708
: Schema tool: ensure we never exposeself
/Context params.The
_build_run_param_tool
path looks correct; just confirming this replaces previous brittle mutation ofparameters["required"]
. LGTM.
1356-1496
: Great end-to-end wiring for workflow memo + upstream session registration.The execution/memo propagation and session registry hookup look solid.
668-707
: Set upstream session early: good pattern.Calling
_set_upstream_from_request_ctx_if_available
at tool entry is the right guard to preserve log forwarding. Nice.Also applies to: 741-767, 768-797, 862-913
Based on #255.
Summary by CodeRabbit
New Features
Documentation
Tests