From ac8afb0d4f7c23a06e926d730a010b6cc9fb0e82 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Fri, 1 Nov 2024 14:16:30 +0100 Subject: [PATCH] Simplify RequestPipeline and reuse a singleton instance if we can --- .../Components/ExposingPipelineFactory.cs | 2 +- .../Pipeline/DefaultRequestPipeline.cs | 142 ++++++++++-------- .../Components/Pipeline/PipelineException.cs | 2 +- .../Components/Pipeline/PipelineFailure.cs | 4 +- .../Components/Pipeline/RequestPipeline.cs | 132 ---------------- .../DefaultRequestPipelineFactory.cs | 6 +- .../Configuration/TransportConfiguration.cs | 3 +- .../Diagnostics/AuditDiagnosticObserver.cs | 2 +- .../Diagnostics/Auditing/Audit.cs | 2 +- .../Diagnostics/Auditing/Auditable.cs | 2 +- .../Diagnostics/DiagnosticSources.cs | 4 +- .../RequestPipelineDiagnosticObserver.cs | 2 +- src/Elastic.Transport/DistributedTransport.cs | 20 ++- .../Exceptions/TransportException.cs | 4 +- .../Responses/HttpDetails/ApiCallDetails.cs | 6 +- 15 files changed, 112 insertions(+), 221 deletions(-) delete mode 100644 src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs index 5a1f421..59b6b33 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs @@ -21,5 +21,5 @@ public ExposingPipelineFactory(TConfiguration configuration) public ITransport Transport { get; } public override RequestPipeline Create(RequestData requestData) => - new DefaultRequestPipeline(requestData); + new RequestPipeline(requestData); } diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs index 450c73f..d099c01 100644 --- a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs +++ b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs @@ -15,8 +15,8 @@ namespace Elastic.Transport; -/// -public class DefaultRequestPipeline : RequestPipeline +/// Models the workflow of a request to multiple nodes +public class RequestPipeline { private readonly IRequestInvoker _requestInvoker; private readonly NodePool _nodePool; @@ -32,7 +32,7 @@ public class DefaultRequestPipeline : RequestPipeline private readonly ITransportConfiguration _settings; /// - internal DefaultRequestPipeline(RequestData requestData) + internal RequestPipeline(RequestData requestData) { _requestData = requestData; _settings = requestData.ConnectionSettings; @@ -44,12 +44,10 @@ internal DefaultRequestPipeline(RequestData requestData) _productRegistration = requestData.ConnectionSettings.ProductRegistration; _responseBuilder = _productRegistration.ResponseBuilder; _nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate; - - StartedOn = _dateTimeProvider.Now(); } - /// - public override IEnumerable AuditTrail => _auditTrail; + /// A list of events + public IEnumerable? AuditTrail => _auditTrail; private RequestConfiguration PingAndSniffRequestConfiguration { @@ -72,50 +70,45 @@ private RequestConfiguration PingAndSniffRequestConfiguration } } - //TODO xmldocs -#pragma warning disable 1591 - public bool DepletedRetries => Retried >= MaxRetries + 1 || IsTakingTooLong; + private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn); - public override bool FirstPoolUsageNeedsSniffing => + private bool FirstPoolUsageNeedsSniffing => !RequestDisabledSniff && _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup; - public override bool IsTakingTooLong + private bool IsTakingTooLong(DateTimeOffset startedOn) { - get - { - var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); - var now = _dateTimeProvider.Now(); - - //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. - var margin = timeout.TotalMilliseconds / 100.0 * 98; - var marginTimeSpan = TimeSpan.FromMilliseconds(margin); - var timespanCall = now - StartedOn; - var tookToLong = timespanCall >= marginTimeSpan; - return tookToLong; - } + var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); + var now = _dateTimeProvider.Now(); + + //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. + var margin = timeout.TotalMilliseconds / 100.0 * 98; + var marginTimeSpan = TimeSpan.FromMilliseconds(margin); + var timespanCall = now - startedOn; + var tookToLong = timespanCall >= marginTimeSpan; + return tookToLong; } - public override int MaxRetries => _requestData.MaxRetries; + private int MaxRetries => _requestData.MaxRetries; - public bool Refresh { get; private set; } + private bool Refresh { get; set; } - public int Retried { get; private set; } + private int Retried { get; set; } - public IEnumerable SniffNodes => _nodePool + private IEnumerable SniffNodes => _nodePool .CreateView(LazyAuditable) .ToList() .OrderBy(n => _productRegistration.SniffOrder(n)); - public override bool SniffsOnConnectionFailure => + private bool SniffsOnConnectionFailure => !RequestDisabledSniff && _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault; - public override bool SniffsOnStaleCluster => + private bool SniffsOnStaleCluster => !RequestDisabledSniff && _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue; - public override bool StaleClusterState + private bool StaleClusterState { get { @@ -132,17 +125,18 @@ public override bool StaleClusterState } } - public override DateTimeOffset StartedOn { get; } - private TimeSpan PingTimeout => _requestData.PingTimeout; private bool RequestDisabledSniff => _requestData.DisableSniff; private TimeSpan RequestTimeout => _requestData.RequestTimeout; - public override void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose(); + /// Emit event + public void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose(); - public override void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) + /// Ensures a response is returned with and if available + public void BadResponse(ref TResponse? response, ApiCallDetails? callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) + where TResponse : TransportResponse, new() { if (response == null) { @@ -155,10 +149,14 @@ public override void BadResponse(ref TResponse response, ApiCallDetai response.ApiCallDetails.AuditTrail = AuditTrail; } - public override TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) + where TResponse : TransportResponse, new() => CallProductEndpointCoreAsync(false, endpoint, requestData, postData).EnsureCompleted(); - public override Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) + where TResponse : TransportResponse, new() => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask(); private async ValueTask CallProductEndpointCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) @@ -199,13 +197,16 @@ private async ValueTask CallProductEndpointCoreAsync(bool } } - public override TransportException? CreateClientException( + /// Create a rich enough + public TransportException? CreateClientException( TResponse response, ApiCallDetails? callDetails, Endpoint endpoint, RequestData data, + DateTimeOffset startedOn, List? seenExceptions ) + where TResponse : TransportResponse, new() { if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null; @@ -224,7 +225,7 @@ private async ValueTask CallProductEndpointCoreAsync(bool var exceptionMessage = innerException?.Message ?? "Request failed to execute"; - if (IsTakingTooLong) + if (IsTakingTooLong(startedOn)) { pipelineFailure = PipelineFailure.MaxTimeoutReached; Audit(MaxTimeoutReached); @@ -259,7 +260,8 @@ private async ValueTask CallProductEndpointCoreAsync(bool return clientException; } - public override void FirstPoolUsage(SemaphoreSlim semaphore) + /// Routine for the first call into the product, potentially sniffing to discover the network topology + public void FirstPoolUsage(SemaphoreSlim semaphore) { if (!FirstPoolUsageNeedsSniffing) return; @@ -291,7 +293,8 @@ public override void FirstPoolUsage(SemaphoreSlim semaphore) } } - public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken) + /// + public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken) { if (!FirstPoolUsageNeedsSniffing) return; @@ -325,17 +328,19 @@ public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, Cancella } } - public override void MarkAlive(Node node) => node.MarkAlive(); + /// Mark as alive putting it back in rotation. + public void MarkAlive(Node node) => node.MarkAlive(); - public override void MarkDead(Node node) + /// Mark as dead, taking it out of rotation. + public void MarkDead(Node node) { var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); node.MarkDead(deadUntil); Retried++; } - /// - public override bool TryGetSingleNode(out Node node) + /// Fast path for if only a single node could ever be yielded this save an IEnumerator allocation + public bool TryGetSingleNode(out Node? node) { if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count && !_nodePool.SupportsPinging && !_nodePool.SupportsReseeding) @@ -349,7 +354,8 @@ public override bool TryGetSingleNode(out Node node) return false; } - public override IEnumerable NextNode() + /// returns a consistent enumerable view into the available nodes + public IEnumerable NextNode(DateTimeOffset startedOn) { if (_requestData.ForceNode != null) { @@ -364,11 +370,11 @@ public override IEnumerable NextNode() var refreshed = false; for (var i = 0; i < 100; i++) { - if (DepletedRetries) yield break; + if (DepletedRetries(startedOn)) yield break; foreach (var node in _nodePool.CreateView(LazyAuditable)) { - if (DepletedRetries) break; + if (DepletedRetries(startedOn)) break; if (!_nodePredicate(node)) continue; @@ -386,12 +392,14 @@ public override IEnumerable NextNode() } } - public override void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted(); + /// ping as a fast path ensuring its alive + public void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted(); - public override Task PingAsync(Node node, CancellationToken cancellationToken = default) + /// ping as a fast path ensuring its alive + public Task PingAsync(Node node, CancellationToken cancellationToken = default) => PingCoreAsync(true, node, cancellationToken).AsTask(); - public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default) + private async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default) { if (!_productRegistration.SupportsPing) return; if (PingDisabled(node)) return; @@ -433,12 +441,14 @@ public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken } } - public override void Sniff() => SniffCoreAsync(false).EnsureCompleted(); + /// Discover the products network topology to yield all available nodes + public void Sniff() => SniffCoreAsync(false).EnsureCompleted(); - public override Task SniffAsync(CancellationToken cancellationToken = default) + /// Discover the products network topology to yield all available nodes + public Task SniffAsync(CancellationToken cancellationToken = default) => SniffCoreAsync(true, cancellationToken).AsTask(); - public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default) + private async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default) { if (!_productRegistration.SupportsSniff) return; @@ -495,7 +505,8 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati } } - public override void SniffOnConnectionFailure() + /// sniff the topology when a connection failure happens + public void SniffOnConnectionFailure() { if (!SniffsOnConnectionFailure) return; @@ -503,7 +514,8 @@ public override void SniffOnConnectionFailure() Sniff(); } - public override async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken) + /// sniff the topology when a connection failure happens + public async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken) { if (!SniffsOnConnectionFailure) return; @@ -511,7 +523,8 @@ public override async Task SniffOnConnectionFailureAsync(CancellationToken cance await SniffAsync(cancellationToken).ConfigureAwait(false); } - public override void SniffOnStaleCluster() + /// sniff the topology after a set period to ensure its up to date + public void SniffOnStaleCluster() { if (!StaleClusterState) return; @@ -522,7 +535,8 @@ public override void SniffOnStaleCluster() } } - public override async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken) + /// sniff the topology after a set period to ensure its up to date + public async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken) { if (!StaleClusterState) return; @@ -533,21 +547,22 @@ public override async Task SniffOnStaleClusterAsync(CancellationToken cancellati } } - public override void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions) + /// emit event in case no nodes were available + public void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions) { - var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage, (Exception)null); + var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage); using (Audit(NoNodesAttempted)) throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail }; } private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected; - private Auditable? Audit(AuditEvent type, Node node = null) => + private Auditable? Audit(AuditEvent type, Node? node = null) => !_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null; - private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse response = null) + private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse? response = null) { - if (details?.HttpStatusCode == 401) + if (details.HttpStatusCode == 401) throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response }; } @@ -556,4 +571,3 @@ private void LazyAuditable(AuditEvent e, Node n) using (new Auditable(e, ref _auditTrail, _dateTimeProvider, n)) { } } } -#pragma warning restore 1591 diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs index 37f92e2..ca68411 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A pipeline exception is throw when ever a known failing exit point is reached in +/// A pipeline exception is throw when ever a known failing exit point is reached in /// See for known exits points /// public class PipelineException : Exception diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs index 4312188..7252d6e 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A failure in 's workflow that caused it to end prematurely. +/// A failure in 's workflow that caused it to end prematurely. /// public enum PipelineFailure { @@ -43,7 +43,7 @@ public enum PipelineFailure MaxRetriesReached, /// - /// An exception occurred during that could not be handled + /// An exception occurred during that could not be handled /// Unexpected, diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs deleted file mode 100644 index 99343b3..0000000 --- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs +++ /dev/null @@ -1,132 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Elastic.Transport.Diagnostics.Auditing; - -namespace Elastic.Transport; - -/// -/// Models the workflow of a request to multiple nodes -/// -public abstract class RequestPipeline : IDisposable -{ - private bool _disposed; - - internal RequestPipeline() { } - - /// - /// An audit trail that can be used for logging and debugging purposes. Giving insights into how - /// the request made its way through the workflow - /// - public abstract IEnumerable AuditTrail { get; } - - /// - /// Should the workflow attempt the initial sniff as requested by - /// - /// - public abstract bool FirstPoolUsageNeedsSniffing { get; } - - //TODO xmldocs -#pragma warning disable 1591 - public abstract bool IsTakingTooLong { get; } - - public abstract int MaxRetries { get; } - - public abstract bool SniffsOnConnectionFailure { get; } - - public abstract bool SniffsOnStaleCluster { get; } - - public abstract bool StaleClusterState { get; } - - public abstract DateTimeOffset StartedOn { get; } - - public abstract TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) - where TResponse : TransportResponse, new(); - - public abstract Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken) - where TResponse : TransportResponse, new(); - - public abstract void MarkAlive(Node node); - - public abstract void MarkDead(Node node); - - /// - /// Attempt to get a single node when the underlying connection pool contains only one node. - /// - /// This provides an optimised path for single node pools by avoiding an Enumerator on each call. - /// - /// - /// - /// true when a single node exists which has been set on the . - public abstract bool TryGetSingleNode(out Node node); - - public abstract IEnumerable NextNode(); - - public abstract void Ping(Node node); - - public abstract Task PingAsync(Node node, CancellationToken cancellationToken); - - public abstract void FirstPoolUsage(SemaphoreSlim semaphore); - - public abstract Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken); - - public abstract void Sniff(); - - public abstract Task SniffAsync(CancellationToken cancellationToken); - - public abstract void SniffOnStaleCluster(); - - public abstract Task SniffOnStaleClusterAsync(CancellationToken cancellationToken); - - public abstract void SniffOnConnectionFailure(); - - public abstract Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken); - - public abstract void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) - where TResponse : TransportResponse, new(); - - public abstract void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions); - - public abstract void AuditCancellationRequested(); - - public abstract TransportException? CreateClientException(TResponse? response, ApiCallDetails? callDetails, - Endpoint endpoint, RequestData data, List? seenExceptions) - where TResponse : TransportResponse, new(); -#pragma warning restore 1591 - - /// - /// - /// - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - /// - /// - /// - /// - protected virtual void Dispose(bool disposing) - { - if (_disposed) - return; - - if (disposing) - { - DisposeManagedResources(); - } - - _disposed = true; - } - - /// - /// - /// - protected virtual void DisposeManagedResources() { } -} diff --git a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs index db1f25e..399381d 100644 --- a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs +++ b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs @@ -5,14 +5,14 @@ namespace Elastic.Transport; /// -/// The default implementation for that returns +/// The default implementation for that returns /// internal sealed class DefaultRequestPipelineFactory : RequestPipelineFactory { public static readonly DefaultRequestPipelineFactory Default = new (); /// - /// returns instances of + /// returns instances of /// public override RequestPipeline Create(RequestData requestData) => - new DefaultRequestPipeline(requestData); + new RequestPipeline(requestData); } diff --git a/src/Elastic.Transport/Configuration/TransportConfiguration.cs b/src/Elastic.Transport/Configuration/TransportConfiguration.cs index a915e2a..7058fac 100644 --- a/src/Elastic.Transport/Configuration/TransportConfiguration.cs +++ b/src/Elastic.Transport/Configuration/TransportConfiguration.cs @@ -89,11 +89,12 @@ public TransportConfiguration( ProductRegistration = productRegistration ?? DefaultProductRegistration.Default; Connection = invoker ?? new HttpRequestInvoker(); RequestResponseSerializer = serializer ?? new LowLevelRequestResponseSerializer(); - PipelineProvider = DefaultRequestPipelineFactory.Default; DateTimeProvider = DefaultDateTimeProvider.Default; MetaHeaderProvider = productRegistration?.MetaHeaderProvider; UrlFormatter = new UrlFormatter(this); + PipelineProvider = DefaultRequestPipelineFactory.Default; + Accept = productRegistration?.DefaultMimeType; ConnectionLimit = DefaultConnectionLimit; DnsRefreshTimeout = DefaultDnsRefreshTimeout; diff --git a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs index f9dc741..6e99a1d 100644 --- a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs @@ -8,7 +8,7 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to events that emits +/// Provides a typed listener to events that emits internal sealed class AuditDiagnosticObserver : TypedDiagnosticObserver { /// diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs index 53954ff..1059016 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs @@ -24,7 +24,7 @@ internal Audit(AuditEvent type, DateTimeOffset started) /// /// The node on which the request was made. /// - public Node Node { get; internal set; } + public Node? Node { get; internal init; } /// /// The path of the request. diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs index a0c40f2..b820cbd 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs @@ -13,7 +13,7 @@ internal class Auditable : IDisposable private readonly DateTimeProvider _dateTimeProvider; - public Auditable(AuditEvent type, ref List auditTrail, DateTimeProvider dateTimeProvider, Node node) + public Auditable(AuditEvent type, ref List auditTrail, DateTimeProvider dateTimeProvider, Node? node) { auditTrail ??= new List(); diff --git a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs index 618f1e4..257d038 100644 --- a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs +++ b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs @@ -76,7 +76,7 @@ public class SerializerDiagnosticKeys : IDiagnosticsKeys } /// - /// Provides access to the string event names that emits + /// Provides access to the string event names that emits /// public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys { @@ -97,7 +97,7 @@ public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys /// /// Reference to the diagnostic source name that allows you to listen to all decisions that - /// makes. Events it emits are the names on + /// makes. Events it emits are the names on /// public class AuditDiagnosticKeys : IDiagnosticsKeys { diff --git a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs index 77f15a1..dcac354 100644 --- a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; +/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; internal sealed class RequestPipelineDiagnosticObserver : TypedDiagnosticObserver { /// diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index 01c5bf3..9860cb0 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -51,10 +51,12 @@ public DistributedTransport(TConfiguration configuration) _productRegistration = configuration.ProductRegistration; Configuration = configuration; - TransportRequestData = new RequestData(Configuration); MemoryStreamFactory = configuration.MemoryStreamFactory; + TransportRequestData = new RequestData(Configuration); + TransportPipeline = Configuration.PipelineProvider.Create(TransportRequestData); } + private RequestPipeline TransportPipeline { get; } private MemoryStreamFactory MemoryStreamFactory { get; } private RequestData TransportRequestData { get; } @@ -114,7 +116,8 @@ private async ValueTask RequestCoreAsync( Configuration.OnRequestDataCreated?.Invoke(requestData); - using var pipeline = Configuration.PipelineProvider.Create(requestData); + var pipeline = requestData == TransportRequestData ? TransportPipeline :Configuration.PipelineProvider.Create(requestData); + var startedOn = Configuration.DateTimeProvider.Now(); if (isAsync) await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, cancellationToken).ConfigureAwait(false); @@ -180,7 +183,7 @@ private async ValueTask RequestCoreAsync( } else { - foreach (var node in pipeline.NextNode()) + foreach (var node in pipeline.NextNode(startedOn)) { attemptedNodes++; endpoint = endpoint with { Node = node }; @@ -269,7 +272,7 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); - return FinalizeResponse(endpoint, requestData, data, pipeline, seenExceptions, response); + return FinalizeResponse(endpoint, requestData, data, pipeline, startedOn, seenExceptions, response); } finally { @@ -301,7 +304,12 @@ ref List seenExceptions seenExceptions.Add(ex); } - private TResponse FinalizeResponse(Endpoint endpoint, RequestData requestData, PostData? postData, RequestPipeline pipeline, + private TResponse FinalizeResponse( + Endpoint endpoint, + RequestData requestData, + PostData? postData, + RequestPipeline pipeline, + DateTimeOffset startedOn, List? seenExceptions, TResponse? response ) where TResponse : TransportResponse, new() @@ -310,7 +318,7 @@ private TResponse FinalizeResponse(Endpoint endpoint, RequestData req pipeline.ThrowNoNodesAttempted(endpoint, seenExceptions); var callDetails = GetMostRecentCallDetails(response, seenExceptions); - var clientException = pipeline.CreateClientException(response, callDetails, endpoint, requestData, seenExceptions); + var clientException = pipeline.CreateClientException(response, callDetails, endpoint, requestData, startedOn, seenExceptions); if (response?.ApiCallDetails == null) pipeline.BadResponse(ref response, callDetails, endpoint, requestData, postData, clientException); diff --git a/src/Elastic.Transport/Exceptions/TransportException.cs b/src/Elastic.Transport/Exceptions/TransportException.cs index fe94bb0..59b66cd 100644 --- a/src/Elastic.Transport/Exceptions/TransportException.cs +++ b/src/Elastic.Transport/Exceptions/TransportException.cs @@ -25,7 +25,7 @@ public class TransportException : Exception public TransportException(string message) : base(message) => FailureReason = PipelineFailure.Unexpected; /// - public TransportException(PipelineFailure failure, string message, Exception innerException) + public TransportException(PipelineFailure failure, string message, Exception? innerException = null) : base(message, innerException) => FailureReason = failure; /// @@ -41,7 +41,7 @@ public TransportException(PipelineFailure failure, string message, TransportResp /// The audit trail keeping track of what happened during the invocation of /// a request, up until the moment of this exception. /// - public IEnumerable AuditTrail { get; internal set; } + public IEnumerable? AuditTrail { get; internal init; } /// /// The reason this exception occurred was one of the well defined exit points as modelled by diff --git a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs index 08bc240..804a7e4 100644 --- a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs +++ b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs @@ -24,17 +24,17 @@ internal ApiCallDetails() { } /// /// /// > - public IEnumerable AuditTrail { get; internal set; } + public IEnumerable? AuditTrail { get; internal set; } /// /// /// - internal IReadOnlyDictionary ThreadPoolStats { get; set; } + internal IReadOnlyDictionary? ThreadPoolStats { get; init; } /// /// /// - internal IReadOnlyDictionary TcpStats { get; set; } + internal IReadOnlyDictionary? TcpStats { get; init; } /// ///