Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance StreamResponse handling and update dependencies #121

Merged
merged 9 commits into from
Oct 30, 2024
142 changes: 79 additions & 63 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal static class ResponseBuilderDefaults

public static readonly Type[] SpecialTypes =
{
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse), typeof(StreamResponse)
};
}

Expand Down Expand Up @@ -66,11 +66,8 @@ IReadOnlyDictionary<TcpState, int> tcpStats
// Only attempt to set the body if the response may have content
if (MayHaveBody(statusCode, requestData.Method, contentLength))
response = SetBody<TResponse>(details, requestData, responseStream, mimeType);
else
responseStream.Dispose();

response ??= new TResponse();

response.ApiCallDetails = details;
return response;
}
Expand Down Expand Up @@ -101,11 +98,8 @@ public override async Task<TResponse> ToResponseAsync<TResponse>(
if (MayHaveBody(statusCode, requestData.Method, contentLength))
response = await SetBodyAsync<TResponse>(details, requestData, responseStream, mimeType,
cancellationToken).ConfigureAwait(false);
else
responseStream.Dispose();

response ??= new TResponse();

response.ApiCallDetails = details;
return response;
}
Expand Down Expand Up @@ -211,6 +205,8 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
var disableDirectStreaming = requestData.PostData?.DisableDirectStreaming ?? requestData.ConnectionSettings.DisableDirectStreaming;
var requiresErrorDeserialization = RequiresErrorDeserialization(details, requestData);

var ownsStream = false;

if (disableDirectStreaming || NeedsToEagerReadStream<TResponse>() || requiresErrorDeserialization)
{
var inMemoryStream = requestData.MemoryStreamFactory.Create();
Expand All @@ -221,90 +217,111 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
responseStream.CopyTo(inMemoryStream, BufferSize);

bytes = SwapStreams(ref responseStream, ref inMemoryStream);
ownsStream = true;
details.ResponseBodyInBytes = bytes;
}

using (responseStream)
if (TrySetSpecialType<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var response))
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;
if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
{
ConditionalDisposal(responseStream, ownsStream, response);
return null;
}

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();

if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;
if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
if (!requestData.ValidateResponseContentType(mimeType))
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}
ConditionalDisposal(responseStream, ownsStream, response);
return default;
}

if (!requestData.ValidateResponseContentType(mimeType))
return default;
var beforeTicks = Stopwatch.GetTimestamp();

var beforeTicks = Stopwatch.GetTimestamp();
if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);

if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
ConditionalDisposal(responseStream, ownsStream, response);
return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
// Note the exception this handles is ONLY thrown after a check if the stream length is zero.
// When the length is zero, `default` is returned by Deserialize(Async) instead.
ConditionalDisposal(responseStream, ownsStream, response);
return default;
}

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
static void ConditionalDisposal(Stream responseStream, bool ownsStream, TResponse response)
{
// We only dispose of the responseStream if we created it (i.e. it is a MemoryStream) we
// created via MemoryStreamFactory.
if (ownsStream && (response is null || !response.LeaveOpen))
responseStream.Dispose();
}
}

private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse cs)
private static bool TrySetSpecialType<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse response)
where TResponse : TransportResponse, new()
{
cs = null;
response = null;
var responseType = typeof(TResponse);
if (!SpecialTypes.Contains(responseType)) return false;

if (responseType == typeof(StringResponse))
cs = new StringResponse(bytes.Utf8String()) as TResponse;
response = new StringResponse(bytes.Utf8String()) as TResponse;
else if (responseType == typeof(StreamResponse))
cs = new StreamResponse(responseStream, mimeType) as TResponse;
response = new StreamResponse(responseStream, mimeType) as TResponse;
else if (responseType == typeof(BytesResponse))
cs = new BytesResponse(bytes) as TResponse;
response = new BytesResponse(bytes) as TResponse;
else if (responseType == typeof(VoidResponse))
cs = VoidResponse.Default as TResponse;
response = VoidResponse.Default as TResponse;
else if (responseType == typeof(DynamicResponse))
{
//if not json store the result under "body"
Expand All @@ -314,17 +331,17 @@ private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, St
{
["body"] = new DynamicValue(bytes.Utf8String())
};
cs = new DynamicResponse(dictionary) as TResponse;
response = new DynamicResponse(dictionary) as TResponse;
}
else
{
using var ms = memoryStreamFactory.Create(bytes);
var body = LowLevelRequestResponseSerializer.Instance.Deserialize<DynamicDictionary>(ms);
cs = new DynamicResponse(body) as TResponse;
response = new DynamicResponse(body) as TResponse;
}
}

return cs != null;
return response != null;
}

private static bool NeedsToEagerReadStream<TResponse>()
Expand All @@ -336,7 +353,6 @@ private static bool NeedsToEagerReadStream<TResponse>()
private static byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
{
var bytes = ms.ToArray();
responseStream.Dispose();
responseStream = ms;
responseStream.Position = 0;
return bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
Exception ex = null;
string mimeType = null;
long contentLength = -1;
IDisposable receive = DiagnosticSources.SingletonDisposable;
IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary<TcpState, int> tcpStats = null;
ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
Dictionary<string, IEnumerable<string>> responseHeaders = null;
Expand Down Expand Up @@ -118,7 +118,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult();
#endif

receive = responseMessage;
receivedResponse = responseMessage;
statusCode = (int)responseMessage.StatusCode;
}

Expand Down Expand Up @@ -154,13 +154,10 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
ex = e;
}

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
TResponse response;

using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receive)
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
try
{
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
Expand All @@ -169,9 +166,18 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receive.Dispose();
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
response.LinkedDisposables = [receivedResponse, responseStream];
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;
Expand All @@ -185,6 +191,13 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req

return response;
}
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
throw;
}
}

private static Dictionary<string, IEnumerable<string>>? ParseHeaders(RequestData requestData, HttpResponseMessage responseMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
Exception ex = null;
string mimeType = null;
long contentLength = -1;
IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary<TcpState, int> tcpStats = null;
ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
Dictionary<string, IEnumerable<string>> responseHeaders = null;
Expand Down Expand Up @@ -146,6 +147,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
httpWebResponse = (HttpWebResponse)request.GetResponse();
}

receivedResponse = httpWebResponse;

HandleResponse(httpWebResponse, out statusCode, out responseStream, out mimeType);
responseHeaders = ParseHeaders(requestData, httpWebResponse, responseHeaders);
contentLength = httpWebResponse.ContentLength;
Expand All @@ -161,28 +164,50 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
{
unregisterWaitHandle?.Invoke();
}
responseStream ??= Stream.Null;

TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
try
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
TResponse response;

if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
response.LinkedDisposables = [receivedResponse, responseStream];
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
}
}

return response;
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}

return response;
}
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
throw;
}
}

private static Dictionary<string, IEnumerable<string>> ParseHeaders(RequestData requestData, HttpWebResponse responseMessage, Dictionary<string, IEnumerable<string>> responseHeaders)
Expand Down
Loading
Loading