From 0ed68aeab29c2260ea1d7a2eabd4a0e69fe87d40 Mon Sep 17 00:00:00 2001 From: Philippe Lehoux Date: Sun, 22 Feb 2026 15:32:48 -0500 Subject: [PATCH 1/2] Handle inline SSE POST responses without blocking (Notion) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Notion’s MCP server responds to every POST with text/event-stream and keeps the stream open after sending the result. The previous synchronous post() blocked until operation_timeout, so wait_for_response_with_timeout never ran and queued results timed out. Route wait_for_response: true requests through a background thread + queue so the main thread can poll results while HTTPX processes the stream. Keep synchronous behavior for fire-and-forget requests. --- .../mcp/native/transports/streamable_http.rb | 126 +++++++++--- .../native/transports/streamable_http_spec.rb | 185 ++++++++++++++++++ 2 files changed, 281 insertions(+), 30 deletions(-) diff --git a/lib/ruby_llm/mcp/native/transports/streamable_http.rb b/lib/ruby_llm/mcp/native/transports/streamable_http.rb index 6f4c39d..dfb9517 100644 --- a/lib/ruby_llm/mcp/native/transports/streamable_http.rb +++ b/lib/ruby_llm/mcp/native/transports/streamable_http.rb @@ -119,10 +119,10 @@ def request(body, wait_for_response: true) # Extract the request ID from the body (if present) request_id = body.is_a?(Hash) ? (body["id"] || body[:id]) : nil - is_initialization = body.is_a?(Hash) && (body["method"] == "initialize" || body[:method] == :initialize) + # Register queue before send_http_request to avoid push-before-register races. response_queue = setup_response_queue(request_id, wait_for_response) - result = send_http_request(body, request_id, is_initialization: is_initialization) + result = send_http_request(body, request_id) return result if result.is_a?(RubyLLM::MCP::Result) if wait_for_response && request_id @@ -307,7 +307,7 @@ def setup_response_queue(request_id, wait_for_response) response_queue end - def send_http_request(body, request_id, is_initialization: false) + def send_http_request(body, request_id) headers = build_common_headers headers["Content-Type"] = "application/json" headers["Accept"] = "application/json, text/event-stream" @@ -315,33 +315,79 @@ def send_http_request(body, request_id, is_initialization: false) json_body = JSON.generate(body) RubyLLM::MCP.logger.debug "Sending Request: #{json_body}" - request_client = nil + # Use a background thread only when a queue is registered for this request. + # When wait_for_response: false no queue is registered, so we run synchronously to + # ensure errors and results propagate immediately back to the caller. + # When wait_for_response: true the queue exists, so we must be async: some servers + # (e.g. Notion) respond with an inline SSE stream that stays open indefinitely after + # delivering the result, and blocking the main thread inside post() would cause a + # TimeoutError before wait_for_response_with_timeout is ever reached. + use_background = request_id && @pending_mutex.synchronize { @pending_requests.key?(request_id.to_s) } + + if use_background + send_request_in_background(body, request_id, headers) + else + send_request_synchronously(body, request_id, headers) + end + end + + def send_request_in_background(body, request_id, headers) + request_client = create_connection_with_streaming_callbacks(request_id, close_when_fulfilled: true) + Thread.new do + response = request_client.post(@url, json: body, headers: headers) + handle_response(response, request_id, body) + rescue Errors::BaseError => e + # Push all BaseError subclasses directly so wait_for_response_with_timeout + # re-raises them with their original type (e.g. SessionExpiredError, not + # a generic TransportError wrapper). + @pending_mutex.synchronize do + queue = @pending_requests.delete(request_id.to_s) + queue&.push(e) + end + rescue StandardError => e + RubyLLM::MCP.logger.error "Background request error: #{e.message}" + @pending_mutex.synchronize do + queue = @pending_requests.delete(request_id.to_s) + queue&.push(Errors::TransportError.new(message: e.message, code: nil)) + end + ensure + close_client(request_client) + end + nil + end + + def send_request_synchronously(body, request_id, headers) + request_client = create_connection_with_streaming_callbacks(request_id, close_when_fulfilled: false) begin - connection = if is_initialization - @connection - else - request_client = create_connection_with_streaming_callbacks(request_id) - request_client - end - - response = connection.post(@url, json: body, headers: headers) + response = request_client.post(@url, json: body, headers: headers) handle_response(response, request_id, body) ensure - @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } if request_id - close_client(request_client) if request_client && !is_initialization + close_client(request_client) end end - def create_connection_with_streaming_callbacks(request_id) + def create_connection_with_streaming_callbacks(request_id, close_when_fulfilled: false) buffer = +"" client = Support::HTTPClient.connection.plugin(:callbacks) - client = client.on_response_body_chunk do |request, _response, chunk| + client = client.on_response_body_chunk do |request, response, chunk| next unless running? + # Capture session ID from response headers before processing SSE events so that + # initialize_notification (sent after the initialize result is dequeued) includes it. + if (session_id = response.headers["mcp-session-id"]) && !@session_id + @session_id = session_id + end + RubyLLM::MCP.logger.debug "Received chunk: #{chunk.bytesize} bytes for #{request.uri}" buffer << chunk process_sse_buffer_events(buffer, request_id&.to_s) + + # Only close when wait_for_response registered a queue; fire-and-forget IDs shouldn't auto-close. + if close_when_fulfilled && request_id + fulfilled = @pending_mutex.synchronize { !@pending_requests.key?(request_id.to_s) } + request.close if fulfilled + end end client = client.with( timeout: { @@ -402,7 +448,13 @@ def handle_success_response(response, request_id, _original_message) result = RubyLLM::MCP::Result.new(json_response, session_id: @session_id) if request_id - @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } + # Push to the queue rather than returning up the call stack: this method runs + # inside a background thread, so the return value is never observed by the + # caller. The main thread receives the result via wait_for_response_with_timeout. + @pending_mutex.synchronize do + queue = @pending_requests.delete(request_id.to_s) + queue&.push(result) + end end result @@ -571,7 +623,7 @@ def attempt_authentication_retry(www_authenticate, resource_metadata_url, reques if success RubyLLM::MCP.logger.info("Authentication challenge handled successfully, retrying request") - result = send_http_request(original_message, request_id, is_initialization: false) + result = send_http_request(original_message, request_id) @auth_retry_attempted = false return result end @@ -894,21 +946,35 @@ def parse_and_validate_http_response(response_body) end def wait_for_response_with_timeout(request_id, response_queue) - result = with_timeout(@request_timeout / 1000, request_id: request_id) do - response_queue.pop - end + timeout_seconds = @request_timeout / 1000.0 + deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds - # Check if we received a shutdown error sentinel - if result.is_a?(Errors::TransportError) - raise result - end + # Poll non-blocking; Thread.join(timeout) can starve under HTTPX callbacks. + loop do + result = response_queue.pop(true) + # Background thread pushes exceptions directly into the queue to preserve their + # original type (e.g. SessionExpiredError). Re-raise any of them here. + raise result if result.is_a?(Exception) - result - rescue RubyLLM::MCP::Errors::TimeoutError => e - log_message = "StreamableHTTP request timeout (ID: #{request_id}) after #{@request_timeout / 1000} seconds" - RubyLLM::MCP.logger.error(log_message) + return result + rescue ThreadError + if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline + log_message = + "StreamableHTTP request timeout (ID: #{request_id}) after #{@request_timeout / 1000} seconds" + RubyLLM::MCP.logger.error(log_message) + @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } + raise Errors::TimeoutError.new( + message: "Request timed out after #{@request_timeout / 1000} seconds", + request_id: request_id + ) + end + + sleep(0.05) + end + ensure + # Clean up the pending entry on any exit path not already handled above (e.g. when + # an exception escapes from pop itself rather than being pushed via the queue). @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } - raise e end def cleanup_sse_resources diff --git a/spec/ruby_llm/mcp/native/transports/streamable_http_spec.rb b/spec/ruby_llm/mcp/native/transports/streamable_http_spec.rb index 6383b66..8970f0a 100644 --- a/spec/ruby_llm/mcp/native/transports/streamable_http_spec.rb +++ b/spec/ruby_llm/mcp/native/transports/streamable_http_spec.rb @@ -2171,6 +2171,191 @@ end end + describe "background thread routing for wait_for_response: true requests" do + before do + WebMock.enable! + end + + after do + WebMock.reset! + WebMock.enable! + end + + context "when queue is registered (wait_for_response: true)" do + it "delivers successful 200 JSON response via background thread queue" do + stub_request(:post, TestServerManager::HTTP_SERVER_URL) + .to_return( + status: 200, + headers: { "Content-Type" => "application/json" }, + body: { "jsonrpc" => "2.0", "id" => 1, "result" => { "tools" => [] } }.to_json + ) + + result = transport.request({ "method" => "tools/list", "id" => 1 }) + + expect(result).to be_a(RubyLLM::MCP::Result) + expect(result.result["tools"]).to eq([]) + end + + it "preserves AuthenticationRequiredError type from background thread without wrapping" do + stub_request(:post, TestServerManager::HTTP_SERVER_URL) + .to_return(status: 401) + + expect do + transport.request({ "method" => "tools/list", "id" => 2 }) + end.to raise_error( + RubyLLM::MCP::Errors::AuthenticationRequiredError, + /no OAuth provider configured/ + ) + end + end + end + + describe "inline SSE stream handling for POST responses" do + let(:request_id) { 1 } + let(:response_queue) { Queue.new } + let(:http_client) { RubyLLM::MCP::Native::Transports::Support::HTTPClient } + let(:fake_client_class) do + Class.new do + attr_reader :callback + + def plugin(_) + self + end + + def on_response_body_chunk(&block) + @callback = block + self + end + + def with(*) + self + end + end + end + let(:fake_client) { fake_client_class.new } + + before do + transport + + allow(mock_coordinator).to receive(:process_result) { |result| result } + allow(http_client).to receive(:connection).and_return(fake_client) + + transport.instance_variable_get(:@pending_mutex).synchronize do + transport.instance_variable_get(:@pending_requests)[request_id.to_s] = response_queue + end + end + + after do + allow(http_client).to receive(:connection).and_call_original + end + + it "pushes inline SSE result to queue and closes request when fulfilled" do + transport.send(:create_connection_with_streaming_callbacks, request_id, close_when_fulfilled: true) + + request = instance_double(HTTPX::Request, uri: "http://example.test", close: nil) + response = instance_double(HTTPX::Response, headers: { "mcp-session-id" => "session-abc" }) + payload = { "jsonrpc" => "2.0", "id" => request_id, "result" => { "ok" => true } }.to_json + + expect(request).to receive(:close) + + fake_client.callback.call(request, response, "data: #{payload}\n\n") + + result = response_queue.pop(true) + expect(result).to be_a(RubyLLM::MCP::Result) + expect(result.result["ok"]).to be(true) + expect(transport.instance_variable_get(:@session_id)).to eq("session-abc") + + pending_requests = transport.instance_variable_get(:@pending_requests) + expect(pending_requests).not_to have_key(request_id.to_s) + end + + it "does not close request when close_when_fulfilled is false" do + transport.send(:create_connection_with_streaming_callbacks, request_id, close_when_fulfilled: false) + + request = instance_double(HTTPX::Request, uri: "http://example.test", close: nil) + response = instance_double(HTTPX::Response, headers: {}) + payload = { "jsonrpc" => "2.0", "id" => request_id, "result" => { "ok" => true } }.to_json + + expect(request).not_to receive(:close) + + fake_client.callback.call(request, response, "data: #{payload}\n\n") + + result = response_queue.pop(true) + expect(result).to be_a(RubyLLM::MCP::Result) + end + end + + describe "exception sentinel re-raising in wait_for_response_with_timeout" do + let(:request_id) { "sentinel-rethrow-test" } + let(:response_queue) { Queue.new } + + before do + transport.instance_variable_get(:@pending_mutex).synchronize do + transport.instance_variable_get(:@pending_requests)[request_id] = response_queue + end + end + + it "re-raises SessionExpiredError pushed to queue by background thread" do + session_error = RubyLLM::MCP::Errors::SessionExpiredError.new( + message: "Session has expired" + ) + response_queue.push(session_error) + + expect do + transport.send(:wait_for_response_with_timeout, request_id, response_queue) + end.to raise_error(RubyLLM::MCP::Errors::SessionExpiredError, /Session has expired/) + end + + it "re-raises AuthenticationRequiredError pushed to queue by background thread" do + auth_error = RubyLLM::MCP::Errors::AuthenticationRequiredError.new( + message: "Authentication required" + ) + response_queue.push(auth_error) + + expect do + transport.send(:wait_for_response_with_timeout, request_id, response_queue) + end.to raise_error(RubyLLM::MCP::Errors::AuthenticationRequiredError, /Authentication required/) + end + + it "cleans up pending request via ensure block when exception sentinel is re-raised" do + session_error = RubyLLM::MCP::Errors::SessionExpiredError.new(message: "expired") + response_queue.push(session_error) + + begin + transport.send(:wait_for_response_with_timeout, request_id, response_queue) + rescue RubyLLM::MCP::Errors::SessionExpiredError + nil + end + + expect(transport.instance_variable_get(:@pending_requests)).not_to have_key(request_id) + end + end + + describe "polling timeout loop in wait_for_response_with_timeout" do + it "times out and cleans up when queue remains empty" do + short_timeout_transport = described_class.new( + url: TestServerManager::HTTP_SERVER_URL, + request_timeout: 50, + coordinator: mock_coordinator, + options: {} + ) + + request_id = "polling-timeout" + response_queue = Queue.new + + short_timeout_transport.instance_variable_get(:@pending_mutex).synchronize do + short_timeout_transport.instance_variable_get(:@pending_requests)[request_id] = response_queue + end + + expect do + short_timeout_transport.send(:wait_for_response_with_timeout, request_id, response_queue) + end.to raise_error(RubyLLM::MCP::Errors::TimeoutError, /Request timed out/) + + pending_requests = short_timeout_transport.instance_variable_get(:@pending_requests) + expect(pending_requests).not_to have_key(request_id) + end + end + describe "204 No Content response handling for session termination" do before do WebMock.enable! From e373e4225e06c9c38648f90e870a8557f47c73cc Mon Sep 17 00:00:00 2001 From: Philippe Lehoux Date: Sun, 22 Feb 2026 17:48:46 -0500 Subject: [PATCH 2/2] Remove comments as per @patvice advice --- .../mcp/native/transports/streamable_http.rb | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/lib/ruby_llm/mcp/native/transports/streamable_http.rb b/lib/ruby_llm/mcp/native/transports/streamable_http.rb index dfb9517..d864090 100644 --- a/lib/ruby_llm/mcp/native/transports/streamable_http.rb +++ b/lib/ruby_llm/mcp/native/transports/streamable_http.rb @@ -120,7 +120,6 @@ def request(body, wait_for_response: true) # Extract the request ID from the body (if present) request_id = body.is_a?(Hash) ? (body["id"] || body[:id]) : nil - # Register queue before send_http_request to avoid push-before-register races. response_queue = setup_response_queue(request_id, wait_for_response) result = send_http_request(body, request_id) return result if result.is_a?(RubyLLM::MCP::Result) @@ -315,13 +314,6 @@ def send_http_request(body, request_id) json_body = JSON.generate(body) RubyLLM::MCP.logger.debug "Sending Request: #{json_body}" - # Use a background thread only when a queue is registered for this request. - # When wait_for_response: false no queue is registered, so we run synchronously to - # ensure errors and results propagate immediately back to the caller. - # When wait_for_response: true the queue exists, so we must be async: some servers - # (e.g. Notion) respond with an inline SSE stream that stays open indefinitely after - # delivering the result, and blocking the main thread inside post() would cause a - # TimeoutError before wait_for_response_with_timeout is ever reached. use_background = request_id && @pending_mutex.synchronize { @pending_requests.key?(request_id.to_s) } if use_background @@ -337,9 +329,6 @@ def send_request_in_background(body, request_id, headers) response = request_client.post(@url, json: body, headers: headers) handle_response(response, request_id, body) rescue Errors::BaseError => e - # Push all BaseError subclasses directly so wait_for_response_with_timeout - # re-raises them with their original type (e.g. SessionExpiredError, not - # a generic TransportError wrapper). @pending_mutex.synchronize do queue = @pending_requests.delete(request_id.to_s) queue&.push(e) @@ -373,8 +362,6 @@ def create_connection_with_streaming_callbacks(request_id, close_when_fulfilled: client = client.on_response_body_chunk do |request, response, chunk| next unless running? - # Capture session ID from response headers before processing SSE events so that - # initialize_notification (sent after the initialize result is dequeued) includes it. if (session_id = response.headers["mcp-session-id"]) && !@session_id @session_id = session_id end @@ -383,7 +370,6 @@ def create_connection_with_streaming_callbacks(request_id, close_when_fulfilled: buffer << chunk process_sse_buffer_events(buffer, request_id&.to_s) - # Only close when wait_for_response registered a queue; fire-and-forget IDs shouldn't auto-close. if close_when_fulfilled && request_id fulfilled = @pending_mutex.synchronize { !@pending_requests.key?(request_id.to_s) } request.close if fulfilled @@ -448,9 +434,6 @@ def handle_success_response(response, request_id, _original_message) result = RubyLLM::MCP::Result.new(json_response, session_id: @session_id) if request_id - # Push to the queue rather than returning up the call stack: this method runs - # inside a background thread, so the return value is never observed by the - # caller. The main thread receives the result via wait_for_response_with_timeout. @pending_mutex.synchronize do queue = @pending_requests.delete(request_id.to_s) queue&.push(result) @@ -949,11 +932,8 @@ def wait_for_response_with_timeout(request_id, response_queue) timeout_seconds = @request_timeout / 1000.0 deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds - # Poll non-blocking; Thread.join(timeout) can starve under HTTPX callbacks. loop do result = response_queue.pop(true) - # Background thread pushes exceptions directly into the queue to preserve their - # original type (e.g. SessionExpiredError). Re-raise any of them here. raise result if result.is_a?(Exception) return result @@ -972,8 +952,6 @@ def wait_for_response_with_timeout(request_id, response_queue) sleep(0.05) end ensure - # Clean up the pending entry on any exit path not already handled above (e.g. when - # an exception escapes from pop itself rather than being pushed via the queue). @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } end