Skip to content

Commit

Permalink
Finalise implementation and test all possible scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon committed Oct 29, 2024
1 parent 1e7ba25 commit 24a1224
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 52 deletions.
43 changes: 31 additions & 12 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,15 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
details.ResponseBodyInBytes = bytes;
}

if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
if (TrySetSpecialType<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
{
// In this scenario, we always dispose as we've explicitly skipped reading the response
responseStream.Dispose();
return null;
}

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

Expand All @@ -249,6 +253,9 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

if (!response.LeaveOpen)
responseStream.Dispose();

return response;
}

Expand All @@ -260,11 +267,18 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
{
response = new TResponse();
SetErrorOnResponse(response, error);

if (!response.LeaveOpen)
responseStream.Dispose();

return response;
}

if (!requestData.ValidateResponseContentType(mimeType))
{
responseStream.Dispose();
return default;
}

var beforeTicks = Stopwatch.GetTimestamp();

Expand All @@ -278,34 +292,36 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

if (!response.LeaveOpen)
if (response is null || !response.LeaveOpen)
responseStream.Dispose();

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
// Note that this is only thrown after a check if the stream length is zero. When the length is zero,
// `default` is returned by Deserialize(Async) instead.
responseStream.Dispose();
return default;
}
}

private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse cs)
private static bool TrySetSpecialType<TResponse>(string mimeType, byte[] bytes, Stream responseStream,
MemoryStreamFactory memoryStreamFactory, out TResponse response)
where TResponse : TransportResponse, new()
{
cs = null;
response = null;
var responseType = typeof(TResponse);
if (!SpecialTypes.Contains(responseType)) return false;

if (responseType == typeof(StringResponse))
cs = new StringResponse(bytes.Utf8String()) as TResponse;
response = new StringResponse(bytes.Utf8String()) as TResponse;
else if (responseType == typeof(StreamResponse))
cs = new StreamResponse(responseStream, mimeType) as TResponse;
response = new StreamResponse(responseStream, mimeType) as TResponse;
else if (responseType == typeof(BytesResponse))
cs = new BytesResponse(bytes) as TResponse;
response = new BytesResponse(bytes) as TResponse;
else if (responseType == typeof(VoidResponse))
cs = VoidResponse.Default as TResponse;
response = VoidResponse.Default as TResponse;
else if (responseType == typeof(DynamicResponse))
{
//if not json store the result under "body"
Expand All @@ -315,17 +331,20 @@ private static bool SetSpecialTypes<TResponse>(string mimeType, byte[] bytes, St
{
["body"] = new DynamicValue(bytes.Utf8String())
};
cs = new DynamicResponse(dictionary) as TResponse;
response = new DynamicResponse(dictionary) as TResponse;
}
else
{
using var ms = memoryStreamFactory.Create(bytes);
var body = LowLevelRequestResponseSerializer.Instance.Deserialize<DynamicDictionary>(ms);
cs = new DynamicResponse(body) as TResponse;
response = new DynamicResponse(body) as TResponse;
}
}

return cs != null;
if (!response.LeaveOpen)
responseStream.Dispose();

return response != null;
}

private static bool NeedsToEagerReadStream<TResponse>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,17 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receivedResponse.Dispose();
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
response.LinkedDisposables = [receivedResponse];
}
else
{
receivedResponse.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;
Expand All @@ -180,11 +188,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
foreach (var attribute in attributes)
Activity.Current?.SetTag(attribute.Key, attribute.Value);

// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection.
if (!response.LeaveOpen)
receivedResponse.Dispose();

return response;
}
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,17 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receivedResponse.Dispose();
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
response.LinkedDisposables = [receivedResponse];
}
else
{
receivedResponse.Dispose();
}

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
Expand All @@ -190,9 +198,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
}
}

if (!response.LeaveOpen)
receivedResponse.Dispose();

return response;
}
catch
Expand Down
13 changes: 7 additions & 6 deletions src/Elastic.Transport/Responses/Special/StreamResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@ namespace Elastic.Transport;
/// <strong>MUST</strong> be disposed after use to ensure the HTTP connection is freed for reuse.
/// </para>
/// </summary>
public class StreamResponse :
TransportResponse<Stream>,
IDisposable
public class StreamResponse : TransportResponse<Stream>, IDisposable
{
private bool _disposed;

internal Action? Finalizer { get; set; }

/// <summary>
/// The MIME type of the response, if present.
/// </summary>
Expand Down Expand Up @@ -53,7 +49,12 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
Body.Dispose();
Finalizer?.Invoke();

if (LinkedDisposables is not null)
{
foreach (var disposable in LinkedDisposables)
disposable.Dispose();
}
}

_disposed = true;
Expand Down
6 changes: 6 additions & 0 deletions src/Elastic.Transport/Responses/TransportResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;

namespace Elastic.Transport;
Expand Down Expand Up @@ -35,6 +37,10 @@ public override string ToString() => ApiCallDetails?.DebugInformation
// ReSharper disable once ConstantNullCoalescingCondition
?? $"{nameof(ApiCallDetails)} not set, likely a bug, reverting to default ToString(): {base.ToString()}";

[JsonIgnore]
internal IEnumerable<IDisposable>? LinkedDisposables { get; set; }

[JsonIgnore]
internal virtual bool LeaveOpen { get; } = false;
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
// See the LICENSE file in the project root for more information

using System;
using Elastic.Transport.Products;

namespace Elastic.Transport.Tests.Plumbing
{
public static class InMemoryConnectionFactory
{
public static TransportConfiguration Create()
public static TransportConfiguration Create(ProductRegistration productRegistration = null)
{
var invoker = new InMemoryRequestInvoker();
var pool = new SingleNodePool(new Uri("http://localhost:9200"));
var settings = new TransportConfiguration(pool, invoker);
var settings = new TransportConfiguration(pool, invoker, productRegistration: productRegistration);
return settings;
}
}
Expand Down
Loading

0 comments on commit 24a1224

Please sign in to comment.