16
16
namespace Elastic . Transport ;
17
17
18
18
/// <inheritdoc cref="RequestPipeline" />
19
- public class DefaultRequestPipeline < TConfiguration > : RequestPipeline
20
- where TConfiguration : class , ITransportConfiguration
19
+ public class DefaultRequestPipeline : RequestPipeline
21
20
{
22
21
private readonly IRequestInvoker _requestInvoker ;
23
22
private readonly NodePool _nodePool ;
23
+ private readonly RequestData _requestData ;
24
24
private readonly DateTimeProvider _dateTimeProvider ;
25
25
private readonly MemoryStreamFactory _memoryStreamFactory ;
26
26
private readonly Func < Node , bool > _nodePredicate ;
27
27
private readonly ProductRegistration _productRegistration ;
28
- private readonly TConfiguration _settings ;
29
28
private readonly ResponseBuilder _responseBuilder ;
30
29
31
30
private RequestConfiguration ? _pingAndSniffRequestConfiguration ;
32
- private List < Audit > _auditTrail = null ;
31
+ private List < Audit > ? _auditTrail ;
32
+ private readonly ITransportConfiguration _settings ;
33
33
34
34
/// <inheritdoc cref="RequestPipeline" />
35
- internal DefaultRequestPipeline (
36
- TConfiguration configurationValues ,
37
- DateTimeProvider dateTimeProvider ,
38
- MemoryStreamFactory memoryStreamFactory ,
39
- IRequestConfiguration ? requestConfiguration
40
- )
35
+ internal DefaultRequestPipeline ( RequestData requestData , DateTimeProvider dateTimeProvider )
41
36
{
42
- _settings = configurationValues ;
43
- _nodePool = _settings . NodePool ;
44
- _requestInvoker = _settings . Connection ;
37
+ _requestData = requestData ;
38
+ _settings = requestData . ConnectionSettings ;
39
+
40
+ _nodePool = requestData . ConnectionSettings . NodePool ;
41
+ _requestInvoker = requestData . ConnectionSettings . Connection ;
45
42
_dateTimeProvider = dateTimeProvider ;
46
- _memoryStreamFactory = memoryStreamFactory ;
47
- _productRegistration = configurationValues . ProductRegistration ;
43
+ _memoryStreamFactory = requestData . MemoryStreamFactory ;
44
+ _productRegistration = requestData . ConnectionSettings . ProductRegistration ;
48
45
_responseBuilder = _productRegistration . ResponseBuilder ;
49
- _nodePredicate = _settings . NodePredicate ?? _productRegistration . NodePredicate ;
50
- RequestConfig = requestConfiguration ;
46
+ _nodePredicate = requestData . ConnectionSettings . NodePredicate ?? _productRegistration . NodePredicate ;
47
+
51
48
StartedOn = dateTimeProvider . Now ( ) ;
52
49
}
53
50
54
51
/// <inheritdoc cref="RequestPipeline.AuditTrail" />
55
- public override IEnumerable < Audit > AuditTrail => _auditTrail ?? ( IEnumerable < Audit > ) Array . Empty < Audit > ( ) ;
52
+ public override IEnumerable < Audit > AuditTrail => _auditTrail ;
56
53
57
54
private RequestConfiguration PingAndSniffRequestConfiguration
58
55
{
@@ -66,9 +63,9 @@ private RequestConfiguration PingAndSniffRequestConfiguration
66
63
{
67
64
PingTimeout = PingTimeout ,
68
65
RequestTimeout = PingTimeout ,
69
- Authentication = RequestConfig ? . Authentication ?? _settings . Authentication ,
70
- EnableHttpPipelining = RequestConfig ? . HttpPipeliningEnabled ?? _settings . HttpPipeliningEnabled ,
71
- ForceNode = RequestConfig ? . ForceNode
66
+ Authentication = _requestData . AuthenticationHeader ,
67
+ EnableHttpPipelining = _requestData . HttpPipeliningEnabled ,
68
+ ForceNode = _requestData . ForceNode
72
69
} ;
73
70
74
71
return _pingAndSniffRequestConfiguration ;
@@ -99,10 +96,7 @@ public override bool IsTakingTooLong
99
96
}
100
97
}
101
98
102
- public override int MaxRetries =>
103
- RequestConfig ? . ForceNode != null
104
- ? 0
105
- : Math . Min ( RequestConfig ? . MaxRetries ?? _settings . MaxRetries . GetValueOrDefault ( int . MaxValue ) , _nodePool . MaxRetries ) ;
99
+ public override int MaxRetries => _requestData . MaxRetries ;
106
100
107
101
public bool Refresh { get ; private set ; }
108
102
@@ -140,18 +134,13 @@ public override bool StaleClusterState
140
134
141
135
public override DateTimeOffset StartedOn { get ; }
142
136
143
- private TimeSpan PingTimeout =>
144
- RequestConfig ? . PingTimeout
145
- ?? _settings . PingTimeout
146
- ?? ( _nodePool . UsingSsl ? RequestConfiguration . DefaultPingTimeoutOnSsl : RequestConfiguration . DefaultPingTimeout ) ;
137
+ private TimeSpan PingTimeout => _requestData . PingTimeout ;
147
138
148
- private IRequestConfiguration RequestConfig { get ; }
139
+ private bool RequestDisabledSniff => _requestData . DisableSniff ;
149
140
150
- private bool RequestDisabledSniff => RequestConfig != null && ( RequestConfig . DisableSniff ?? false ) ;
141
+ private TimeSpan RequestTimeout => _requestData . RequestTimeout ;
151
142
152
- private TimeSpan RequestTimeout => RequestConfig ? . RequestTimeout ?? _settings . RequestTimeout ?? RequestConfiguration . DefaultRequestTimeout ;
153
-
154
- public override void AuditCancellationRequested ( ) => Audit ( CancellationRequested ) . Dispose ( ) ;
143
+ public override void AuditCancellationRequested ( ) => Audit ( CancellationRequested ) ? . Dispose ( ) ;
155
144
156
145
public override void BadResponse < TResponse > ( ref TResponse response , ApiCallDetails callDetails , Endpoint endpoint , RequestData data , PostData ? postData , TransportException exception )
157
146
{
@@ -362,9 +351,9 @@ public override bool TryGetSingleNode(out Node node)
362
351
363
352
public override IEnumerable < Node > NextNode ( )
364
353
{
365
- if ( RequestConfig ? . ForceNode != null )
354
+ if ( _requestData . ForceNode != null )
366
355
{
367
- yield return new Node ( RequestConfig . ForceNode ) ;
356
+ yield return new Node ( _requestData . ForceNode ) ;
368
357
369
358
yield break ;
370
359
}
@@ -416,15 +405,12 @@ public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken
416
405
417
406
TransportResponse response ;
418
407
419
- //TODO remove
420
- var requestData = new RequestData ( _settings , null , null , _memoryStreamFactory ) ;
421
-
422
408
try
423
409
{
424
410
if ( isAsync )
425
- response = await _productRegistration . PingAsync ( _requestInvoker , pingEndpoint , requestData , cancellationToken ) . ConfigureAwait ( false ) ;
411
+ response = await _productRegistration . PingAsync ( _requestInvoker , pingEndpoint , _requestData , cancellationToken ) . ConfigureAwait ( false ) ;
426
412
else
427
- response = _productRegistration . Ping ( _requestInvoker , pingEndpoint , requestData ) ;
413
+ response = _productRegistration . Ping ( _requestInvoker , pingEndpoint , _requestData ) ;
428
414
429
415
ThrowBadAuthPipelineExceptionWhenNeeded ( response . ApiCallDetails ) ;
430
416
@@ -462,7 +448,7 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
462
448
{
463
449
var sniffEndpoint = _productRegistration . CreateSniffEndpoint ( node , PingAndSniffRequestConfiguration , _settings ) ;
464
450
//TODO remove
465
- var requestData = new RequestData ( _settings , null , null , _memoryStreamFactory ) ;
451
+ var requestData = new RequestData ( _settings , null , null ) ;
466
452
467
453
using var audit = Audit ( SniffSuccess , node ) ;
468
454
@@ -554,9 +540,7 @@ public override void ThrowNoNodesAttempted(Endpoint endpoint, List<PipelineExcep
554
540
throw new UnexpectedTransportException ( clientException , seenExceptions ) { Endpoint = endpoint , AuditTrail = AuditTrail } ;
555
541
}
556
542
557
- private bool PingDisabled ( Node node ) =>
558
- ( RequestConfig ? . DisablePings ) . GetValueOrDefault ( false )
559
- || ( _settings . DisablePings ?? false ) || ! _nodePool . SupportsPinging || ! node . IsResurrected ;
543
+ private bool PingDisabled ( Node node ) => _requestData . DisablePings || ! node . IsResurrected ;
560
544
561
545
private Auditable ? Audit ( AuditEvent type , Node node = null ) =>
562
546
! _settings . DisableAuditTrail ?? true ? new ( type , ref _auditTrail , _dateTimeProvider , node ) : null ;
0 commit comments