diff --git a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs index ba70e04..aec90dd 100644 --- a/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs +++ b/src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs @@ -53,6 +53,14 @@ internal static void SetCommonAttributes(Activity? activity, ITransportConfigura } var productSchemaVersion = string.Empty; + foreach (var attribute in activity.TagObjects) + { + if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal)) + { + if (attribute.Value is string schemaVersion) + productSchemaVersion = schemaVersion; + } + } // We add the client schema version only when it differs from the product schema version if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal)) diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index dbe50ee..8364fd4 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -124,9 +124,6 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) { - if (activity.IsAllDataRequested) - OpenTelemetry.SetCommonAttributes(activity, Configuration); - if (Configuration.Authentication is BasicAuthentication basicAuthentication) activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username); @@ -261,9 +258,13 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); + // We don't check IsAllDataRequested here as that's left to the consumer. if (configureActivity is not null && activity is not null) configureActivity.Invoke(activity); + if (activity is { IsAllDataRequested: true }) + OpenTelemetry.SetCommonAttributes(activity, Configuration); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); } finally diff --git a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs index 46ae787..afaf378 100644 --- a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs +++ b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs @@ -79,13 +79,11 @@ private async ValueTask CreateCoreAsync( IReadOnlyDictionary? tcpStats, CancellationToken cancellationToken = default) where TResponse : TransportResponse, new() { - responseStream.ThrowIfNull(nameof(responseStream)); - var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength); TResponse? response = null; - if (MayHaveBody(statusCode, endpoint.Method, contentLength) + if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength) && TryResolveBuilder(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder)) { var ownsStream = false; diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs index d097ac3..fed6df0 100644 --- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs +++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs @@ -8,65 +8,31 @@ namespace Elastic.Transport; /// -/// A response that exposes the response as . +/// A response that exposes the response as a . /// /// MUST be disposed after use to ensure the HTTP connection is freed for reuse. /// /// -public class StreamResponse : TransportResponse, IDisposable +public sealed class StreamResponse : StreamResponseBase, IDisposable { - private bool _disposed; - - /// - /// The MIME type of the response, if present. - /// - public string ContentType { get; } - /// - public StreamResponse() - { - Body = Stream.Null; + public StreamResponse() : base(Stream.Null) => ContentType = string.Empty; - } /// - public StreamResponse(Stream body, string? contentType) - { - Body = body; + public StreamResponse(Stream body, string? contentType) : base(body) => ContentType = contentType ?? string.Empty; - } - - internal override bool LeaveOpen => true; /// - /// Disposes the underlying stream. + /// The MIME type of the response, if present. /// - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposed) - { - if (disposing) - { - Body.Dispose(); - - if (LinkedDisposables is not null) - { - foreach (var disposable in LinkedDisposables) - disposable.Dispose(); - } - } - - _disposed = true; - } - } + public string ContentType { get; } /// - /// Disposes the underlying stream. + /// The raw response stream. /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } + public Stream Body => Stream; + + /// + protected internal override bool LeaveOpen => true; } diff --git a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs new file mode 100644 index 0000000..06ebb67 --- /dev/null +++ b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.IO; +using Elastic.Transport.Extensions; + +namespace Elastic.Transport; + +/// +/// A base class for implementing responses that access the raw response stream. +/// +public abstract class StreamResponseBase : TransportResponse, IDisposable +{ + /// + protected internal override bool LeaveOpen => true; + + /// + /// The raw response stream from the HTTP layer. + /// + /// + /// MUST be disposed to release the underlying HTTP connection for reuse. + /// + protected Stream Stream { get; } + + /// + /// Indicates that the response has been disposed and it is not longer safe to access the stream. + /// + protected bool Disposed { get; private set; } + + /// + public StreamResponseBase(Stream responseStream) + { + responseStream.ThrowIfNull(nameof(responseStream)); + Stream = responseStream; + } + + /// + /// Disposes the underlying stream. + /// + /// + protected virtual void Dispose(bool disposing) + { + if (!Disposed) + { + if (disposing) + { + Stream?.Dispose(); + + if (LinkedDisposables is not null) + { + foreach (var disposable in LinkedDisposables) + disposable?.Dispose(); + } + } + + Disposed = true; + } + } + + /// + /// Disposes the underlying stream. + /// + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } +} diff --git a/src/Elastic.Transport/Responses/TransportResponse.cs b/src/Elastic.Transport/Responses/TransportResponse.cs index 4e7f4a8..ca72888 100644 --- a/src/Elastic.Transport/Responses/TransportResponse.cs +++ b/src/Elastic.Transport/Responses/TransportResponse.cs @@ -10,12 +10,12 @@ namespace Elastic.Transport; /// /// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response -/// types, , , , and +/// types, , , , and /// public abstract class TransportResponse : TransportResponse { /// - /// The deserialized body returned by the product. + /// The (potentially deserialized) response returned by the product. /// public T Body { get; protected internal set; } } @@ -46,7 +46,7 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// StreamResponse and kept internal. If we later make this public, we might need to refine this. /// [JsonIgnore] - internal IEnumerable? LinkedDisposables { get; set; } + protected internal IEnumerable? LinkedDisposables { get; internal set; } /// /// Allows the response to identify that the response stream should NOT be automatically disposed. @@ -55,6 +55,6 @@ public override string ToString() => ApiCallDetails?.DebugInformation /// Currently only used by StreamResponse and therefore internal. /// [JsonIgnore] - internal virtual bool LeaveOpen { get; } = false; + protected internal virtual bool LeaveOpen { get; } = false; }