Skip to content

Commit f2d0cc2

Browse files
committed
catch and send errors on stream
1 parent de2289d commit f2d0cc2

File tree

1 file changed

+31
-21
lines changed

1 file changed

+31
-21
lines changed

src/mcp/client/streamable_http.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
251251
await event_source.response.aclose()
252252
break
253253

254+
async def _send_error_response(self, ctx: RequestContext, error: Exception) -> None:
255+
"""Send an error response to the client."""
256+
error_data = ErrorData(code=32000, message=str(error))
257+
jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.root.id, error=error_data)
258+
session_message = SessionMessage(message=JSONRPCMessage(jsonrpc_error))
259+
await ctx.read_stream_writer.send(session_message)
260+
254261
async def _handle_post_request(self, ctx: RequestContext) -> None:
255262
"""Handle a POST request with response processing."""
256263
headers = self._prepare_request_headers(ctx.headers)
@@ -321,23 +328,23 @@ async def _handle_sse_response(
321328
is_initialization: bool = False,
322329
) -> None:
323330
"""Handle SSE response from the server."""
324-
try:
325-
event_source = EventSource(response)
326-
async for sse in event_source.aiter_sse():
327-
is_complete = await self._handle_sse_event(
328-
sse,
329-
ctx.read_stream_writer,
330-
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
331-
is_initialization=is_initialization,
332-
)
333-
# If the SSE event indicates completion, like returning respose/error
334-
# break the loop
335-
if is_complete:
336-
await response.aclose()
337-
break
338-
except Exception as e:
339-
logger.exception("Error reading SSE stream:")
340-
await ctx.read_stream_writer.send(e)
331+
event_source = EventSource(response)
332+
finished = False
333+
async for sse in event_source.aiter_sse():
334+
is_complete = await self._handle_sse_event(
335+
sse,
336+
ctx.read_stream_writer,
337+
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
338+
is_initialization=is_initialization,
339+
)
340+
# If the SSE event indicates completion, like returning respose/error
341+
# break the loop
342+
if is_complete:
343+
finished = True
344+
await response.aclose()
345+
break
346+
if not finished:
347+
raise Exception("SSE stream ended without completing")
341348

342349
async def _handle_unexpected_content_type(
343350
self,
@@ -403,10 +410,13 @@ async def post_writer(
403410
)
404411

405412
async def handle_request_async():
406-
if is_resumption:
407-
await self._handle_resumption_request(ctx)
408-
else:
409-
await self._handle_post_request(ctx)
413+
try:
414+
if is_resumption:
415+
await self._handle_resumption_request(ctx)
416+
else:
417+
await self._handle_post_request(ctx)
418+
except Exception as e:
419+
await self._send_error_response(ctx, e)
410420

411421
# If this is a request, start a new task to handle it
412422
if isinstance(message.root, JSONRPCRequest):

0 commit comments

Comments
 (0)