Skip to content

Commit

Permalink
Simplify RequestPipeline and reuse a singleton instance if we can
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Nov 1, 2024
1 parent 11aadde commit ac8afb0
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ public ExposingPipelineFactory(TConfiguration configuration)
public ITransport<TConfiguration> Transport { get; }

public override RequestPipeline Create(RequestData requestData) =>
new DefaultRequestPipeline(requestData);
new RequestPipeline(requestData);
}
142 changes: 78 additions & 64 deletions src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

namespace Elastic.Transport;

/// <inheritdoc cref="RequestPipeline" />
public class DefaultRequestPipeline : RequestPipeline
/// Models the workflow of a request to multiple nodes
public class RequestPipeline
{
private readonly IRequestInvoker _requestInvoker;
private readonly NodePool _nodePool;
Expand All @@ -32,7 +32,7 @@ public class DefaultRequestPipeline : RequestPipeline
private readonly ITransportConfiguration _settings;

/// <inheritdoc cref="RequestPipeline" />
internal DefaultRequestPipeline(RequestData requestData)
internal RequestPipeline(RequestData requestData)
{
_requestData = requestData;
_settings = requestData.ConnectionSettings;
Expand All @@ -44,12 +44,10 @@ internal DefaultRequestPipeline(RequestData requestData)
_productRegistration = requestData.ConnectionSettings.ProductRegistration;
_responseBuilder = _productRegistration.ResponseBuilder;
_nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate;

StartedOn = _dateTimeProvider.Now();
}

/// <inheritdoc cref="RequestPipeline.AuditTrail" />
public override IEnumerable<Audit> AuditTrail => _auditTrail;
/// A list of <see cref="Audit"/> events
public IEnumerable<Audit>? AuditTrail => _auditTrail;

private RequestConfiguration PingAndSniffRequestConfiguration
{
Expand All @@ -72,50 +70,45 @@ private RequestConfiguration PingAndSniffRequestConfiguration
}
}

//TODO xmldocs
#pragma warning disable 1591
public bool DepletedRetries => Retried >= MaxRetries + 1 || IsTakingTooLong;
private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn);

public override bool FirstPoolUsageNeedsSniffing =>
private bool FirstPoolUsageNeedsSniffing =>
!RequestDisabledSniff
&& _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup;

public override bool IsTakingTooLong
private bool IsTakingTooLong(DateTimeOffset startedOn)
{
get
{
var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout);
var now = _dateTimeProvider.Now();

//we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort.
var margin = timeout.TotalMilliseconds / 100.0 * 98;
var marginTimeSpan = TimeSpan.FromMilliseconds(margin);
var timespanCall = now - StartedOn;
var tookToLong = timespanCall >= marginTimeSpan;
return tookToLong;
}
var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout);
var now = _dateTimeProvider.Now();

//we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort.
var margin = timeout.TotalMilliseconds / 100.0 * 98;
var marginTimeSpan = TimeSpan.FromMilliseconds(margin);
var timespanCall = now - startedOn;
var tookToLong = timespanCall >= marginTimeSpan;
return tookToLong;
}

public override int MaxRetries => _requestData.MaxRetries;
private int MaxRetries => _requestData.MaxRetries;

public bool Refresh { get; private set; }
private bool Refresh { get; set; }

public int Retried { get; private set; }
private int Retried { get; set; }

public IEnumerable<Node> SniffNodes => _nodePool
private IEnumerable<Node> SniffNodes => _nodePool
.CreateView(LazyAuditable)
.ToList()
.OrderBy(n => _productRegistration.SniffOrder(n));

public override bool SniffsOnConnectionFailure =>
private bool SniffsOnConnectionFailure =>
!RequestDisabledSniff
&& _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault;

public override bool SniffsOnStaleCluster =>
private bool SniffsOnStaleCluster =>
!RequestDisabledSniff
&& _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue;

public override bool StaleClusterState
private bool StaleClusterState
{
get
{
Expand All @@ -132,17 +125,18 @@ public override bool StaleClusterState
}
}

public override DateTimeOffset StartedOn { get; }

private TimeSpan PingTimeout => _requestData.PingTimeout;

private bool RequestDisabledSniff => _requestData.DisableSniff;

private TimeSpan RequestTimeout => _requestData.RequestTimeout;

public override void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose();
/// Emit <see cref="AuditEvent.CancellationRequested"/> event
public void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose();

public override void BadResponse<TResponse>(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception)
/// Ensures a response is returned with <see cref="ApiCallDetails"/> and <see cref="AuditTrail"/> if available
public void BadResponse<TResponse>(ref TResponse? response, ApiCallDetails? callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception)
where TResponse : TransportResponse, new()
{
if (response == null)
{
Expand All @@ -155,10 +149,14 @@ public override void BadResponse<TResponse>(ref TResponse response, ApiCallDetai
response.ApiCallDetails.AuditTrail = AuditTrail;
}

public override TResponse CallProductEndpoint<TResponse>(Endpoint endpoint, RequestData requestData, PostData? postData)
/// Call the product's API endpoint ensuring rich enough exceptions are thrown
public TResponse CallProductEndpoint<TResponse>(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new()
=> CallProductEndpointCoreAsync<TResponse>(false, endpoint, requestData, postData).EnsureCompleted();

public override Task<TResponse> CallProductEndpointAsync<TResponse>(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
/// Call the product's API endpoint ensuring rich enough exceptions are thrown
public Task<TResponse> CallProductEndpointAsync<TResponse>(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
=> CallProductEndpointCoreAsync<TResponse>(true, endpoint, requestData, postData, cancellationToken).AsTask();

private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -199,13 +197,16 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(bool
}
}

public override TransportException? CreateClientException<TResponse>(
/// Create a rich enough <see cref="TransportException"/>
public TransportException? CreateClientException<TResponse>(
TResponse response,
ApiCallDetails? callDetails,
Endpoint endpoint,
RequestData data,
DateTimeOffset startedOn,
List<PipelineException>? seenExceptions
)
where TResponse : TransportResponse, new()
{
if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null;

Expand All @@ -224,7 +225,7 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(bool

var exceptionMessage = innerException?.Message ?? "Request failed to execute";

if (IsTakingTooLong)
if (IsTakingTooLong(startedOn))
{
pipelineFailure = PipelineFailure.MaxTimeoutReached;
Audit(MaxTimeoutReached);
Expand Down Expand Up @@ -259,7 +260,8 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(bool
return clientException;
}

public override void FirstPoolUsage(SemaphoreSlim semaphore)
/// Routine for the first call into the product, potentially sniffing to discover the network topology
public void FirstPoolUsage(SemaphoreSlim semaphore)
{
if (!FirstPoolUsageNeedsSniffing) return;

Expand Down Expand Up @@ -291,7 +293,8 @@ public override void FirstPoolUsage(SemaphoreSlim semaphore)
}
}

public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken)
/// <inheritdoc cref="FirstPoolUsage"/>
public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken)
{
if (!FirstPoolUsageNeedsSniffing) return;

Expand Down Expand Up @@ -325,17 +328,19 @@ public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, Cancella
}
}

public override void MarkAlive(Node node) => node.MarkAlive();
/// Mark <paramref name="node"/> as alive putting it back in rotation.
public void MarkAlive(Node node) => node.MarkAlive();

public override void MarkDead(Node node)
/// Mark <paramref name="node"/> as dead, taking it out of rotation.
public void MarkDead(Node node)
{
var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout);
node.MarkDead(deadUntil);
Retried++;
}

/// <inheritdoc />
public override bool TryGetSingleNode(out Node node)
/// Fast path for <see cref="NextNode"/> if only a single node could ever be yielded this save an IEnumerator allocation
public bool TryGetSingleNode(out Node? node)
{
if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count &&
!_nodePool.SupportsPinging && !_nodePool.SupportsReseeding)
Expand All @@ -349,7 +354,8 @@ public override bool TryGetSingleNode(out Node node)
return false;
}

public override IEnumerable<Node> NextNode()
/// returns a consistent enumerable view into the available nodes
public IEnumerable<Node> NextNode(DateTimeOffset startedOn)
{
if (_requestData.ForceNode != null)
{
Expand All @@ -364,11 +370,11 @@ public override IEnumerable<Node> NextNode()
var refreshed = false;
for (var i = 0; i < 100; i++)
{
if (DepletedRetries) yield break;
if (DepletedRetries(startedOn)) yield break;

foreach (var node in _nodePool.CreateView(LazyAuditable))
{
if (DepletedRetries) break;
if (DepletedRetries(startedOn)) break;

if (!_nodePredicate(node)) continue;

Expand All @@ -386,12 +392,14 @@ public override IEnumerable<Node> NextNode()
}
}

public override void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted();
/// ping <paramref name="node"/> as a fast path ensuring its alive
public void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted();

public override Task PingAsync(Node node, CancellationToken cancellationToken = default)
/// ping <paramref name="node"/> as a fast path ensuring its alive
public Task PingAsync(Node node, CancellationToken cancellationToken = default)
=> PingCoreAsync(true, node, cancellationToken).AsTask();

public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default)
private async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default)
{
if (!_productRegistration.SupportsPing) return;
if (PingDisabled(node)) return;
Expand Down Expand Up @@ -433,12 +441,14 @@ public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken
}
}

public override void Sniff() => SniffCoreAsync(false).EnsureCompleted();
/// Discover the products network topology to yield all available nodes
public void Sniff() => SniffCoreAsync(false).EnsureCompleted();

public override Task SniffAsync(CancellationToken cancellationToken = default)
/// Discover the products network topology to yield all available nodes
public Task SniffAsync(CancellationToken cancellationToken = default)
=> SniffCoreAsync(true, cancellationToken).AsTask();

public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default)
private async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default)
{
if (!_productRegistration.SupportsSniff) return;

Expand Down Expand Up @@ -495,23 +505,26 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
}
}

