Skip to content

DRAFT - HttpRequest.Builder customizer #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Customize {@link HttpRequest.Builder} before sending out SSE or Streamable HTTP
* transport.
* <p>
* When used in a non-blocking context, implementations MUST be non-blocking.
*/
public interface AsyncHttpRequestCustomizer {

Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
@Nullable String body);

AsyncHttpRequestCustomizer NOOP = new Noop();

/**
* Wrap a sync implementation in an async wrapper.
* <p>
* Do NOT use in a non-blocking context.
*/
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
return (builder, method, uri, body) -> Mono.defer(() -> {
customizer.customize(builder, method, uri, body);
return Mono.just(builder);
});
}

class Noop implements AsyncHttpRequestCustomizer {

@Override
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
String body) {
return Mono.just(builder);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class HttpClientSseClientTransport implements McpClientTransport {
*/
protected final Sinks.One<String> messageEndpointSink = Sinks.one();

// TODO
private final AsyncHttpRequestCustomizer httpRequestCustomizer;

/**
* Creates a new transport instance with default HTTP client and object mapper.
* @param baseUri the base URI of the MCP server
Expand Down Expand Up @@ -172,18 +175,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
* @param objectMapper the object mapper for JSON serialization/deserialization
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
@Deprecated(forRemoval = true)
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper) {
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
}

/**
* Creates a new transport instance with custom HTTP client builder, object mapper,
* and headers.
* @param httpClient the HTTP client to use
* @param requestBuilder the HTTP request builder to use
* @param baseUri the base URI of the MCP server
* @param sseEndpoint the SSE endpoint path
* @param objectMapper the object mapper for JSON serialization/deserialization
* @param httpRequestCustomizer customizer for the requestBuilder before sending
* requests
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
Assert.notNull(httpClient, "httpClient must not be null");
Assert.notNull(requestBuilder, "requestBuilder must not be null");
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
this.baseUri = URI.create(baseUri);
this.sseEndpoint = sseEndpoint;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.httpRequestCustomizer = httpRequestCustomizer;
}

/**
Expand Down Expand Up @@ -213,6 +236,8 @@ public static class Builder {
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.header("Content-Type", "application/json");

private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;

/**
* Creates a new builder instance.
*/
Expand Down Expand Up @@ -310,96 +335,111 @@ public Builder objectMapper(ObjectMapper objectMapper) {
return this;
}

/**
* In reactive, DONT USE THIS. Use AsyncHttpRequestCustomizer.
*/
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels awkward. Perhaps we should leave only the async method and let the user convert the sync customizer outside

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point.

I’m trying to ensure sync-based users don’t have to think about async versions of this API at all. My goal is for the user to make a “sync customizer” and only have to worry about that. Making the transformation outside is one more layer they’d have to think about (and that we would have to explain in the docs).
Also, we’ve had feedback from people absolutely NOT wanting to deal with async.

let me know what you think; happy to make the change to “only async” if you think it’s more confusing than helpful.

this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
return this;
}

public Builder httpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
return this;
}

/**
* Builds a new {@link HttpClientSseClientTransport} instance.
* @return a new transport instance
*/
public HttpClientSseClientTransport build() {
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
objectMapper);
objectMapper, httpRequestCustomizer);
}

}

@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);

return Mono.create(sink -> {

HttpRequest request = requestBuilder.copy()
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
return Mono
.just(requestBuilder.copy()
.uri(uri)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.GET()
.build();

Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.exceptionallyCompose(e -> {
sseSink.error(e);
return CompletableFuture.failedFuture(e);
}))
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
.flatMap(responseEvent -> {
if (isClosing) {
return Mono.empty();
}

int statusCode = responseEvent.responseInfo().statusCode();
.GET())
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null)))
.map(HttpRequest.Builder::build)
.flatMap(request -> Mono.create(sink -> {
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.exceptionallyCompose(e -> {
sseSink.error(e);
return CompletableFuture.failedFuture(e);
}))
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
.flatMap(responseEvent -> {
if (isClosing) {
return Mono.empty();
}

if (statusCode >= 200 && statusCode < 300) {
try {
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
String messageEndpointUri = responseEvent.sseEvent().data();
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
int statusCode = responseEvent.responseInfo().statusCode();

if (statusCode >= 200 && statusCode < 300) {
try {
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
String messageEndpointUri = responseEvent.sseEvent().data();
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
sink.success();
return Flux.empty(); // No further processing
// needed
}
else {
sink.error(new McpError("Failed to handle SSE endpoint event"));
}
}
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseEvent.sseEvent().data());
sink.success();
return Flux.empty(); // No further processing needed
return Flux.just(message);
}
else {
sink.error(new McpError("Failed to handle SSE endpoint event"));
logger.error("Received unrecognized SSE event type: {}",
responseEvent.sseEvent().event());
sink.error(new McpError("Received unrecognized SSE event type: "
+ responseEvent.sseEvent().event()));
}
}
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseEvent.sseEvent().data());
sink.success();
return Flux.just(message);
}
else {
logger.error("Received unrecognized SSE event type: {}",
responseEvent.sseEvent().event());
sink.error(new McpError(
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
catch (IOException e) {
logger.error("Error processing SSE event", e);
sink.error(new McpError("Error processing SSE event"));
}
}
catch (IOException e) {
logger.error("Error processing SSE event", e);
sink.error(new McpError("Error processing SSE event"));
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Failed to send message: " + responseEvent));

})
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
.onErrorComplete(t -> {
if (!isClosing) {
logger.warn("SSE stream observed an error", t);
sink.error(t);
}
}
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Failed to send message: " + responseEvent));

})
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
.onErrorComplete(t -> {
if (!isClosing) {
logger.warn("SSE stream observed an error", t);
sink.error(t);
}
return true;
})
.doFinally(s -> {
Disposable ref = this.sseSubscription.getAndSet(null);
if (ref != null && !ref.isDisposed()) {
ref.dispose();
}
})
.contextWrite(sink.contextView())
.subscribe();
return true;
})
.doFinally(s -> {
Disposable ref = this.sseSubscription.getAndSet(null);
if (ref != null && !ref.isDisposed()) {
ref.dispose();
}
})
.contextWrite(sink.contextView())
.subscribe();

this.sseSubscription.set(connection);
});
this.sseSubscription.set(connection);
}));
}

/**
Expand Down Expand Up @@ -455,13 +495,11 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {

private Mono<HttpResponse<Void>> sendHttpPost(final String endpoint, final String body) {
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
final HttpRequest request = this.requestBuilder.copy()
.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

// TODO: why discard the body?
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
return Mono.just(this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body)))
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body)))
.map(HttpRequest.Builder::build)
// TODO: why discard the body?
.flatMap(request -> Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())));
}

/**
Expand Down
Loading
Loading