From 3fa80784aada3a4c4e4d9da9c0ebf6d5d0300b5f Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 06:45:47 +0000 Subject: [PATCH 1/6] Null check bug fixes --- src/Elastic.Transport/Responses/DefaultResponseFactory.cs | 4 +--- src/Elastic.Transport/Responses/Special/StreamResponse.cs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs index 46ae787..afaf378 100644 --- a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs +++ b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs @@ -79,13 +79,11 @@ private async ValueTask CreateCoreAsync( IReadOnlyDictionary? tcpStats, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() { - responseStream.ThrowIfNull(nameof(responseStream)); - var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength); TResponse? response = null; - if (MayHaveBody(statusCode, endpoint.Method, contentLength) + if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength) && TryResolveBuilder(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder)) { var ownsStream = false; diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs index d097ac3..5e6d813 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -53,7 +53,7 @@ protected virtual void Dispose(bool disposing) if (LinkedDisposables is not null) { foreach (var disposable in LinkedDisposables) - disposable.Dispose(); + disposable?.Dispose(); } } From 4907cfd0f25de4ce770e24f1d1e5aeae9cbe3d06 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 06:47:56 +0000 Subject: [PATCH 2/6] Expose TransportResponse to derived classes --- src/Elastic.Transport/Responses/TransportResponse.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Elastic.Transport/Responses/TransportResponse.cs b/src/Elastic.Transport/Responses/TransportResponse.cs index 4e7f4a8..d7ad7bd 100644 --- a/src/Elastic.Transport/Responses/TransportResponse.cs +++ b/src/Elastic.Transport/Responses/TransportResponse.cs @@ -46,7 +46,7 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// StreamResponse and kept internal. If we later make this public, we might need to refine this. /// [JsonIgnore] - internal IEnumerable? LinkedDisposables { get; set; } + protected internal IEnumerable? LinkedDisposables { get; internal set; } /// /// Allows the response to identify that the response stream should NOT be automatically disposed. @@ -55,6 +55,6 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// Currently only used by StreamResponse and therefore internal. /// [JsonIgnore] - internal virtual bool LeaveOpen { get; } = false; + protected internal virtual bool LeaveOpen { get; } = false; } From 7cac105b788d166304352737a517a0984cfbb594 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 07:00:24 +0000 Subject: [PATCH 3/6] Update StreamResponse --- src/Elastic.Transport/Responses/Special/StreamResponse.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs index 5e6d813..4009794 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -36,7 +36,10 @@ public StreamResponse(Stream body, string? contentType) ContentType = contentType ?? string.Empty; } - internal override bool LeaveOpen => true; + /// + /// + /// + protected internal override bool LeaveOpen => true; /// /// Disposes the underlying stream. From 35d95ed2e14398671ce4a80facb6e92f879c8af8 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 10:20:21 +0000 Subject: [PATCH 4/6] Introduce StreamResponseBase --- .../Responses/Special/StreamResponse.cs | 57 +++-------------- .../Responses/Special/StreamResponseBase.cs | 62 +++++++++++++++++++ .../Responses/TransportResponse.cs | 4 +- 3 files changed, 74 insertions(+), 49 deletions(-) create mode 100644 src/Elastic.Transport/Responses/Special/StreamResponseBase.cs diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs index 4009794..fed6df0 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -8,68 +8,31 @@ namespace Elastic.Transport; /// -/// A response that exposes the response as . +/// A response that exposes the response as a . /// /// MUST be disposed after use to ensure the HTTP connection is freed for reuse. /// /// -public class StreamResponse : TransportResponse, IDisposable +public sealed class StreamResponse : StreamResponseBase, IDisposable { - private bool _disposed; - - /// - /// The MIME type of the response, if present. - /// - public string ContentType { get; } - /// - public StreamResponse() - { - Body = Stream.Null; + public StreamResponse() : base(Stream.Null) => ContentType = string.Empty; - } /// - public StreamResponse(Stream body, string? contentType) - { - Body = body; + public StreamResponse(Stream body, string? contentType) : base(body) => ContentType = contentType ?? string.Empty; - } /// - /// + /// The MIME type of the response, if present. /// - protected internal override bool LeaveOpen => true; + public string ContentType { get; } /// - /// Disposes the underlying stream. + /// The raw response stream. /// - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposed) - { - if (disposing) - { - Body.Dispose(); - - if (LinkedDisposables is not null) - { - foreach (var disposable in LinkedDisposables) - disposable?.Dispose(); - } - } - - _disposed = true; - } - } + public Stream Body => Stream; - /// - /// Disposes the underlying stream. - /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } + /// + protected internal override bool LeaveOpen => true; } diff --git a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs new file mode 100644 index 0000000..2c5478c --- /dev/null +++ b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.IO; + +namespace Elastic.Transport; + +/// +/// A base class for implementing responses that access the raw response stream. +/// +public abstract class StreamResponseBase(Stream stream) : TransportResponse, IDisposable +{ + /// + protected internal override bool LeaveOpen => true; + + /// + /// The raw response stream from the HTTP layer. + /// + /// + /// MUST be disposed to release the underlying HTTP connection for reuse. + /// + protected Stream Stream { get; } = stream; + + /// + /// Indicates that the response has been disposed and it is not longer safe to access the stream. + /// + protected bool Disposed { get; private set; } + + /// + /// Disposes the underlying stream. + /// + /// + protected virtual void Dispose(bool disposing) + { + if (!Disposed) + { + if (disposing) + { + Stream?.Dispose(); + + if (LinkedDisposables is not null) + { + foreach (var disposable in LinkedDisposables) + disposable?.Dispose(); + } + } + + Disposed = true; + } + } + + /// + /// Disposes the underlying stream. + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } +} diff --git a/src/Elastic.Transport/Responses/TransportResponse.cs b/src/Elastic.Transport/Responses/TransportResponse.cs index d7ad7bd..ca72888 100644 --- a/src/Elastic.Transport/Responses/TransportResponse.cs +++ b/src/Elastic.Transport/Responses/TransportResponse.cs @@ -10,12 +10,12 @@ namespace Elastic.Transport; /// /// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response -/// types, , , , and +/// types, , , , and /// public abstract class TransportResponse : TransportResponse { /// - /// The deserialized body returned by the product. + /// The (potentially deserialized) response returned by the product. /// public T Body { get; protected internal set; } } From 0d433b48c9a5562ac8e72ecad60568c14a2f6999 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 10:42:40 +0000 Subject: [PATCH 5/6] Ensure we can still parse the product schema version --- .../Diagnostics/OpenTelemetry/OpenTelemetry.cs | 8 ++++++++ src/Elastic.Transport/DistributedTransport.cs | 7 ++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs index ba70e04..aec90dd 100644 --- a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs +++ b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs @@ -53,6 +53,14 @@ internal static void SetCommonAttributes(Activity? activity, ITransportConfigura } var productSchemaVersion = string.Empty; + foreach (var attribute in activity.TagObjects) + { + if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal)) + { + if (attribute.Value is string schemaVersion) + productSchemaVersion = schemaVersion; + } + } // We add the client schema version only when it differs from the product schema version if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal)) diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index dbe50ee..8364fd4 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -124,9 +124,6 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) { - if (activity.IsAllDataRequested) - OpenTelemetry.SetCommonAttributes(activity, Configuration); - if (Configuration.Authentication is BasicAuthentication basicAuthentication) activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username); @@ -261,9 +258,13 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); + // We don't check IsAllDataRequested here as that's left to the consumer. if (configureActivity is not null && activity is not null) configureActivity.Invoke(activity); + if (activity is { IsAllDataRequested: true }) + OpenTelemetry.SetCommonAttributes(activity, Configuration); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); } finally From fb0a516ecdb5da516600ba612d7808983cda5e42 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Thu, 21 Nov 2024 10:43:11 +0000 Subject: [PATCH 6/6] PR feedback - add null check on ctor --- .../Responses/Special/StreamResponseBase.cs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs index 2c5478c..06ebb67 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs @@ -4,13 +4,14 @@ using System; using System.IO; +using Elastic.Transport.Extensions; namespace Elastic.Transport; /// /// A base class for implementing responses that access the raw response stream. /// -public abstract class StreamResponseBase(Stream stream) : TransportResponse, IDisposable +public abstract class StreamResponseBase : TransportResponse, IDisposable { /// protected internal override bool LeaveOpen => true; @@ -21,13 +22,20 @@ public abstract class StreamResponseBase(Stream stream) : TransportResponse, IDi /// /// MUST be disposed to release the underlying HTTP connection for reuse. /// - protected Stream Stream { get; } = stream; + protected Stream Stream { get; } /// /// Indicates that the response has been disposed and it is not longer safe to access the stream. /// protected bool Disposed { get; private set; } + /// + public StreamResponseBase(Stream responseStream) + { + responseStream.ThrowIfNull(nameof(responseStream)); + Stream = responseStream; + } + /// /// Disposes the underlying stream. ///