public override void SniffOnConnectionFailure()
/// sniff the topology when a connection failure happens
public void SniffOnConnectionFailure()
{
if (!SniffsOnConnectionFailure) return;

using (Audit(SniffOnFail))
Sniff();
}

public override async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken)
/// sniff the topology when a connection failure happens
public async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken)
{
if (!SniffsOnConnectionFailure) return;

using (Audit(SniffOnFail))
await SniffAsync(cancellationToken).ConfigureAwait(false);
}

public override void SniffOnStaleCluster()
/// sniff the topology after a set period to ensure its up to date
public void SniffOnStaleCluster()
{
if (!StaleClusterState) return;

Expand All @@ -522,7 +535,8 @@ public override void SniffOnStaleCluster()
}
}

public override async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken)
/// sniff the topology after a set period to ensure its up to date
public async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken)
{
if (!StaleClusterState) return;

Expand All @@ -533,21 +547,22 @@ public override async Task SniffOnStaleClusterAsync(CancellationToken cancellati
}
}

public override void ThrowNoNodesAttempted(Endpoint endpoint, List<PipelineException>? seenExceptions)
/// emit <see cref="AuditEvent.NoNodesAttempted"/> event in case no nodes were available
public void ThrowNoNodesAttempted(Endpoint endpoint, List<PipelineException>? seenExceptions)
{
var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage, (Exception)null);
var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage);
using (Audit(NoNodesAttempted))
throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail };
}

private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected;

private Auditable? Audit(AuditEvent type, Node node = null) =>
private Auditable? Audit(AuditEvent type, Node? node = null) =>
!_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null;

private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse response = null)
private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse? response = null)
{
if (details?.HttpStatusCode == 401)
if (details.HttpStatusCode == 401)
throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response };
}

Expand All @@ -556,4 +571,3 @@ private void LazyAuditable(AuditEvent e, Node n)
using (new Auditable(e, ref _auditTrail, _dateTimeProvider, n)) { }
}
}
#pragma warning restore 1591
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Elastic.Transport;

/// <summary>
/// A pipeline exception is throw when ever a known failing exit point is reached in <see cref="DefaultRequestPipeline"/>
/// A pipeline exception is throw when ever a known failing exit point is reached in <see cref="RequestPipeline"/>
/// <para>See <see cref="PipelineFailure"/> for known exits points</para>
/// </summary>
public class PipelineException : Exception
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Elastic.Transport;

/// <summary>
/// A failure in <see cref="DefaultRequestPipeline"/>'s workflow that caused it to end prematurely.
/// A failure in <see cref="RequestPipeline"/>'s workflow that caused it to end prematurely.
/// </summary>
public enum PipelineFailure
{
Expand Down Expand Up @@ -43,7 +43,7 @@ public enum PipelineFailure
MaxRetriesReached,

/// <summary>
/// An exception occurred during <see cref="DefaultRequestPipeline"/> that could not be handled
/// An exception occurred during <see cref="RequestPipeline"/> that could not be handled
/// </summary>
Unexpected,

Expand Down
Loading

0 comments on commit ac8afb0

Please sign in to comment.