diff --git a/Elastic.Transport.sln.DotSettings b/Elastic.Transport.sln.DotSettings
index ae0ae0b..fa6ec13 100644
--- a/Elastic.Transport.sln.DotSettings
+++ b/Elastic.Transport.sln.DotSettings
@@ -104,7 +104,7 @@ See the LICENSE file in the project root for more information
</Entry.Match>
<Entry.SortBy>
<Kind Is="Member" />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Fields">
@@ -119,7 +119,7 @@ See the LICENSE file in the project root for more information
<Entry.SortBy>
<Access />
<Readonly />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Constructors">
@@ -139,7 +139,7 @@ See the LICENSE file in the project root for more information
</Entry.Match>
<Entry.SortBy>
<Access />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Setup/Teardown Methods" Priority="100">
@@ -203,7 +203,7 @@ See the LICENSE file in the project root for more information
</Entry.Match>
<Entry.SortBy>
<Kind Is="Member" />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Fields">
@@ -218,7 +218,7 @@ See the LICENSE file in the project root for more information
<Entry.SortBy>
<Access />
<Readonly />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Constructors">
@@ -238,7 +238,7 @@ See the LICENSE file in the project root for more information
</Entry.Match>
<Entry.SortBy>
<Access />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="Interface Implementations">
@@ -251,7 +251,7 @@ See the LICENSE file in the project root for more information
<Entry.SortBy>
<ImplementsInterface Name="IDisposable" />
<Access />
- <Name Is="Enter Pattern Here" />
+ <Name />
</Entry.SortBy>
</Entry>
<Entry DisplayName="All other members" />
@@ -505,6 +505,7 @@ See the LICENSE file in the project root for more information
TrueTrueTrue
+ TrueTrueTrueFalse
diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualClusterConnection.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualClusterConnection.cs
index e357c1f..873c12e 100644
--- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualClusterConnection.cs
+++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualClusterConnection.cs
@@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
+#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
@@ -34,7 +35,7 @@ public class VirtualClusterRequestInvoker : IRequestInvoker
{
private static readonly object Lock = new();
- private static byte[] _defaultResponseBytes;
+ private static byte[]? _defaultResponseBytes;
private VirtualCluster _cluster;
private readonly TestableDateTimeProvider _dateTimeProvider;
@@ -45,7 +46,9 @@ public class VirtualClusterRequestInvoker : IRequestInvoker
internal VirtualClusterRequestInvoker(VirtualCluster cluster, TestableDateTimeProvider dateTimeProvider)
{
- UpdateCluster(cluster);
+ _cluster = cluster;
+ _calls = cluster.Nodes.ToDictionary(n => n.Uri.Port, v => new State());
+ _productRegistration = cluster.ProductRegistration;
_dateTimeProvider = dateTimeProvider;
_productRegistration = cluster.ProductRegistration;
_inMemoryRequestInvoker = new InMemoryRequestInvoker();
@@ -100,40 +103,41 @@ private static object DefaultResponse
private void UpdateCluster(VirtualCluster cluster)
{
- if (cluster == null) return;
-
lock (Lock)
{
_cluster = cluster;
_calls = cluster.Nodes.ToDictionary(n => n.Uri.Port, v => new State());
_productRegistration = cluster.ProductRegistration;
}
+
}
- private bool IsSniffRequest(RequestData requestData) => _productRegistration.IsSniffRequest(requestData);
+ private bool IsSniffRequest(Endpoint endpoint) => _productRegistration.IsSniffRequest(endpoint);
- private bool IsPingRequest(RequestData requestData) => _productRegistration.IsPingRequest(requestData);
+ private bool IsPingRequest(Endpoint endpoint) => _productRegistration.IsPingRequest(endpoint);
/// >
- public Task RequestAsync(RequestData requestData, CancellationToken cancellationToken)
+ public Task RequestAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new() =>
- Task.FromResult(Request(requestData));
+ Task.FromResult(Request(endpoint, requestData, postData));
/// >
- public TResponse Request(RequestData requestData)
+ public TResponse Request(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new()
{
- if (!_calls.ContainsKey(requestData.Uri.Port))
- throw new Exception($"Expected a call to happen on port {requestData.Uri.Port} but received none");
+ if (!_calls.ContainsKey(endpoint.Uri.Port))
+ throw new Exception($"Expected a call to happen on port {endpoint.Uri.Port} but received none");
try
{
- var state = _calls[requestData.Uri.Port];
- if (IsSniffRequest(requestData))
+ var state = _calls[endpoint.Uri.Port];
+ if (IsSniffRequest(endpoint))
{
_ = Interlocked.Increment(ref state.Sniffed);
return HandleRules(
+ endpoint,
requestData,
+ postData,
nameof(VirtualCluster.Sniff),
_cluster.SniffingRules,
requestData.RequestTimeout,
@@ -141,11 +145,13 @@ public TResponse Request(RequestData requestData)
(r) => _productRegistration.CreateSniffResponseBytes(_cluster.Nodes, _cluster.ElasticsearchVersion, _cluster.PublishAddressOverride, _cluster.SniffShouldReturnFqnd)
);
}
- if (IsPingRequest(requestData))
+ if (IsPingRequest(endpoint))
{
_ = Interlocked.Increment(ref state.Pinged);
return HandleRules(
+ endpoint,
requestData,
+ postData,
nameof(VirtualCluster.Ping),
_cluster.PingingRules,
requestData.PingTimeout,
@@ -155,7 +161,9 @@ public TResponse Request(RequestData requestData)
}
_ = Interlocked.Increment(ref state.Called);
return HandleRules(
+ endpoint,
requestData,
+ postData,
nameof(VirtualCluster.ClientCalls),
_cluster.ClientCallRules,
requestData.RequestTimeout,
@@ -165,22 +173,23 @@ public TResponse Request(RequestData requestData)
}
catch (TheException e)
{
- return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse(requestData, e, null, null, Stream.Null, null, -1, null, null);
+ return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse(endpoint, requestData, postData, e, null, null, Stream.Null, null, -1, null, null);
}
}
private TResponse HandleRules(
+ Endpoint endpoint,
RequestData requestData,
+ PostData? postData,
string origin,
IList rules,
TimeSpan timeout,
Action beforeReturn,
- Func successResponse
+ Func successResponse
)
where TResponse : TransportResponse, new()
where TRule : IRule
{
- requestData.MadeItToResponse = true;
if (rules.Count == 0)
throw new Exception($"No {origin} defined for the current VirtualCluster, so we do not know how to respond");
@@ -189,32 +198,31 @@ Func successResponse
var always = rule.Times.Match(t => true, t => false);
var times = rule.Times.Match(t => -1, t => t);
- if (rule.OnPort == null || rule.OnPort.Value != requestData.Uri.Port) continue;
+ if (rule.OnPort == null || rule.OnPort.Value != endpoint.Uri.Port) continue;
if (always)
- return Always(requestData, timeout, beforeReturn, successResponse, rule);
+ return Always(endpoint, requestData, postData, timeout, beforeReturn, successResponse, rule);
if (rule.ExecuteCount > times) continue;
- return Sometimes(requestData, timeout, beforeReturn, successResponse, rule);
+ return Sometimes(endpoint, requestData, postData, timeout, beforeReturn, successResponse, rule);
}
foreach (var rule in rules.Where(s => !s.OnPort.HasValue))
{
var always = rule.Times.Match(t => true, t => false);
var times = rule.Times.Match(t => -1, t => t);
if (always)
- return Always(requestData, timeout, beforeReturn, successResponse, rule);
+ return Always(endpoint, requestData, postData, timeout, beforeReturn, successResponse, rule);
if (rule.ExecuteCount > times) continue;
- return Sometimes(requestData, timeout, beforeReturn, successResponse, rule);
+ return Sometimes(endpoint, requestData, postData, timeout, beforeReturn, successResponse, rule);
}
var count = _calls.Select(kv => kv.Value.Called).Sum();
- throw new Exception($@"No global or port specific {origin} rule ({requestData.Uri.Port}) matches any longer after {count} calls in to the cluster");
+ throw new Exception($@"No global or port specific {origin} rule ({endpoint.Uri.Port}) matches any longer after {count} calls in to the cluster");
}
- private TResponse Always(RequestData requestData, TimeSpan timeout, Action beforeReturn,
- Func successResponse, TRule rule
+ private TResponse Always(Endpoint endpoint, RequestData requestData, PostData? postData, TimeSpan timeout, Action beforeReturn, Func successResponse, TRule rule
)
where TResponse : TransportResponse, new()
where TRule : IRule
@@ -231,12 +239,12 @@ private TResponse Always(RequestData requestData, TimeSpan tim
}
return rule.Succeeds
- ? Success(requestData, beforeReturn, successResponse, rule)
- : Fail(requestData, rule);
+ ? Success(endpoint, requestData, postData, beforeReturn, successResponse, rule)
+ : Fail(endpoint, requestData, postData, rule);
}
private TResponse Sometimes(
- RequestData requestData, TimeSpan timeout, Action beforeReturn, Func successResponse, TRule rule
+ Endpoint endpoint, RequestData requestData, PostData? postData, TimeSpan timeout, Action beforeReturn, Func successResponse, TRule rule
)
where TResponse : TransportResponse, new()
where TRule : IRule
@@ -253,16 +261,16 @@ private TResponse Sometimes(
}
if (rule.Succeeds)
- return Success(requestData, beforeReturn, successResponse, rule);
+ return Success(endpoint, requestData, postData, beforeReturn, successResponse, rule);
- return Fail(requestData, rule);
+ return Fail(endpoint, requestData, postData, rule);
}
- private TResponse Fail(RequestData requestData, TRule rule, RuleOption returnOverride = null)
+ private TResponse Fail(Endpoint endpoint, RequestData requestData, PostData? postData, TRule rule, RuleOption? returnOverride = null)
where TResponse : TransportResponse, new()
where TRule : IRule
{
- var state = _calls[requestData.Uri.Port];
+ var state = _calls[endpoint.Uri.Port];
_ = Interlocked.Increment(ref state.Failures);
var ret = returnOverride ?? rule.Return;
rule.RecordExecuted();
@@ -271,25 +279,25 @@ private TResponse Fail(RequestData requestData, TRule rule, Ru
throw new TheException();
return ret.Match(
- (e) => throw e,
- (statusCode) => _inMemoryRequestInvoker.BuildResponse(requestData, CallResponse(rule),
+ e => throw e,
+ statusCode => _inMemoryRequestInvoker.BuildResponse(endpoint, requestData, postData, CallResponse(rule),
//make sure we never return a valid status code in Fail responses because of a bad rule.
statusCode >= 200 && statusCode < 300 ? 502 : statusCode, rule.ReturnContentType)
);
}
- private TResponse Success(RequestData requestData, Action beforeReturn, Func successResponse,
+ private TResponse Success(Endpoint endpoint, RequestData requestData, PostData? postData, Action beforeReturn, Func successResponse,
TRule rule
)
where TResponse : TransportResponse, new()
where TRule : IRule
{
- var state = _calls[requestData.Uri.Port];
+ var state = _calls[endpoint.Uri.Port];
_ = Interlocked.Increment(ref state.Successes);
rule.RecordExecuted();
beforeReturn?.Invoke(rule);
- return _inMemoryRequestInvoker.BuildResponse(requestData, successResponse(rule), contentType: rule.ReturnContentType);
+ return _inMemoryRequestInvoker.BuildResponse(endpoint, requestData, postData, successResponse(rule), contentType: rule.ReturnContentType);
}
private static byte[] CallResponse(TRule rule)
diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs
index 7ede512..6ab83c4 100644
--- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs
+++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs
@@ -18,7 +18,9 @@ public class VirtualizedCluster
private Func, Func, Task> _asyncCall;
private Func, Func, TransportResponse> _syncCall;
- private class VirtualResponse : TransportResponse { }
+ private class VirtualResponse : TransportResponse;
+
+ private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");
internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfiguration settings)
{
@@ -27,24 +29,20 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
_exposingRequestPipeline = new ExposingPipelineFactory(settings, _dateTimeProvider);
_syncCall = (t, r) => t.Request(
- method: HttpMethod.GET,
- path: "/",
+ path: RootPath,
postData: PostData.Serializable(new { }),
- requestParameters: new DefaultRequestParameters(),
openTelemetryData: default,
- localConfiguration: r?.Invoke(new RequestConfigurationDescriptor(null)),
+ localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
responseBuilder: null
);
_asyncCall = async (t, r) =>
{
var res = await t.RequestAsync
(
- method: HttpMethod.GET,
- path: "/",
+ path: RootPath,
postData: PostData.Serializable(new { }),
- requestParameters: new DefaultRequestParameters(),
openTelemetryData: default,
- localConfiguration: r?.Invoke(new RequestConfigurationDescriptor(null)),
+ localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
responseBuilder: null,
CancellationToken.None
).ConfigureAwait(false);
diff --git a/src/Elastic.Transport.VirtualizedCluster/Products/Elasticsearch/ElasticsearchMockProductRegistration.cs b/src/Elastic.Transport.VirtualizedCluster/Products/Elasticsearch/ElasticsearchMockProductRegistration.cs
index 5f3c3f5..7e92e1c 100644
--- a/src/Elastic.Transport.VirtualizedCluster/Products/Elasticsearch/ElasticsearchMockProductRegistration.cs
+++ b/src/Elastic.Transport.VirtualizedCluster/Products/Elasticsearch/ElasticsearchMockProductRegistration.cs
@@ -22,10 +22,9 @@ public sealed class ElasticsearchMockProductRegistration : MockProductRegistrati
public override byte[] CreateSniffResponseBytes(IReadOnlyList nodes, string stackVersion, string publishAddressOverride, bool returnFullyQualifiedDomainNames) =>
ElasticsearchSniffResponseFactory.Create(nodes, stackVersion, publishAddressOverride, returnFullyQualifiedDomainNames);
- public override bool IsSniffRequest(RequestData requestData) =>
- requestData.PathAndQuery.StartsWith(ElasticsearchProductRegistration.SniffPath, StringComparison.Ordinal);
+ public override bool IsSniffRequest(Endpoint endpoint) =>
+ endpoint.PathAndQuery.StartsWith(ElasticsearchProductRegistration.SniffPath, StringComparison.Ordinal);
- public override bool IsPingRequest(RequestData requestData) =>
- requestData.Method == HttpMethod.HEAD &&
- (requestData.PathAndQuery == string.Empty || requestData.PathAndQuery.StartsWith("?"));
+ public override bool IsPingRequest(Endpoint endpoint) =>
+ endpoint.Method == HttpMethod.HEAD && (endpoint.PathAndQuery == string.Empty || endpoint.PathAndQuery.StartsWith("?"));
}
diff --git a/src/Elastic.Transport.VirtualizedCluster/Products/MockProductRegistration.cs b/src/Elastic.Transport.VirtualizedCluster/Products/MockProductRegistration.cs
index 63df510..a738634 100644
--- a/src/Elastic.Transport.VirtualizedCluster/Products/MockProductRegistration.cs
+++ b/src/Elastic.Transport.VirtualizedCluster/Products/MockProductRegistration.cs
@@ -31,7 +31,7 @@ public abstract class MockProductRegistration
/// see uses this to determine if the current request is a sniff request and should follow
/// the sniffing rules
///
- public abstract bool IsSniffRequest(RequestData requestData);
+ public abstract bool IsSniffRequest(Endpoint endpoint);
- public abstract bool IsPingRequest(RequestData requestData);
+ public abstract bool IsPingRequest(Endpoint endpoint);
}
diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
index d1cf207..af31cad 100644
--- a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
+++ b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs
@@ -47,7 +47,7 @@ internal DefaultRequestPipeline(
_productRegistration = configurationValues.ProductRegistration;
_responseBuilder = _productRegistration.ResponseBuilder;
_nodePredicate = _settings.NodePredicate ?? _productRegistration.NodePredicate;
- RequestConfiguration = requestConfiguration;
+ RequestConfig = requestConfiguration;
StartedOn = dateTimeProvider.Now();
}
@@ -66,9 +66,9 @@ private RequestConfiguration PingAndSniffRequestConfiguration
{
PingTimeout = PingTimeout,
RequestTimeout = PingTimeout,
- AuthenticationHeader = RequestConfiguration?.AuthenticationHeader ?? _settings.Authentication,
- EnableHttpPipelining = RequestConfiguration?.EnableHttpPipelining ?? _settings.HttpPipeliningEnabled,
- ForceNode = RequestConfiguration?.ForceNode
+ Authentication = RequestConfig?.Authentication ?? _settings.Authentication,
+ EnableHttpPipelining = RequestConfig?.HttpPipeliningEnabled ?? _settings.HttpPipeliningEnabled,
+ ForceNode = RequestConfig?.ForceNode
};
return _pingAndSniffRequestConfiguration;
@@ -100,9 +100,9 @@ public override bool IsTakingTooLong
}
public override int MaxRetries =>
- RequestConfiguration?.ForceNode != null
+ RequestConfig?.ForceNode != null
? 0
- : Math.Min(RequestConfiguration?.MaxRetries ?? _settings.MaxRetries.GetValueOrDefault(int.MaxValue), _nodePool.MaxRetries);
+ : Math.Min(RequestConfig?.MaxRetries ?? _settings.MaxRetries.GetValueOrDefault(int.MaxValue), _nodePool.MaxRetries);
public bool Refresh { get; private set; }
@@ -141,77 +141,86 @@ public override bool StaleClusterState
public override DateTimeOffset StartedOn { get; }
private TimeSpan PingTimeout =>
- RequestConfiguration?.PingTimeout
+ RequestConfig?.PingTimeout
?? _settings.PingTimeout
- ?? (_nodePool.UsingSsl ? TransportConfiguration.DefaultPingTimeoutOnSsl : TransportConfiguration.DefaultPingTimeout);
+ ?? (_nodePool.UsingSsl ? RequestConfiguration.DefaultPingTimeoutOnSsl : RequestConfiguration.DefaultPingTimeout);
- private IRequestConfiguration RequestConfiguration { get; }
+ private IRequestConfiguration RequestConfig { get; }
- private bool RequestDisabledSniff => RequestConfiguration != null && (RequestConfiguration.DisableSniff ?? false);
+ private bool RequestDisabledSniff => RequestConfig != null && (RequestConfig.DisableSniff ?? false);
- private TimeSpan RequestTimeout => RequestConfiguration?.RequestTimeout ?? _settings.RequestTimeout;
+ private TimeSpan RequestTimeout => RequestConfig?.RequestTimeout ?? _settings.RequestTimeout ?? RequestConfiguration.DefaultRequestTimeout;
public override void AuditCancellationRequested() => Audit(CancellationRequested).Dispose();
- public override void BadResponse(ref TResponse response, ApiCallDetails callDetails, RequestData data, TransportException exception)
+ public override void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception)
{
if (response == null)
{
//make sure we copy over the error body in case we disabled direct streaming.
var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes);
var m = callDetails?.ResponseMimeType ?? RequestData.DefaultMimeType;
- response = _responseBuilder.ToResponse(data, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null);
+ response = _responseBuilder.ToResponse(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null);
}
response.ApiCallDetails.AuditTrail = AuditTrail;
}
- public override TResponse CallProductEndpoint(RequestData requestData)
- => CallProductEndpointCoreAsync(false, requestData).EnsureCompleted();
+ public override TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData)
+ => CallProductEndpointCoreAsync(false, endpoint, requestData, postData).EnsureCompleted();
- public override Task CallProductEndpointAsync(RequestData requestData, CancellationToken cancellationToken = default)
- => CallProductEndpointCoreAsync(true, requestData, cancellationToken).AsTask();
+ public override Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
+ => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask();
- private async ValueTask CallProductEndpointCoreAsync(bool isAsync, RequestData requestData, CancellationToken cancellationToken = default)
+ private async ValueTask CallProductEndpointCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
{
- using var audit = Audit(HealthyResponse, requestData.Node);
+ using var audit = Audit(HealthyResponse, endpoint.Node);
if (audit is not null)
- audit.PathAndQuery = requestData.PathAndQuery;
+ audit.PathAndQuery = endpoint.PathAndQuery;
try
{
TResponse response;
if (isAsync)
- response = await _requestInvoker.RequestAsync(requestData, cancellationToken).ConfigureAwait(false);
+ response = await _requestInvoker.RequestAsync(endpoint, requestData, postData, cancellationToken).ConfigureAwait(false);
else
- response = _requestInvoker.Request(requestData);
+ response = _requestInvoker.Request(endpoint, requestData, postData);
response.ApiCallDetails.AuditTrail = AuditTrail;
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response);
if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null)
- audit.Event = requestData.OnFailureAuditEvent;
+ {
+ var @event = response.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest;
+ audit.Event = @event;
+ }
return response;
}
catch (Exception e) when (audit is not null)
{
- audit.Event = requestData.OnFailureAuditEvent;
+ var @event = e is TransportException t && t.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest;
+ audit.Event = @event;
audit.Exception = e;
throw;
}
}
- public override TransportException? CreateClientException(TResponse response, ApiCallDetails? callDetails,
- RequestData data, List? seenExceptions)
+ public override TransportException? CreateClientException(
+ TResponse response,
+ ApiCallDetails? callDetails,
+ Endpoint endpoint,
+ RequestData data,
+ List? seenExceptions
+ )
{
if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null;
- var pipelineFailure = data.OnFailurePipelineFailure;
+ var pipelineFailure = callDetails?.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest;
var innerException = callDetails?.OriginalException;
if (seenExceptions is not null && seenExceptions.HasAny(out var exs))
{
@@ -253,7 +262,7 @@ private async ValueTask CallProductEndpointCoreAsync(bool
var clientException = new TransportException(pipelineFailure, exceptionMessage, innerException)
{
- Request = data,
+ Endpoint = endpoint,
ApiCallDetails = callDetails,
AuditTrail = AuditTrail
};
@@ -265,7 +274,7 @@ public override void FirstPoolUsage(SemaphoreSlim semaphore)
{
if (!FirstPoolUsageNeedsSniffing) return;
- if (!semaphore.Wait(_settings.RequestTimeout))
+ if (!semaphore.Wait(RequestTimeout))
{
if (FirstPoolUsageNeedsSniffing)
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
@@ -299,7 +308,7 @@ public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, Cancella
// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
// everywhere else it would bubble out wrapped in a `UnexpectedTransportException`
- var success = await semaphore.WaitAsync(_settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
+ var success = await semaphore.WaitAsync(RequestTimeout, cancellationToken).ConfigureAwait(false);
if (!success)
{
if (FirstPoolUsageNeedsSniffing)
@@ -353,9 +362,9 @@ public override bool TryGetSingleNode(out Node node)
public override IEnumerable NextNode()
{
- if (RequestConfiguration?.ForceNode != null)
+ if (RequestConfig?.ForceNode != null)
{
- yield return new Node(RequestConfiguration.ForceNode);
+ yield return new Node(RequestConfig.ForceNode);
yield break;
}
@@ -398,27 +407,33 @@ public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken
if (!_productRegistration.SupportsPing) return;
if (PingDisabled(node)) return;
- var pingData = _productRegistration.CreatePingRequestData(node, PingAndSniffRequestConfiguration, _settings, _memoryStreamFactory);
+ var pingEndpoint = _productRegistration.CreatePingEndpoint(node, PingAndSniffRequestConfiguration);
using var audit = Audit(PingSuccess, node);
if (audit is not null)
- audit.PathAndQuery = pingData.PathAndQuery;
+ audit.PathAndQuery = pingEndpoint.PathAndQuery;
TransportResponse response;
+ //TODO remove
+ var requestData = new RequestData(_settings, null, null, _memoryStreamFactory);
+
try
{
if (isAsync)
- response = await _productRegistration.PingAsync(_requestInvoker, pingData, cancellationToken).ConfigureAwait(false);
+ response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, requestData, cancellationToken).ConfigureAwait(false);
else
- response = _productRegistration.Ping(_requestInvoker, pingData);
+ response = _productRegistration.Ping(_requestInvoker, pingEndpoint, requestData);
ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails);
//ping should not silently accept bad but valid http responses
if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
- throw new PipelineException(pingData.OnFailurePipelineFailure, response.ApiCallDetails.OriginalException) { Response = response };
+ {
+ var pipelineFailure = response.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest;
+ throw new PipelineException(pipelineFailure, response.ApiCallDetails.OriginalException) { Response = response };
+ }
}
catch (Exception e)
{
@@ -445,13 +460,14 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
foreach (var node in SniffNodes)
{
- var requestData =
- _productRegistration.CreateSniffRequestData(node, PingAndSniffRequestConfiguration, _settings, _memoryStreamFactory);
+ var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings);
+ //TODO remove
+ var requestData = new RequestData(_settings, null, null, _memoryStreamFactory);
using var audit = Audit(SniffSuccess, node);
if (audit is not null)
- audit.PathAndQuery = requestData.PathAndQuery;
+ audit.PathAndQuery = sniffEndpoint.PathAndQuery;
Tuple> result;
@@ -459,17 +475,20 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
{
if (isAsync)
result = await _productRegistration
- .SniffAsync(_requestInvoker, _nodePool.UsingSsl, requestData, cancellationToken)
+ .SniffAsync(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData, cancellationToken)
.ConfigureAwait(false);
else
result = _productRegistration
- .Sniff(_requestInvoker, _nodePool.UsingSsl, requestData);
+ .Sniff(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData);
ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails);
//sniff should not silently accept bad but valid http responses
if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType)
- throw new PipelineException(requestData.OnFailurePipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 };
+ {
+ var pipelineFailure = result.Item1.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest;
+ throw new PipelineException(pipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 };
+ }
_nodePool.Reseed(result.Item2);
Refresh = true;
@@ -528,20 +547,19 @@ public override async Task SniffOnStaleClusterAsync(CancellationToken cancellati
}
}
- public override void ThrowNoNodesAttempted(RequestData requestData, List? seenExceptions)
+ public override void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions)
{
- var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage,
- (Exception)null);
+ var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage, (Exception)null);
using (Audit(NoNodesAttempted))
- throw new UnexpectedTransportException(clientException, seenExceptions) { Request = requestData, AuditTrail = AuditTrail };
+ throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail };
}
private bool PingDisabled(Node node) =>
- (RequestConfiguration?.DisablePing).GetValueOrDefault(false)
- || _settings.DisablePings || !_nodePool.SupportsPinging || !node.IsResurrected;
+ (RequestConfig?.DisablePings).GetValueOrDefault(false)
+ || (_settings.DisablePings ?? false) || !_nodePool.SupportsPinging || !node.IsResurrected;
- private Auditable Audit(AuditEvent type, Node node = null) =>
- !_settings.DisableAuditTrail ? (new(type, ref _auditTrail, _dateTimeProvider, node)) : null;
+ private Auditable? Audit(AuditEvent type, Node node = null) =>
+ !_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null;
private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse response = null)
{
diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs b/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
index e7b2a77..0bb90c3 100644
--- a/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
+++ b/src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
@@ -33,7 +33,7 @@ internal static class ResponseBuilderDefaults
///
/// A helper class that deals with handling how a is transformed to the requested
/// implementation. This includes handling optionally buffering based on
-/// . And handling short circuiting special responses
+/// . And handling short circuiting special responses
/// such as , and
///
internal class DefaultResponseBuilder : ResponseBuilder where TError : ErrorResponse, new()
@@ -46,7 +46,9 @@ internal static class ResponseBuilderDefaults
/// Create an instance of from
///
public override TResponse ToResponse(
+ Endpoint endpoint,
RequestData requestData,
+ PostData postData,
Exception ex,
int? statusCode,
Dictionary> headers,
@@ -59,12 +61,12 @@ IReadOnlyDictionary tcpStats
{
responseStream.ThrowIfNull(nameof(responseStream));
- var details = Initialize(requestData, ex, statusCode, headers, mimeType, threadPoolStats, tcpStats, contentLength);
+ var details = Initialize(endpoint, requestData, postData, ex, statusCode, headers, mimeType, threadPoolStats, tcpStats, contentLength);
TResponse response = null;
// Only attempt to set the body if the response may have content
- if (MayHaveBody(statusCode, requestData.Method, contentLength))
+ if (MayHaveBody(statusCode, endpoint.Method, contentLength))
response = SetBody(details, requestData, responseStream, mimeType);
response ??= new TResponse();
@@ -76,7 +78,9 @@ IReadOnlyDictionary tcpStats
/// Create an instance of from
///
public override async Task ToResponseAsync(
+ Endpoint endpoint,
RequestData requestData,
+ PostData postData,
Exception ex,
int? statusCode,
Dictionary> headers,
@@ -90,12 +94,12 @@ public override async Task ToResponseAsync(
{
responseStream.ThrowIfNull(nameof(responseStream));
- var details = Initialize(requestData, ex, statusCode, headers, mimeType, threadPoolStats, tcpStats, contentLength);
+ var details = Initialize(endpoint, requestData, postData, ex, statusCode, headers, mimeType, threadPoolStats, tcpStats, contentLength);
TResponse response = null;
// Only attempt to set the body if the response may have content
- if (MayHaveBody(statusCode, requestData.Method, contentLength))
+ if (MayHaveBody(statusCode, endpoint.Method, contentLength))
response = await SetBodyAsync(details, requestData, responseStream, mimeType,
cancellationToken).ConfigureAwait(false);
@@ -104,50 +108,6 @@ public override async Task ToResponseAsync(
return response;
}
- // A helper which returns true if the response could potentially have a body.
- // We check for content-length != 0 rather than > 0 as we may not have a content-length header and the length may be -1.
- // In that case, we may have a body and can only use the status code and method conditions to rule out a potential body.
- private static bool MayHaveBody(int? statusCode, HttpMethod httpMethod, long contentLength) =>
- contentLength != 0 && (!statusCode.HasValue || statusCode.Value != 204 && httpMethod != HttpMethod.HEAD);
-
- private static ApiCallDetails Initialize(RequestData requestData, Exception exception, int? statusCode, Dictionary> headers, string mimeType, IReadOnlyDictionary threadPoolStats, IReadOnlyDictionary tcpStats, long contentLength)
- {
- var hasSuccessfulStatusCode = false;
- var allowedStatusCodes = requestData.AllowedStatusCodes;
- if (statusCode.HasValue)
- {
- if (allowedStatusCodes.Contains(-1) || allowedStatusCodes.Contains(statusCode.Value))
- hasSuccessfulStatusCode = true;
- else
- hasSuccessfulStatusCode = requestData.ConnectionSettings
- .StatusCodeToResponseSuccess(requestData.Method, statusCode.Value);
- }
-
- // We don't validate the content-type (MIME type) for HEAD requests or responses that have no content (204 status code).
- // Elastic Cloud responses to HEAD requests strip the content-type header so we want to avoid validation in that case.
- var hasExpectedContentType = !MayHaveBody(statusCode, requestData.Method, contentLength) || requestData.ValidateResponseContentType(mimeType);
-
- var details = new ApiCallDetails
- {
- HasSuccessfulStatusCode = hasSuccessfulStatusCode,
- HasExpectedContentType = hasExpectedContentType,
- OriginalException = exception,
- HttpStatusCode = statusCode,
- RequestBodyInBytes = requestData.PostData?.WrittenBytes,
- Uri = requestData.Uri,
- HttpMethod = requestData.Method,
- TcpStats = tcpStats,
- ThreadPoolStats = threadPoolStats,
- ResponseMimeType = mimeType,
- TransportConfiguration = requestData.ConnectionSettings
- };
-
- if (headers is not null)
- details.ParsedHeaders = new ReadOnlyDictionary>(headers);
-
- return details;
- }
///
///
@@ -202,7 +162,7 @@ private async ValueTask SetBodyCoreAsync(bool isAsync,
where TResponse : TransportResponse, new()
{
byte[] bytes = null;
- var disableDirectStreaming = requestData.PostData?.DisableDirectStreaming ?? requestData.ConnectionSettings.DisableDirectStreaming;
+ var disableDirectStreaming = requestData.DisableDirectStreaming;
var requiresErrorDeserialization = RequiresErrorDeserialization(details, requestData);
var ownsStream = false;
diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs
index 3d7044c..ba035a3 100644
--- a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs
+++ b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs
@@ -17,7 +17,7 @@ public PipelineException(PipelineFailure failure)
: base(GetMessage(failure)) => FailureReason = failure;
///
- public PipelineException(PipelineFailure failure, Exception innerException)
+ public PipelineException(PipelineFailure failure, Exception? innerException)
: base(GetMessage(failure), innerException) => FailureReason = failure;
///
diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs
index be86684..585114b 100644
--- a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs
+++ b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs
@@ -33,12 +33,12 @@ public enum PipelineFailure
CouldNotStartSniffOnStartup,
///
- /// The overall timeout specified by was reached
+ /// The overall timeout specified by was reached
///
MaxTimeoutReached,
///
- /// The overall max retries as specified by was reached
+ /// The overall max retries as specified by was reached
///
MaxRetriesReached,
diff --git a/src/Elastic.Transport/Components/Pipeline/RequestData.cs b/src/Elastic.Transport/Components/Pipeline/RequestData.cs
index cb9b0b0..778ab7b 100644
--- a/src/Elastic.Transport/Components/Pipeline/RequestData.cs
+++ b/src/Elastic.Transport/Components/Pipeline/RequestData.cs
@@ -12,6 +12,66 @@
namespace Elastic.Transport;
+///
+/// Represents the path of an endpoint in a transport request, including the HTTP method
+/// and the path and query information.
+///
+///
+/// This struct is used to store information about the HTTP method and the path and query of an endpoint,
+/// which are essential components when constructing a request URI.
+///
+public readonly record struct EndpointPath(HttpMethod Method, string PathAndQuery);
+
+
+///
+/// Represents an endpoint in a transport request, encapsulating the HTTP method, path and query,
+/// and the node to which the request is being sent.
+///
+///
+/// This class is used to construct the URI for the request based on the node's URI and the path and query.
+/// An empty endpoint can be created using the method as a default or placeholder instance.
+///
+public record Endpoint(in EndpointPath Path, Node Node)
+{
+ /// Represents an empty endpoint used as a default or placeholder instance of .
+ public static Endpoint Empty(in EndpointPath path) => new(path, EmptyNode);
+
+ private static readonly Node EmptyNode = new(new Uri("http://empty.example"));
+
+ /// Indicates whether the endpoint is an empty placeholder instance.
+ public bool IsEmpty => Node == EmptyNode;
+
+ /// The for the request.
+ public Uri Uri { get; private init; } = new(Node.Uri, Path.PathAndQuery);
+
+ /// The HTTP method used for the request (e.g., GET, POST, PUT, DELETE, HEAD).
+ public HttpMethod Method => Path.Method;
+
+ /// Gets the path and query of the endpoint.
+ public string PathAndQuery => Path.PathAndQuery;
+
+ private readonly Node _node = Node;
+
+ ///
+ /// Represents a node within the transport layer of the Elastic search client.
+ /// This object encapsulates the characteristics of a node, allowing for comparisons and operations
+ /// within the broader search infrastructure.
+ ///
+ public Node Node
+ {
+ get => _node;
+ init
+ {
+ _node = value;
+ Uri = new(Node.Uri, Path.PathAndQuery);
+ }
+ }
+
+ ///
+ public override string ToString() => $"{Path.Method.GetStringValue()} {Uri}";
+
+}
+
///
/// Where and how should connect to.
///
@@ -24,40 +84,56 @@ public sealed class RequestData
//TODO add xmldocs and clean up this class
#pragma warning disable 1591
public const string DefaultMimeType = "application/json";
- public const string MimeTypeTextPlain = "text/plain";
public const string OpaqueIdHeader = "X-Opaque-Id";
public const string RunAsSecurityHeader = "es-security-runas-user";
- private Uri? _requestUri;
- private Node? _node;
-
public RequestData(
- HttpMethod method,
- string pathAndQuery,
- PostData? data,
ITransportConfiguration global,
IRequestConfiguration? local,
CustomResponseBuilder? customResponseBuilder,
- MemoryStreamFactory memoryStreamFactory,
- OpenTelemetryData openTelemetryData
+ MemoryStreamFactory memoryStreamFactory
)
{
- OpenTelemetryData = openTelemetryData;
CustomResponseBuilder = customResponseBuilder;
ConnectionSettings = global;
MemoryStreamFactory = memoryStreamFactory;
- Method = method;
- PostData = data;
- PathAndQuery = pathAndQuery;
+ SkipDeserializationForStatusCodes = global.SkipDeserializationForStatusCodes;
+ DnsRefreshTimeout = global.DnsRefreshTimeout;
+ MetaHeaderProvider = global.MetaHeaderProvider;
+ ProxyAddress = global.ProxyAddress;
+ ProxyUsername = global.ProxyUsername;
+ ProxyPassword = global.ProxyPassword;
+ DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection;
+ UserAgent = global.UserAgent;
+ KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000);
+ KeepAliveTime = (int)(global.KeepAliveTime?.TotalMilliseconds ?? 2000);
+
+ RunAs = local?.RunAs ?? global.RunAs;
- if (data != null)
- data.DisableDirectStreaming = local?.DisableDirectStreaming ?? global.DisableDirectStreaming;
+ DisableDirectStreaming = local?.DisableDirectStreaming ?? global.DisableDirectStreaming ?? false;
- Pipelined = local?.EnableHttpPipelining ?? global.HttpPipeliningEnabled;
- HttpCompression = global.EnableHttpCompression;
- ContentType = local?.ContentType ?? global.ProductRegistration.DefaultMimeType ?? DefaultMimeType;
- Accept = local?.Accept ?? global.ProductRegistration.DefaultMimeType ?? DefaultMimeType;
+ Pipelined = local?.HttpPipeliningEnabled ?? global.HttpPipeliningEnabled ?? true;
+ HttpCompression = global.EnableHttpCompression ?? local?.EnableHttpCompression ?? true;
+ ContentType = local?.ContentType ?? global.Accept ?? DefaultMimeType;
+ Accept = local?.Accept ?? global.Accept ?? DefaultMimeType;
+ ThrowExceptions = local?.ThrowExceptions ?? global.ThrowExceptions ?? false;
+ RequestTimeout = local?.RequestTimeout ?? global.RequestTimeout ?? RequestConfiguration.DefaultRequestTimeout;
+ RequestMetaData = local?.RequestMetaData?.Items ?? EmptyReadOnly.Dictionary;
+ AuthenticationHeader = local?.Authentication ?? global.Authentication;
+ AllowedStatusCodes = local?.AllowedStatusCodes ?? EmptyReadOnly.Collection;
+ ClientCertificates = local?.ClientCertificates ?? global.ClientCertificates;
+ TransferEncodingChunked = local?.TransferEncodingChunked ?? global.TransferEncodingChunked ?? false;
+ TcpStats = local?.EnableTcpStats ?? global.EnableTcpStats ?? true;
+ ThreadPoolStats = local?.EnableThreadPoolStats ?? global.EnableThreadPoolStats ?? true;
+ ParseAllHeaders = local?.ParseAllHeaders ?? global.ParseAllHeaders ?? false;
+ ResponseHeadersToParse = local is not null
+ ? new HeadersList(local.ResponseHeadersToParse, global.ResponseHeadersToParse)
+ : global.ResponseHeadersToParse;
+ PingTimeout =
+ local?.PingTimeout
+ ?? global.PingTimeout
+ ?? (global.NodePool.UsingSsl ? RequestConfiguration.DefaultPingTimeoutOnSsl : RequestConfiguration.DefaultPingTimeout);
if (global.Headers != null)
Headers = new NameValueCollection(global.Headers);
@@ -75,79 +151,26 @@ OpenTelemetryData openTelemetryData
Headers.Add(OpaqueIdHeader, local.OpaqueId);
}
- RunAs = local?.RunAs;
- SkipDeserializationForStatusCodes = global.SkipDeserializationForStatusCodes;
- ThrowExceptions = local?.ThrowExceptions ?? global.ThrowExceptions;
-
- RequestTimeout = local?.RequestTimeout ?? global.RequestTimeout;
- PingTimeout =
- local?.PingTimeout
- ?? global.PingTimeout
- ?? (global.NodePool.UsingSsl ? TransportConfiguration.DefaultPingTimeoutOnSsl : TransportConfiguration.DefaultPingTimeout);
-
- KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000);
- KeepAliveTime = (int)(global.KeepAliveTime?.TotalMilliseconds ?? 2000);
- DnsRefreshTimeout = global.DnsRefreshTimeout;
-
- MetaHeaderProvider = global.MetaHeaderProvider;
- RequestMetaData = local?.RequestMetaData?.Items ?? EmptyReadOnly.Dictionary;
-
- ProxyAddress = global.ProxyAddress;
- ProxyUsername = global.ProxyUsername;
- ProxyPassword = global.ProxyPassword;
- DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection;
- AuthenticationHeader = local?.AuthenticationHeader ?? global.Authentication;
- AllowedStatusCodes = local?.AllowedStatusCodes ?? EmptyReadOnly.Collection;
- ClientCertificates = local?.ClientCertificates ?? global.ClientCertificates;
- UserAgent = global.UserAgent;
- TransferEncodingChunked = local?.TransferEncodingChunked ?? global.TransferEncodingChunked;
- TcpStats = local?.EnableTcpStats ?? global.EnableTcpStats;
- ThreadPoolStats = local?.EnableThreadPoolStats ?? global.EnableThreadPoolStats;
- ParseAllHeaders = local?.ParseAllHeaders ?? global.ParseAllHeaders ?? false;
-
- if (local is not null)
- {
- ResponseHeadersToParse = local.ResponseHeadersToParse;
- ResponseHeadersToParse = new HeadersList(local.ResponseHeadersToParse, global.ResponseHeadersToParse);
- }
- else
- ResponseHeadersToParse = global.ResponseHeadersToParse;
}
public string Accept { get; }
public IReadOnlyCollection AllowedStatusCodes { get; }
- public AuthorizationHeader AuthenticationHeader { get; }
- public X509CertificateCollection ClientCertificates { get; }
+ public AuthorizationHeader? AuthenticationHeader { get; }
+ public X509CertificateCollection? ClientCertificates { get; }
public ITransportConfiguration ConnectionSettings { get; }
public CustomResponseBuilder? CustomResponseBuilder { get; }
- public bool DisableAutomaticProxyDetection { get; }
- public HeadersList ResponseHeadersToParse { get; }
+ public HeadersList? ResponseHeadersToParse { get; }
+ public NameValueCollection? Headers { get; }
+ public bool DisableDirectStreaming { get; }
public bool ParseAllHeaders { get; }
- public NameValueCollection Headers { get; }
+ public bool DisableAutomaticProxyDetection { get; }
public bool HttpCompression { get; }
public int KeepAliveInterval { get; }
public int KeepAliveTime { get; }
- public bool MadeItToResponse { get; set; }
public MemoryStreamFactory MemoryStreamFactory { get; }
- public HttpMethod Method { get; }
-
- public Node Node
- {
- get => _node;
- set
- {
- // We want the Uri to regenerate when the node changes
- _requestUri = null;
- _node = value;
- }
- }
- public AuditEvent OnFailureAuditEvent => MadeItToResponse ? AuditEvent.BadResponse : AuditEvent.BadRequest;
- public PipelineFailure OnFailurePipelineFailure => MadeItToResponse ? PipelineFailure.BadResponse : PipelineFailure.BadRequest;
- public string PathAndQuery { get; }
public TimeSpan PingTimeout { get; }
public bool Pipelined { get; }
- public PostData? PostData { get; }
public string ProxyAddress { get; }
public string ProxyPassword { get; }
public string ProxyUsername { get; }
@@ -161,31 +184,13 @@ public Node Node
public bool TcpStats { get; }
public bool ThreadPoolStats { get; }
- ///
- /// The for the request.
- ///
- public Uri Uri
- {
- get
- {
- if (_requestUri is not null) return _requestUri;
-
- _requestUri = Node is not null ? new Uri(Node.Uri, PathAndQuery) : null;
- return _requestUri;
- }
- }
-
public TimeSpan DnsRefreshTimeout { get; }
- public MetaHeaderProvider MetaHeaderProvider { get; }
+ public MetaHeaderProvider? MetaHeaderProvider { get; }
public IReadOnlyDictionary RequestMetaData { get; }
- public bool IsAsync { get; internal set; }
-
- internal OpenTelemetryData OpenTelemetryData { get; }
-
- public override string ToString() => $"{Method.GetStringValue()} {PathAndQuery}";
+ //internal OpenTelemetryData OpenTelemetryData { get; }
internal bool ValidateResponseContentType(string responseMimeType)
{
diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs
index 4a36a03..79dc658 100644
--- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs
+++ b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs
@@ -45,10 +45,10 @@ internal RequestPipeline() { }
public abstract DateTimeOffset StartedOn { get; }
- public abstract TResponse CallProductEndpoint(RequestData requestData)
+ public abstract TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new();
- public abstract Task CallProductEndpointAsync(RequestData requestData, CancellationToken cancellationToken)
+ public abstract Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new();
public abstract void MarkAlive(Node node);
@@ -87,15 +87,15 @@ public abstract Task CallProductEndpointAsync(RequestData
public abstract Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken);
- public abstract void BadResponse(ref TResponse response, ApiCallDetails callDetails, RequestData data, TransportException exception)
+ public abstract void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception)
where TResponse : TransportResponse, new();
- public abstract void ThrowNoNodesAttempted(RequestData requestData, List? seenExceptions);
+ public abstract void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions);
public abstract void AuditCancellationRequested();
public abstract TransportException? CreateClientException(TResponse? response, ApiCallDetails? callDetails,
- RequestData data, List? seenExceptions)
+ Endpoint endpoint, RequestData data, List? seenExceptions)
where TResponse : TransportResponse, new();
#pragma warning restore 1591
diff --git a/src/Elastic.Transport/Components/Pipeline/ResponseBuilder.cs b/src/Elastic.Transport/Components/Pipeline/ResponseBuilder.cs
index f4952a9..95f18c3 100644
--- a/src/Elastic.Transport/Components/Pipeline/ResponseBuilder.cs
+++ b/src/Elastic.Transport/Components/Pipeline/ResponseBuilder.cs
@@ -4,7 +4,9 @@
using System;
using System.Collections.Generic;
+using System.Collections.ObjectModel;
using System.IO;
+using System.Linq;
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
@@ -24,15 +26,17 @@ public abstract class ResponseBuilder
/// Create an instance of from
///
public abstract TResponse ToResponse(
+ Endpoint endpoint,
RequestData requestData,
- Exception ex,
+ PostData? postData,
+ Exception? ex,
int? statusCode,
- Dictionary> headers,
+ Dictionary>? headers,
Stream responseStream,
- string mimeType,
+ string? mimeType,
long contentLength,
- IReadOnlyDictionary threadPoolStats,
- IReadOnlyDictionary tcpStats
+ IReadOnlyDictionary? threadPoolStats,
+ IReadOnlyDictionary? tcpStats
) where TResponse : TransportResponse, new();
@@ -40,15 +44,74 @@ IReadOnlyDictionary tcpStats
/// Create an instance of from
///
public abstract Task ToResponseAsync(
+ Endpoint endpoint,
RequestData requestData,
- Exception ex,
+ PostData? postData,
+ Exception? ex,
int? statusCode,
- Dictionary> headers,
+ Dictionary>? headers,
Stream responseStream,
- string mimeType,
+ string? mimeType,
long contentLength,
- IReadOnlyDictionary threadPoolStats,
- IReadOnlyDictionary tcpStats,
+ IReadOnlyDictionary? threadPoolStats,
+ IReadOnlyDictionary? tcpStats,
CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new();
+
+ internal static ApiCallDetails Initialize(
+ Endpoint endpoint,
+ RequestData requestData,
+ PostData? postData,
+ Exception exception,
+ int? statusCode,
+ Dictionary> headers, string mimeType,
+ IReadOnlyDictionary threadPoolStats, IReadOnlyDictionary tcpStats,
+ long contentLength
+ )
+ {
+ var hasSuccessfulStatusCode = false;
+ var allowedStatusCodes = requestData.AllowedStatusCodes;
+ if (statusCode.HasValue)
+ {
+ if (allowedStatusCodes.Contains(-1) || allowedStatusCodes.Contains(statusCode.Value))
+ hasSuccessfulStatusCode = true;
+ else
+ hasSuccessfulStatusCode = requestData.ConnectionSettings
+ .StatusCodeToResponseSuccess(endpoint.Method, statusCode.Value);
+ }
+
+ // We don't validate the content-type (MIME type) for HEAD requests or responses that have no content (204 status code).
+ // Elastic Cloud responses to HEAD requests strip the content-type header so we want to avoid validation in that case.
+ var hasExpectedContentType = !MayHaveBody(statusCode, endpoint.Method, contentLength) || requestData.ValidateResponseContentType(mimeType);
+
+ var details = new ApiCallDetails
+ {
+ HasSuccessfulStatusCode = hasSuccessfulStatusCode,
+ HasExpectedContentType = hasExpectedContentType,
+ OriginalException = exception,
+ HttpStatusCode = statusCode,
+ RequestBodyInBytes = postData?.WrittenBytes,
+ Uri = endpoint.Uri,
+ HttpMethod = endpoint.Method,
+ TcpStats = tcpStats,
+ ThreadPoolStats = threadPoolStats,
+ ResponseMimeType = mimeType,
+ TransportConfiguration = requestData.ConnectionSettings
+ };
+
+ if (headers is not null)
+ details.ParsedHeaders = new ReadOnlyDictionary>(headers);
+
+ return details;
+ }
+
+ ///
+ /// A helper which returns true if the response could potentially have a body.
+ /// We check for content-length != 0 rather than > 0 as we may not have a content-length header and the length may be -1.
+ /// In that case, we may have a body and can only use the status code and method conditions to rule out a potential body.
+ ///
+ protected static bool MayHaveBody(int? statusCode, HttpMethod httpMethod, long contentLength) =>
+ contentLength != 0 && (!statusCode.HasValue || statusCode.Value != 204 && httpMethod != HttpMethod.HEAD);
+
}
diff --git a/src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs b/src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs
index a73a09e..2b9dd3a 100644
--- a/src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs
+++ b/src/Elastic.Transport/Components/TransportClient/Content/RequestDataContent.cs
@@ -29,17 +29,19 @@ namespace Elastic.Transport;
internal sealed class RequestDataContent : HttpContent
{
private readonly RequestData _requestData;
+ private readonly PostData? _postData;
- private readonly Func
+ private readonly Func
_onStreamAvailableAsync;
- private readonly Action _onStreamAvailable;
+ private readonly Action _onStreamAvailable;
private readonly CancellationToken _token;
/// Constructor used in synchronous paths.
- public RequestDataContent(RequestData requestData)
+ public RequestDataContent(RequestData requestData, PostData postData)
{
_requestData = requestData;
+ _postData = postData;
_token = default;
Headers.TryAddWithoutValidation("Content-Type", requestData.ContentType);
@@ -51,11 +53,16 @@ public RequestDataContent(RequestData requestData)
_onStreamAvailableAsync = OnStreamAvailableAsync;
}
- private static void OnStreamAvailable(RequestData data, Stream stream, HttpContent content, TransportContext context)
+ private static void OnStreamAvailable(RequestData data, PostData? postData, Stream stream, HttpContent content, TransportContext context)
{
+ if (postData == null)
+ {
+ stream.Dispose();
+ return;
+ }
if (data.HttpCompression) stream = new GZipStream(stream, CompressionMode.Compress, false);
- using (stream) data.PostData.Write(stream, data.ConnectionSettings);
+ using (stream) postData.Write(stream, data.ConnectionSettings, data.DisableDirectStreaming);
}
/// Constructor used in asynchronous paths.
@@ -73,8 +80,17 @@ public RequestDataContent(RequestData requestData, CancellationToken token)
_onStreamAvailableAsync = OnStreamAvailableAsync;
}
- private static async Task OnStreamAvailableAsync(RequestData data, Stream stream, HttpContent content, TransportContext context, CancellationToken ctx = default)
+ private static async Task OnStreamAvailableAsync(RequestData data, PostData? postData, Stream stream, HttpContent content, TransportContext context, CancellationToken ctx = default)
{
+ if (postData == null)
+ {
+#if NET6_0_OR_GREATER
+ await stream.DisposeAsync().ConfigureAwait(false);
+#else
+ stream.Dispose();
+#endif
+ return;
+ }
if (data.HttpCompression) stream = new GZipStream(stream, CompressionMode.Compress, false);
#if NET6_0_OR_GREATER
@@ -82,7 +98,7 @@ private static async Task OnStreamAvailableAsync(RequestData data, Stream stream
#else
using (stream)
#endif
- await data.PostData.WriteAsync(stream, data.ConnectionSettings, ctx).ConfigureAwait(false);
+ await postData.WriteAsync(stream, data.ConnectionSettings, data.DisableDirectStreaming, ctx).ConfigureAwait(false);
}
///
@@ -108,7 +124,7 @@ async Task SerializeToStreamAsync(Stream stream, TransportContext context, Cance
var source = CancellationTokenSource.CreateLinkedTokenSource(_token, cancellationToken);
var serializeToStreamTask = new TaskCompletionSource();
var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
- await _onStreamAvailableAsync(_requestData, wrappedStream, this, context, source.Token).ConfigureAwait(false);
+ await _onStreamAvailableAsync(_requestData, _postData, wrappedStream, this, context, source.Token).ConfigureAwait(false);
await serializeToStreamTask.Task.ConfigureAwait(false);
}
@@ -117,7 +133,7 @@ protected override void SerializeToStream(Stream stream, TransportContext contex
{
var serializeToStreamTask = new TaskCompletionSource();
using var wrappedStream = new CompleteTaskOnCloseStream(stream, serializeToStreamTask);
- _onStreamAvailable(_requestData, wrappedStream, this, context);
+ _onStreamAvailable(_requestData, _postData, wrappedStream, this, context);
//await serializeToStreamTask.Task.ConfigureAwait(false);
}
#endif
diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs
index 4fd3335..00031d8 100644
--- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs
+++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs
@@ -56,16 +56,16 @@ public HttpRequestInvoker(Func
- public TResponse Request(RequestData requestData)
+ public TResponse Request(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new() =>
- RequestCoreAsync(false, requestData).EnsureCompleted();
+ RequestCoreAsync(false, endpoint, requestData, postData).EnsureCompleted();
///
- public Task RequestAsync(RequestData requestData, CancellationToken cancellationToken)
+ public Task RequestAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new() =>
- RequestCoreAsync(true, requestData, cancellationToken).AsTask();
+ RequestCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask();
- private async ValueTask RequestCoreAsync(bool isAsync, RequestData requestData, CancellationToken cancellationToken = default)
+ private async ValueTask RequestCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
{
var client = GetClient(requestData);
@@ -79,20 +79,19 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
ReadOnlyDictionary tcpStats = null;
ReadOnlyDictionary threadPoolStats = null;
Dictionary> responseHeaders = null;
- requestData.IsAsync = isAsync;
var beforeTicks = Stopwatch.GetTimestamp();
try
{
- var requestMessage = CreateHttpRequestMessage(requestData);
+ var requestMessage = CreateHttpRequestMessage(endpoint, requestData, isAsync);
- if (requestData.PostData is not null)
+ if (postData is not null)
{
if (isAsync)
- await SetContentAsync(requestMessage, requestData, cancellationToken).ConfigureAwait(false);
+ await SetContentAsync(requestMessage, requestData, postData, cancellationToken).ConfigureAwait(false);
else
- SetContent(requestMessage, requestData);
+ SetContent(requestMessage, requestData, postData);
}
using (requestMessage?.Content ?? (IDisposable)Stream.Null)
@@ -122,7 +121,6 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
statusCode = (int)responseMessage.StatusCode;
}
- requestData.MadeItToResponse = true;
mimeType = responseMessage.Content.Headers.ContentType?.ToString();
responseHeaders = ParseHeaders(requestData, responseMessage);
@@ -160,11 +158,11 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
{
if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync
- (requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
+ (endpoint, requestData, postData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse
- (requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
+ (endpoint, requestData, postData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
@@ -195,7 +193,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
- receivedResponse.Dispose();
+ receivedResponse.Dispose();
throw;
}
}
@@ -217,7 +215,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
responseHeaders ??= new Dictionary>();
responseHeaders.Add(header.Key, header.Value);
}
- else if (requestData.ResponseHeadersToParse.Count > 0)
+ else if (requestData.ResponseHeadersToParse is { Count: > 0 })
foreach (var headerToParse in requestData.ResponseHeadersToParse)
if (responseMessage.Headers.TryGetValues(headerToParse, out var values))
{
@@ -324,35 +322,38 @@ private string ComparableFingerprint(string fingerprint)
/// Creates an instance of using the .
/// This method is virtual so subclasses of can modify the instance if needed.
///
- /// An instance of describing where and how to call out to
+ /// An object describing where we want to call out to
+ /// An object describing how we want to call out to
+ ///
///
/// Can throw if is set but the platform does
/// not allow this to be set on
///
- internal HttpRequestMessage CreateHttpRequestMessage(RequestData requestData)
+ internal HttpRequestMessage CreateHttpRequestMessage(Endpoint endpoint, RequestData requestData, bool isAsync)
{
- var request = CreateRequestMessage(requestData);
- SetAuthenticationIfNeeded(request, requestData);
+ var request = CreateRequestMessage(endpoint, requestData, isAsync);
+ SetAuthenticationIfNeeded(endpoint, requestData, request);
return request;
}
/// Isolated hook for subclasses to set authentication on
/// The instance of that needs authentication details
- /// An object describing where and how we want to call out to
- internal void SetAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
+ /// An object describing where we want to call out to
+ /// An object describing how we want to call out to
+ internal void SetAuthenticationIfNeeded(Endpoint endpoint, RequestData requestData, HttpRequestMessage requestMessage)
{
//If user manually specifies an Authorization Header give it preference
- if (requestData.Headers.HasKeys() && requestData.Headers.AllKeys.Contains("Authorization"))
+ if (requestData.Headers != null && requestData.Headers.HasKeys() && requestData.Headers.AllKeys.Contains("Authorization"))
{
var header = AuthenticationHeaderValue.Parse(requestData.Headers["Authorization"]);
requestMessage.Headers.Authorization = header;
return;
}
- SetConfiguredAuthenticationHeaderIfNeeded(requestMessage, requestData);
+ SetConfiguredAuthenticationHeaderIfNeeded(endpoint, requestData, requestMessage);
}
- private static void SetConfiguredAuthenticationHeaderIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
+ private static void SetConfiguredAuthenticationHeaderIfNeeded(Endpoint endpoint, RequestData requestData, HttpRequestMessage requestMessage)
{
// Basic auth credentials take the following precedence (highest -> lowest):
// 1 - Specified with the URI (highest precedence)
@@ -361,9 +362,9 @@ private static void SetConfiguredAuthenticationHeaderIfNeeded(HttpRequestMessage
string parameters = null;
string scheme = null;
- if (!requestData.Uri.UserInfo.IsNullOrEmpty())
+ if (!endpoint.Uri.UserInfo.IsNullOrEmpty())
{
- parameters = BasicAuthentication.GetBase64String(Uri.UnescapeDataString(requestData.Uri.UserInfo));
+ parameters = BasicAuthentication.GetBase64String(Uri.UnescapeDataString(endpoint.Uri.UserInfo));
scheme = BasicAuthentication.BasicAuthenticationScheme;
}
else if (requestData.AuthenticationHeader != null && requestData.AuthenticationHeader.TryGetAuthorizationParameters(out var v))
@@ -377,10 +378,10 @@ private static void SetConfiguredAuthenticationHeaderIfNeeded(HttpRequestMessage
requestMessage.Headers.Authorization = new AuthenticationHeaderValue(scheme, parameters);
}
- private static HttpRequestMessage CreateRequestMessage(RequestData requestData)
+ private static HttpRequestMessage CreateRequestMessage(Endpoint endpoint, RequestData requestData, bool isAsync)
{
- var method = ConvertHttpMethod(requestData.Method);
- var requestMessage = new HttpRequestMessage(method, requestData.Uri);
+ var method = ConvertHttpMethod(endpoint.Method);
+ var requestMessage = new HttpRequestMessage(method, endpoint.Uri);
if (requestData.Headers != null)
foreach (string key in requestData.Headers)
@@ -404,7 +405,7 @@ private static HttpRequestMessage CreateRequestMessage(RequestData requestData)
{
foreach (var producer in requestData.MetaHeaderProvider.Producers)
{
- var value = producer.ProduceHeaderValue(requestData);
+ var value = producer.ProduceHeaderValue(requestData, isAsync);
if (!string.IsNullOrEmpty(value))
requestMessage.Headers.TryAddWithoutValidation(producer.HeaderName, value);
@@ -414,25 +415,25 @@ private static HttpRequestMessage CreateRequestMessage(RequestData requestData)
return requestMessage;
}
- private static void SetContent(HttpRequestMessage message, RequestData requestData)
+ private static void SetContent(HttpRequestMessage message, RequestData requestData, PostData postData)
{
if (requestData.TransferEncodingChunked)
- message.Content = new RequestDataContent(requestData);
+ message.Content = new RequestDataContent(requestData, postData);
else
{
var stream = requestData.MemoryStreamFactory.Create();
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress, true);
- requestData.PostData.Write(zipStream, requestData.ConnectionSettings);
+ postData.Write(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
}
else
- requestData.PostData.Write(stream, requestData.ConnectionSettings);
+ postData.Write(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
// the written bytes are uncompressed, so can only be used when http compression isn't used
- if (requestData.PostData.DisableDirectStreaming.GetValueOrDefault(false) && !requestData.HttpCompression)
+ if (requestData.DisableDirectStreaming && !requestData.HttpCompression)
{
- message.Content = new ByteArrayContent(requestData.PostData.WrittenBytes);
+ message.Content = new ByteArrayContent(postData.WrittenBytes);
stream.Dispose();
}
else
@@ -448,7 +449,7 @@ private static void SetContent(HttpRequestMessage message, RequestData requestDa
}
}
- private static async Task SetContentAsync(HttpRequestMessage message, RequestData requestData, CancellationToken cancellationToken)
+ private static async Task SetContentAsync(HttpRequestMessage message, RequestData requestData, PostData postData, CancellationToken cancellationToken)
{
if (requestData.TransferEncodingChunked)
message.Content = new RequestDataContent(requestData, cancellationToken);
@@ -458,15 +459,15 @@ private static async Task SetContentAsync(HttpRequestMessage message, RequestDat
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress, true);
- await requestData.PostData.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await postData.WriteAsync(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
}
else
- await requestData.PostData.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await postData.WriteAsync(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
// the written bytes are uncompressed, so can only be used when http compression isn't used
- if (requestData.PostData.DisableDirectStreaming.GetValueOrDefault(false) && !requestData.HttpCompression)
+ if (requestData.DisableDirectStreaming && !requestData.HttpCompression)
{
- message.Content = new ByteArrayContent(requestData.PostData.WrittenBytes);
+ message.Content = new ByteArrayContent(postData.WrittenBytes);
#if DOTNETCORE_2_1_OR_HIGHER
await stream.DisposeAsync().ConfigureAwait(false);
#else
diff --git a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs
index 6ced0e1..d64c0cf 100644
--- a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs
+++ b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs
@@ -50,16 +50,16 @@ public HttpWebRequestInvoker() { }
void IDisposable.Dispose() {}
/// >
- public TResponse Request(RequestData requestData)
+ public TResponse Request(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new() =>
- RequestCoreAsync(false, requestData).EnsureCompleted();
+ RequestCoreAsync(false, endpoint, requestData, postData).EnsureCompleted();
/// >
- public Task RequestAsync(RequestData requestData, CancellationToken cancellationToken = default)
+ public Task RequestAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
- RequestCoreAsync(true, requestData, cancellationToken).AsTask();
+ RequestCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask();
- private async ValueTask RequestCoreAsync(bool isAsync, RequestData requestData, CancellationToken cancellationToken = default)
+ private async ValueTask RequestCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
{
Action unregisterWaitHandle = null;
@@ -72,14 +72,13 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
ReadOnlyDictionary tcpStats = null;
ReadOnlyDictionary threadPoolStats = null;
Dictionary> responseHeaders = null;
- requestData.IsAsync = true;
var beforeTicks = Stopwatch.GetTimestamp();
try
{
- var data = requestData.PostData;
- var request = CreateHttpWebRequest(requestData);
+ var data = postData;
+ var request = CreateHttpWebRequest(endpoint, requestData, postData, isAsync);
using (cancellationToken.Register(() => request.Abort()))
{
if (data is not null)
@@ -95,10 +94,10 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
- await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await data.WriteAsync(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
}
else
- await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await data.WriteAsync(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
}
unregisterWaitHandle?.Invoke();
}
@@ -109,10 +108,10 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
- data.Write(zipStream, requestData.ConnectionSettings);
+ data.Write(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
}
else
- data.Write(stream, requestData.ConnectionSettings);
+ data.Write(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
}
}
@@ -121,8 +120,6 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
if (prepareRequestMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportPrepareRequestMs, prepareRequestMs);
- requestData.MadeItToResponse = true;
-
//http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
//Either the stream or the response object needs to be closed but not both although it won't
//throw any errors if both are closed atleast one of them has to be Closed.
@@ -169,13 +166,13 @@ private async ValueTask RequestCoreAsync(bool isAsync, Req
{
TResponse response;
- if (isAsync)
- response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync
- (requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
- .ConfigureAwait(false);
- else
- response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse
- (requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
+ if (isAsync)
+ response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync
+ (endpoint, requestData, postData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
+ .ConfigureAwait(false);
+ else
+ response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse
+ (endpoint, requestData, postData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
@@ -232,7 +229,7 @@ private static Dictionary> ParseHeaders(RequestData
responseHeaders.Add(key, responseMessage.Headers.GetValues(key));
}
}
- else if (requestData.ResponseHeadersToParse.Count > 0)
+ else if (requestData.ResponseHeadersToParse is { Count: > 0 })
{
foreach (var headerToParse in requestData.ResponseHeadersToParse)
{
@@ -250,11 +247,14 @@ private static Dictionary> ParseHeaders(RequestData
///
/// Allows subclasses to modify the instance that is going to be used for the API call
///
- /// An instance of describing where and how to call out to
- protected virtual HttpWebRequest CreateHttpWebRequest(RequestData requestData)
+ /// An instance of describing where to call out to
+ /// An instance of describing how to call out to
+ /// Optional data to send over the wire
+ ///
+ protected virtual HttpWebRequest CreateHttpWebRequest(Endpoint endpoint, RequestData requestData, PostData? postData, bool isAsync)
{
- var request = CreateWebRequest(requestData);
- SetAuthenticationIfNeeded(requestData, request);
+ var request = CreateWebRequest(endpoint, requestData, postData, isAsync);
+ SetAuthenticationIfNeeded(endpoint, requestData, request);
SetProxyIfNeeded(request, requestData);
SetServerCertificateValidationCallBackIfNeeded(request, requestData);
SetClientCertificates(request, requestData);
@@ -322,9 +322,9 @@ protected virtual void SetServerCertificateValidationCallBackIfNeeded(HttpWebReq
#endif
}
- private static HttpWebRequest CreateWebRequest(RequestData requestData)
+ private static HttpWebRequest CreateWebRequest(Endpoint endpoint, RequestData requestData, PostData? postData, bool isAsync)
{
- var request = (HttpWebRequest)WebRequest.Create(requestData.Uri);
+ var request = (HttpWebRequest)WebRequest.Create(endpoint.Uri);
request.Accept = requestData.Accept;
request.ContentType = requestData.ContentType;
@@ -358,7 +358,7 @@ private static HttpWebRequest CreateWebRequest(RequestData requestData)
{
foreach (var producer in requestData.MetaHeaderProvider.Producers)
{
- var value = producer.ProduceHeaderValue(requestData);
+ var value = producer.ProduceHeaderValue(requestData, isAsync);
if (!string.IsNullOrEmpty(value))
request.Headers.Add(producer.HeaderName, value);
@@ -372,9 +372,9 @@ private static HttpWebRequest CreateWebRequest(RequestData requestData)
//WebRequest won't send Content-Length: 0 for empty bodies
//which goes against RFC's and might break i.e IIS when used as a proxy.
//see: https://github.com/elastic/elasticsearch-net/issues/562
- var m = requestData.Method.GetStringValue();
+ var m = endpoint.Method.GetStringValue();
request.Method = m;
- if (m != "HEAD" && m != "GET" && requestData.PostData == null)
+ if (m != "HEAD" && m != "GET" && postData == null)
request.ContentLength = 0;
return request;
@@ -411,7 +411,7 @@ protected virtual void SetProxyIfNeeded(HttpWebRequest request, RequestData requ
}
/// Hook for subclasses to set authentication on
- protected virtual void SetAuthenticationIfNeeded(RequestData requestData, HttpWebRequest request)
+ protected virtual void SetAuthenticationIfNeeded(Endpoint endpoint, RequestData requestData, HttpWebRequest request)
{
//If user manually specifies an Authorization Header give it preference
if (requestData.Headers.HasKeys() && requestData.Headers.AllKeys.Contains("Authorization"))
@@ -420,10 +420,10 @@ protected virtual void SetAuthenticationIfNeeded(RequestData requestData, HttpWe
request.Headers["Authorization"] = header;
return;
}
- SetBasicAuthenticationIfNeeded(request, requestData);
+ SetBasicAuthenticationIfNeeded(endpoint, requestData, request);
}
- private static void SetBasicAuthenticationIfNeeded(HttpWebRequest request, RequestData requestData)
+ private static void SetBasicAuthenticationIfNeeded(Endpoint endpoint, RequestData requestData, HttpWebRequest request)
{
// Basic auth credentials take the following precedence (highest -> lowest):
// 1 - Specified on the request (highest precedence)
@@ -438,9 +438,9 @@ private static void SetBasicAuthenticationIfNeeded(HttpWebRequest request, Reque
string parameters = null;
string scheme = null;
- if (!requestData.Uri.UserInfo.IsNullOrEmpty())
+ if (!endpoint.Uri.UserInfo.IsNullOrEmpty())
{
- parameters = BasicAuthentication.GetBase64String(Uri.UnescapeDataString(requestData.Uri.UserInfo));
+ parameters = BasicAuthentication.GetBase64String(Uri.UnescapeDataString(endpoint.Uri.UserInfo));
scheme = BasicAuthentication.BasicAuthenticationScheme;
}
else if (requestData.AuthenticationHeader != null && requestData.AuthenticationHeader.TryGetAuthorizationParameters(out var v))
diff --git a/src/Elastic.Transport/Components/TransportClient/IRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/IRequestInvoker.cs
index 2dcf160..aeced28 100644
--- a/src/Elastic.Transport/Components/TransportClient/IRequestInvoker.cs
+++ b/src/Elastic.Transport/Components/TransportClient/IRequestInvoker.cs
@@ -19,7 +19,9 @@ public interface IRequestInvoker : IDisposable
///
/// Perform a request to the endpoint described by using its associated configuration.
///
- /// An object describing where and how to perform the IO call
+ /// An object describing where to perform the IO call
+ /// An object describing how to perform the IO call
+ /// Optional data to post
///
///
/// An implementation of ensuring enough information is available
@@ -31,13 +33,15 @@ public interface IRequestInvoker : IDisposable
/// for and to determine what to
/// do with the response
///
- public Task RequestAsync(RequestData requestData, CancellationToken cancellationToken)
+ public Task RequestAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new();
///
/// Perform a request to the endpoint described by using its associated configuration.
///
- /// An object describing where and how to perform the IO call
+ /// An object describing where to perform the IO call
+ /// An object describing how to perform the IO call
+ /// Optional data to post
///
/// An implementation of ensuring enough information is available
/// for and to determine what to
@@ -48,7 +52,7 @@ public Task RequestAsync(RequestData requestData, Cancella
/// for and to determine what to
/// do with the response
///
- public TResponse Request(RequestData requestData)
+ public TResponse Request(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new();
}
diff --git a/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs
index aead20a..3154537 100644
--- a/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs
+++ b/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs
@@ -43,29 +43,31 @@ public InMemoryRequestInvoker(byte[] responseBody, int statusCode = 200, Excepti
void IDisposable.Dispose() { }
/// >
- public TResponse Request(RequestData requestData)
+ public TResponse Request(Endpoint endpoint, RequestData requestData, PostData? postData)
where TResponse : TransportResponse, new() =>
- BuildResponse(requestData);
+ BuildResponse(endpoint, requestData, postData);
/// >
- public Task RequestAsync(RequestData requestData, CancellationToken cancellationToken)
+ public Task RequestAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new() =>
- BuildResponseAsync(requestData, cancellationToken);
+ BuildResponseAsync(endpoint, requestData, postData, cancellationToken);
///
/// Allow subclasses to provide their own implementations for while reusing the more complex logic
/// to create a response
///
- /// An instance of describing where and how to call out to
+ /// An instance of describing where to call out to
+ /// An instance of describing how to call out to
+ /// Optional data to post
/// The bytes intended to be used as return
/// The status code that the responses should return
///
- public TResponse BuildResponse(RequestData requestData, byte[] responseBody = null, int? statusCode = null,
- string contentType = null)
+ public TResponse BuildResponse(Endpoint endpoint, RequestData requestData, PostData? postData, byte[]? responseBody = null, int? statusCode = null,
+ string? contentType = null)
where TResponse : TransportResponse, new()
{
var body = responseBody ?? _responseBody;
- var data = requestData.PostData;
+ var data = postData;
if (data is not null)
{
@@ -73,29 +75,28 @@ public TResponse BuildResponse(RequestData requestData, byte[] respon
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
- data.Write(zipStream, requestData.ConnectionSettings);
+ data.Write(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
}
else
{
- data.Write(stream, requestData.ConnectionSettings);
+ data.Write(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming);
}
}
- requestData.MadeItToResponse = true;
var sc = statusCode ?? _statusCode;
Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
- .ToResponse(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType ?? RequestData.DefaultMimeType, body?.Length ?? 0, null, null);
+ .ToResponse(endpoint, requestData, postData, _exception, sc, _headers, responseStream, contentType ?? _contentType ?? RequestData.DefaultMimeType, body?.Length ?? 0, null, null);
}
/// >
- public async Task BuildResponseAsync(RequestData requestData, CancellationToken cancellationToken,
- byte[] responseBody = null, int? statusCode = null, string contentType = null)
+ public async Task BuildResponseAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken,
+ byte[]? responseBody = null, int? statusCode = null, string? contentType = null)
where TResponse : TransportResponse, new()
{
var body = responseBody ?? _responseBody;
- var data = requestData.PostData;
+ var data = postData;
if (data is not null)
{
@@ -104,21 +105,19 @@ public async Task BuildResponseAsync(RequestData requestDa
if (requestData.HttpCompression)
{
using var zipStream = new GZipStream(stream, CompressionMode.Compress);
- await data.WriteAsync(zipStream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await data.WriteAsync(zipStream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
}
else
{
- await data.WriteAsync(stream, requestData.ConnectionSettings, cancellationToken).ConfigureAwait(false);
+ await data.WriteAsync(stream, requestData.ConnectionSettings, requestData.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
}
}
- requestData.MadeItToResponse = true;
-
var sc = statusCode ?? _statusCode;
Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
- .ToResponseAsync(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
+ .ToResponseAsync(endpoint, requestData, postData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
.ConfigureAwait(false);
}
}
diff --git a/src/Elastic.Transport/Configuration/HeadersList.cs b/src/Elastic.Transport/Configuration/HeadersList.cs
index 3f56bb3..a08a889 100644
--- a/src/Elastic.Transport/Configuration/HeadersList.cs
+++ b/src/Elastic.Transport/Configuration/HeadersList.cs
@@ -14,7 +14,7 @@ namespace Elastic.Transport;
///
public readonly struct HeadersList : IEnumerable
{
- private readonly List _headers;
+ private readonly List _headers = [];
///
/// Create a new from an existing enumerable of header names.
@@ -23,63 +23,51 @@ namespace Elastic.Transport;
/// The header names to initialise the with.
public HeadersList(IEnumerable headers)
{
- _headers = new List();
-
foreach (var header in headers)
{
if (!_headers.Contains(header, StringComparer.OrdinalIgnoreCase))
- {
_headers.Add(header);
- }
}
}
- ///
- ///
- ///
- ///
- ///
+ /// Represents a unique, case-insensitive, immutable collection of header names.
public HeadersList(IEnumerable headers, string additionalHeader)
{
- _headers = new List();
-
foreach (var header in headers)
{
if (!_headers.Contains(header, StringComparer.OrdinalIgnoreCase))
- {
_headers.Add(header);
- }
}
- if (!_headers.Contains(additionalHeader, StringComparer.OrdinalIgnoreCase))
- {
- _headers.Add(additionalHeader);
- }
+ if (!_headers.Contains(additionalHeader, StringComparer.OrdinalIgnoreCase)) _headers.Add(additionalHeader);
+ }
+
+ /// Represents a unique, case-insensitive, immutable collection of header names.
+ public HeadersList(IEnumerable headers, IEnumerable otherHeaders)
+ : this(new HeadersList(headers), new HeadersList(otherHeaders))
+ {
}
///
- ///
+ /// Initializes a new instance of by combining two existing instances.
+ /// Duplicate names, including those which only differ by case, will be ignored.
///
- ///
- ///
- public HeadersList(IEnumerable headers, IEnumerable otherHeaders)
+ /// The first set of header names to initialize the with.
+ /// The second set of header names to initialize the with.
+ public HeadersList(HeadersList? headers, HeadersList? otherHeaders)
{
- _headers = new List();
+ AddToHeaders(headers);
+ AddToHeaders(otherHeaders);
+ }
- foreach (var header in headers)
- {
- if (!_headers.Contains(header, StringComparer.OrdinalIgnoreCase))
- {
- _headers.Add(header);
- }
- }
+ private void AddToHeaders(HeadersList? headers)
+ {
+ if (headers is null) return;
- foreach (var header in otherHeaders)
+ foreach (var header in headers)
{
if (!_headers.Contains(header, StringComparer.OrdinalIgnoreCase))
- {
_headers.Add(header);
- }
}
}
@@ -92,8 +80,10 @@ public HeadersList(IEnumerable headers, IEnumerable otherHeaders
///
/// Gets the number of elements contained in the .
///
- public int Count => _headers is null ? 0 : _headers.Count;
+ public int Count => _headers.Count;
+ // ReSharper disable once ConstantConditionalAccessQualifier
+ // ReSharper disable once ConstantNullCoalescingCondition
///
public IEnumerator GetEnumerator() => _headers?.GetEnumerator() ?? (IEnumerator)new EmptyEnumerator();
diff --git a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs
index d610098..f8abe90 100644
--- a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs
+++ b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs
@@ -17,20 +17,11 @@ namespace Elastic.Transport;
/// All the transport configuration that you as the user can use to steer the behavior of the and all the components such
/// as and .
///
-public interface ITransportConfiguration : IDisposable
+public interface ITransportConfiguration : IRequestConfiguration, IDisposable
{
- ///
- AuthorizationHeader Authentication { get; }
-
- /// Provides a semaphoreslim to transport implementations that need to limit access to a resource
+ /// Provides a to transport implementations that need to limit access to a resource
SemaphoreSlim BootstrapLock { get; }
- ///
- /// Use the following certificates to authenticate all HTTP requests. You can also set them on individual
- /// request using
- ///
- X509CertificateCollection ClientCertificates { get; }
-
/// The connection abstraction behind which all actual IO happens
IRequestInvoker Connection { get; }
@@ -68,38 +59,6 @@ public interface ITransportConfiguration : IDisposable
///
bool DisableAutomaticProxyDetection { get; }
- ///
- /// When set to true will disable (de)serializing directly to the request and response stream and return a byte[]
- /// copy of the raw request and response. Defaults to false
- ///
- bool DisableDirectStreaming { get; }
-
- ///
- /// When set to true will disable capturing an audit trail for requests.
- ///
- bool DisableAuditTrail { get; }
-
- ///
- /// This signals that we do not want to send initial pings to unknown/previously dead nodes
- /// and just send the call straightaway
- ///
- bool DisablePings { get; }
-
- ///
- /// Enable gzip compressed requests and responses
- ///
- bool EnableHttpCompression { get; }
-
- ///
- /// Try to send these headers for every request
- ///
- NameValueCollection? Headers { get; }
-
- ///
- /// Whether HTTP pipelining is enabled. The default is true
- ///
- bool HttpPipeliningEnabled { get; }
-
///
/// KeepAliveInterval - specifies the interval, in milliseconds, between
/// when successive keep-alive packets are sent if no acknowledgement is
@@ -118,20 +77,6 @@ public interface ITransportConfiguration : IDisposable
///
TimeSpan? MaxDeadTimeout { get; }
- ///
- /// When a retryable exception occurs or status code is returned this controls the maximum
- /// amount of times we should retry the call to Elasticsearch
- ///
- int? MaxRetries { get; }
-
- ///
- /// Limits the total runtime including retries separately from
- ///
- /// When not specified defaults to which itself defaults to 60 seconds
- ///
- ///
- TimeSpan? MaxRetryTimeout { get; }
-
/// Provides a memory stream factory
MemoryStreamFactory MemoryStreamFactory { get; }
@@ -156,16 +101,6 @@ public interface ITransportConfiguration : IDisposable
///
Action? OnRequestDataCreated { get; }
- ///
- /// When enabled, all headers from the HTTP response will be included in the .
- ///
- bool? ParseAllHeaders { get; }
-
- ///
- /// The timeout in milliseconds to use for ping requests, which are issued to determine whether a node is alive
- ///
- TimeSpan? PingTimeout { get; }
-
///
/// When set will force all connections through this proxy
///
@@ -189,17 +124,6 @@ public interface ITransportConfiguration : IDisposable
/// The serializer to use to serialize requests and deserialize responses
Serializer RequestResponseSerializer { get; }
- ///
- /// The timeout in milliseconds for each request to Elasticsearch
- ///
- TimeSpan RequestTimeout { get; }
-
- ///
- /// A containing the names of all HTTP response headers to attempt to parse and
- /// included on the .
- ///
- HeadersList ResponseHeadersToParse { get; }
-
///
/// Register a ServerCertificateValidationCallback per request
///
@@ -233,13 +157,6 @@ public interface ITransportConfiguration : IDisposable
///
bool SniffsOnStartup { get; }
- ///
- /// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when is false)
- /// on the client when a call resulted in an exception on either the client or the Elasticsearch server.
- /// Reasons for such exceptions could be search parser errors, index missing exceptions, etc...
- ///
- bool ThrowExceptions { get; }
-
///
/// Access to instance that is aware of this instance
///
@@ -264,26 +181,11 @@ public interface ITransportConfiguration : IDisposable
///
Func StatusCodeToResponseSuccess { get; }
- ///
- /// Whether the request should be sent with chunked Transfer-Encoding.
- ///
- bool TransferEncodingChunked { get; }
-
///
/// DnsRefreshTimeout for the connections. Defaults to 5 minutes.
///
TimeSpan DnsRefreshTimeout { get; }
- ///
- /// Enable statistics about TCP connections to be collected when making a request
- ///
- bool EnableTcpStats { get; }
-
- ///
- /// Enable statistics about thread pools to be collected when making a request
- ///
- bool EnableThreadPoolStats { get; }
-
///
/// Provide hints to serializer and products to produce pretty, non minified json.
/// Note: this is not a guarantee you will always get prettified json
@@ -293,7 +195,7 @@ public interface ITransportConfiguration : IDisposable
///
/// Produces the client meta header for a request.
///
- MetaHeaderProvider MetaHeaderProvider { get; }
+ MetaHeaderProvider? MetaHeaderProvider { get; }
///
/// Disables the meta header which is included on all requests by default. This header contains lightweight information
diff --git a/src/Elastic.Transport/Configuration/RequestConfiguration.cs b/src/Elastic.Transport/Configuration/RequestConfiguration.cs
index 183d950..c0b9ba6 100644
--- a/src/Elastic.Transport/Configuration/RequestConfiguration.cs
+++ b/src/Elastic.Transport/Configuration/RequestConfiguration.cs
@@ -18,27 +18,25 @@ public interface IRequestConfiguration
///
/// Force a different Accept header on the request
///
- string Accept { get; set; }
+ string? Accept { get; set; }
///
/// Treat the following statuses (on top of the 200 range) NOT as error.
///
- IReadOnlyCollection AllowedStatusCodes { get; set; }
+ IReadOnlyCollection? AllowedStatusCodes { get; set; }
- ///
- /// Provide an authentication header override for this request
- ///
- AuthorizationHeader AuthenticationHeader { get; set; }
+ /// Provide an authentication header override for this request
+ AuthorizationHeader? Authentication { get; set; }
///
/// Use the following client certificates to authenticate this single request
///
- X509CertificateCollection ClientCertificates { get; set; }
+ X509CertificateCollection? ClientCertificates { get; set; }
///
/// Force a different Content-Type header on the request
///
- string ContentType { get; set; }
+ string? ContentType { get; set; }
///
/// Whether to buffer the request and response bytes for the call
@@ -54,7 +52,7 @@ public interface IRequestConfiguration
/// Under no circumstance do a ping before the actual call. If a node was previously dead a small ping with
/// low connect timeout will be tried first in normal circumstances
///
- bool? DisablePing { get; set; }
+ bool? DisablePings { get; set; }
///
/// Forces no sniffing to occur on the request no matter what configuration is in place
@@ -65,25 +63,39 @@ public interface IRequestConfiguration
///
/// Whether or not this request should be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining defaults to true
///
- bool? EnableHttpPipelining { get; set; }
+ bool? HttpPipeliningEnabled { get; set; }
+
+ ///
+ /// Enable gzip compressed requests and responses
+ ///
+ bool? EnableHttpCompression { get; set; }
///
/// This will force the operation on the specified node, this will bypass any configured connection pool and will no retry.
///
- Uri ForceNode { get; set; }
+ Uri? ForceNode { get; set; }
///
- /// This will override whatever is set on the connection configuration or whatever default the connectionpool has.
+ /// When a retryable exception occurs or status code is returned this controls the maximum
+ /// amount of times we should retry the call to Elasticsearch
///
int? MaxRetries { get; set; }
+ ///
+ /// Limits the total runtime including retries separately from
+ ///
+ /// When not specified defaults to which itself defaults to 60 seconds
+ ///
+ ///
+ TimeSpan? MaxRetryTimeout { get; set; }
+
///
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
/// Valid only for Elasticsearch 6.2.0+
///
- string OpaqueId { get; set; }
+ string? OpaqueId { get; set; }
- ///
+ /// Determines whether to parse all HTTP headers in the request.
bool? ParseAllHeaders { get; set; }
///
@@ -96,14 +108,15 @@ public interface IRequestConfiguration
///
TimeSpan? RequestTimeout { get; set; }
- ///
- HeadersList ResponseHeadersToParse { get; set; }
+
+ /// Specifies the headers from the response that should be parsed.
+ HeadersList? ResponseHeadersToParse { get; set; }
///
/// Submit the request on behalf in the context of a different shield user
/// https://www.elastic.co/guide/en/shield/current/submitting-requests-for-other-users.html
///
- string RunAs { get; set; }
+ string? RunAs { get; set; }
///
/// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when is false)
@@ -120,12 +133,16 @@ public interface IRequestConfiguration
///
/// Try to send these headers for this single request
///
- NameValueCollection Headers { get; set; }
+ NameValueCollection? Headers { get; set; }
- ///
+ ///
+ /// Enable statistics about TCP connections to be collected when making a request
+ ///
bool? EnableTcpStats { get; set; }
- ///
+ ///
+ /// Enable statistics about thread pools to be collected when making a request
+ ///
bool? EnableThreadPoolStats { get; set; }
///
@@ -137,102 +154,125 @@ public interface IRequestConfiguration
///
public class RequestConfiguration : IRequestConfiguration
{
+
+ /// The default request timeout. Defaults to 1 minute
+ public static readonly TimeSpan DefaultRequestTimeout = TimeSpan.FromMinutes(10);
+
+ /// The default ping timeout. Defaults to 2 seconds
+ public static readonly TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2);
+
+ /// The default ping timeout when the connection is over HTTPS. Defaults to 5 seconds
+ public static readonly TimeSpan DefaultPingTimeoutOnSsl = TimeSpan.FromSeconds(5);
+
+
///
- public string Accept { get; set; }
+ public string? Accept { get; set; }
+
///
- public IReadOnlyCollection AllowedStatusCodes { get; set; }
+ public IReadOnlyCollection? AllowedStatusCodes { get; set; }
+
///
- public AuthorizationHeader AuthenticationHeader { get; set; }
+ public AuthorizationHeader? Authentication { get; set; }
+
///
- public X509CertificateCollection ClientCertificates { get; set; }
+ public X509CertificateCollection? ClientCertificates { get; set; }
+
///
public string ContentType { get; set; }
+
///
public bool? DisableDirectStreaming { get; set; }
+
///
public bool? DisableAuditTrail { get; set; }
+
+ ///
+ public bool? DisablePings { get; set; }
+
///
public bool? DisablePing { get; set; }
+
///
public bool? DisableSniff { get; set; }
+
+ ///
+ public bool? HttpPipeliningEnabled { get; set; }
+
///
public bool? EnableHttpPipelining { get; set; } = true;
+
+ ///
+ public bool? EnableHttpCompression { get; set; }
+
///
public Uri ForceNode { get; set; }
+
///
public int? MaxRetries { get; set; }
+
+ ///
+ public TimeSpan? MaxRetryTimeout { get; set; }
+
///
public string OpaqueId { get; set; }
+
///
public TimeSpan? PingTimeout { get; set; }
+
///
public TimeSpan? RequestTimeout { get; set; }
+
///
public string RunAs { get; set; }
+
///
public bool? ThrowExceptions { get; set; }
+
///
public bool? TransferEncodingChunked { get; set; }
+
///
public NameValueCollection Headers { get; set; }
+
///
public bool? EnableTcpStats { get; set; }
+
///
public bool? EnableThreadPoolStats { get; set; }
+
///
- public HeadersList ResponseHeadersToParse { get; set; }
+ public HeadersList? ResponseHeadersToParse { get; set; }
+
///
public bool? ParseAllHeaders { get; set; }
///
public RequestMetaData RequestMetaData { get; set; }
+
}
///
public class RequestConfigurationDescriptor : IRequestConfiguration
{
///
- public RequestConfigurationDescriptor(IRequestConfiguration? config)
+ public RequestConfigurationDescriptor()
{
- Self.RequestTimeout = config?.RequestTimeout;
- Self.PingTimeout = config?.PingTimeout;
- Self.ContentType = config?.ContentType;
- Self.Accept = config?.Accept;
- Self.MaxRetries = config?.MaxRetries;
- Self.ForceNode = config?.ForceNode;
- Self.DisableSniff = config?.DisableSniff;
- Self.DisablePing = config?.DisablePing;
- Self.DisableDirectStreaming = config?.DisableDirectStreaming;
- Self.DisableAuditTrail = config?.DisableAuditTrail;
- Self.AllowedStatusCodes = config?.AllowedStatusCodes;
- Self.AuthenticationHeader = config?.AuthenticationHeader;
- Self.EnableHttpPipelining = config?.EnableHttpPipelining ?? true;
- Self.RunAs = config?.RunAs;
- Self.ClientCertificates = config?.ClientCertificates;
- Self.ThrowExceptions = config?.ThrowExceptions;
- Self.OpaqueId = config?.OpaqueId;
- Self.TransferEncodingChunked = config?.TransferEncodingChunked;
- Self.Headers = config?.Headers;
- Self.EnableTcpStats = config?.EnableTcpStats;
- Self.EnableThreadPoolStats = config?.EnableThreadPoolStats;
- Self.ParseAllHeaders = config?.ParseAllHeaders;
-
- if (config?.ResponseHeadersToParse is not null)
- Self.ResponseHeadersToParse = config.ResponseHeadersToParse;
}
string IRequestConfiguration.Accept { get; set; }
IReadOnlyCollection IRequestConfiguration.AllowedStatusCodes { get; set; }
- AuthorizationHeader IRequestConfiguration.AuthenticationHeader { get; set; }
+ AuthorizationHeader IRequestConfiguration.Authentication { get; set; }
X509CertificateCollection IRequestConfiguration.ClientCertificates { get; set; }
string IRequestConfiguration.ContentType { get; set; }
bool? IRequestConfiguration.DisableDirectStreaming { get; set; }
bool? IRequestConfiguration.DisableAuditTrail { get; set; }
- bool? IRequestConfiguration.DisablePing { get; set; }
+ bool? IRequestConfiguration.DisablePings { get; set; }
bool? IRequestConfiguration.DisableSniff { get; set; }
- bool? IRequestConfiguration.EnableHttpPipelining { get; set; } = true;
+ bool? IRequestConfiguration.HttpPipeliningEnabled { get; set; }
+ bool? IRequestConfiguration.EnableHttpCompression { get; set; }
Uri IRequestConfiguration.ForceNode { get; set; }
int? IRequestConfiguration.MaxRetries { get; set; }
+ TimeSpan? IRequestConfiguration.MaxRetryTimeout { get; set; }
string IRequestConfiguration.OpaqueId { get; set; }
TimeSpan? IRequestConfiguration.PingTimeout { get; set; }
TimeSpan? IRequestConfiguration.RequestTimeout { get; set; }
@@ -243,7 +283,7 @@ public RequestConfigurationDescriptor(IRequestConfiguration? config)
NameValueCollection IRequestConfiguration.Headers { get; set; }
bool? IRequestConfiguration.EnableTcpStats { get; set; }
bool? IRequestConfiguration.EnableThreadPoolStats { get; set; }
- HeadersList IRequestConfiguration.ResponseHeadersToParse { get; set; }
+ HeadersList? IRequestConfiguration.ResponseHeadersToParse { get; set; }
bool? IRequestConfiguration.ParseAllHeaders { get; set; }
RequestMetaData IRequestConfiguration.RequestMetaData { get; set; }
@@ -305,16 +345,16 @@ public RequestConfigurationDescriptor AllowedStatusCodes(params int[] codes)
}
///
- public RequestConfigurationDescriptor DisableSniffing(bool? disable = true)
+ public RequestConfigurationDescriptor DisableSniffing(bool disable = true)
{
Self.DisableSniff = disable;
return this;
}
- ///
- public RequestConfigurationDescriptor DisablePing(bool? disable = true)
+ ///
+ public RequestConfigurationDescriptor DisablePing(bool disable = true)
{
- Self.DisablePing = disable;
+ Self.DisablePings = disable;
return this;
}
@@ -326,14 +366,14 @@ public RequestConfigurationDescriptor ThrowExceptions(bool throwExceptions = tru
}
///
- public RequestConfigurationDescriptor DisableDirectStreaming(bool? disable = true)
+ public RequestConfigurationDescriptor DisableDirectStreaming(bool disable = true)
{
Self.DisableDirectStreaming = disable;
return this;
}
///
- public RequestConfigurationDescriptor DisableAuditTrail(bool? disable = true)
+ public RequestConfigurationDescriptor DisableAuditTrail(bool disable = true)
{
Self.DisableAuditTrail = disable;
return this;
@@ -356,14 +396,14 @@ public RequestConfigurationDescriptor MaxRetries(int retry)
///
public RequestConfigurationDescriptor Authentication(AuthorizationHeader authentication)
{
- Self.AuthenticationHeader = authentication;
+ Self.Authentication = authentication;
return this;
}
- ///
+ ///
public RequestConfigurationDescriptor EnableHttpPipelining(bool enable = true)
{
- Self.EnableHttpPipelining = enable;
+ Self.HttpPipeliningEnabled = enable;
return this;
}
@@ -383,7 +423,7 @@ public RequestConfigurationDescriptor ClientCertificate(string certificatePath)
ClientCertificates(new X509Certificate2Collection { new X509Certificate(certificatePath) });
///
- public RequestConfigurationDescriptor TransferEncodingChunked(bool? transferEncodingChunked = true)
+ public RequestConfigurationDescriptor TransferEncodingChunked(bool transferEncodingChunked = true)
{
Self.TransferEncodingChunked = transferEncodingChunked;
return this;
@@ -397,21 +437,21 @@ public RequestConfigurationDescriptor GlobalHeaders(NameValueCollection headers)
}
///
- public RequestConfigurationDescriptor EnableTcpStats(bool? enableTcpStats = true)
+ public RequestConfigurationDescriptor EnableTcpStats(bool enableTcpStats = true)
{
Self.EnableTcpStats = enableTcpStats;
return this;
}
///
- public RequestConfigurationDescriptor EnableThreadPoolStats(bool? enableThreadPoolStats = true)
+ public RequestConfigurationDescriptor EnableThreadPoolStats(bool enableThreadPoolStats = true)
{
Self.EnableThreadPoolStats = enableThreadPoolStats;
return this;
}
///
- public RequestConfigurationDescriptor ParseAllHeaders(bool? enable = true)
+ public RequestConfigurationDescriptor ParseAllHeaders(bool enable = true)
{
Self.ParseAllHeaders = enable;
return this;
diff --git a/src/Elastic.Transport/Configuration/TransportConfiguration.cs b/src/Elastic.Transport/Configuration/TransportConfiguration.cs
index a0fb696..72abbe2 100644
--- a/src/Elastic.Transport/Configuration/TransportConfiguration.cs
+++ b/src/Elastic.Transport/Configuration/TransportConfiguration.cs
@@ -41,23 +41,6 @@ public class TransportConfiguration : TransportConfigurationBase
public static MemoryStreamFactory DefaultMemoryStreamFactory { get; } = Transport.DefaultMemoryStreamFactory.Default;
- ///
- /// The default ping timeout. Defaults to 2 seconds
- ///
- public static readonly TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2);
-
- ///
- /// The default ping timeout when the connection is over HTTPS. Defaults to
- /// 5 seconds
- ///
- public static readonly TimeSpan DefaultPingTimeoutOnSsl = TimeSpan.FromSeconds(5);
-
- ///
- /// The default timeout before the client aborts a request to Elasticsearch.
- /// Defaults to 1 minute
- ///
- public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1);
-
///
/// The default timeout before a TCP connection is forcefully recycled so that DNS updates come through
/// Defaults to 5 minutes.
@@ -125,40 +108,25 @@ public abstract class TransportConfigurationBase : ITransportConfiguration
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly UrlFormatter _urlFormatter;
- private AuthorizationHeader _authenticationHeader;
- private X509CertificateCollection _clientCertificates;
private Action _completedRequestHandler = DefaultCompletedRequestHandler;
private int _transportClientLimit;
private TimeSpan? _deadTimeout;
private bool _disableAutomaticProxyDetection;
- private bool _disableDirectStreaming;
- private bool _disableAuditTrail;
- private bool _disablePings;
- private bool _enableHttpCompression;
- private bool _enableHttpPipelining = true;
private TimeSpan? _keepAliveInterval;
private TimeSpan? _keepAliveTime;
private TimeSpan? _maxDeadTimeout;
- private int? _maxRetries;
- private TimeSpan? _maxRetryTimeout;
private Func _nodePredicate;
private Action _onRequestDataCreated = DefaultRequestDataCreated;
- private TimeSpan? _pingTimeout;
private string _proxyAddress;
private string _proxyPassword;
private string _proxyUsername;
- private TimeSpan _requestTimeout;
private TimeSpan _dnsRefreshTimeout;
private Func