2
2
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3
3
// See the LICENSE file in the project root for more information
4
4
5
+ #nullable enable
5
6
using System ;
6
7
using System . Collections . Generic ;
7
8
using System . IO ;
@@ -34,7 +35,7 @@ public class VirtualClusterRequestInvoker : IRequestInvoker
34
35
{
35
36
private static readonly object Lock = new ( ) ;
36
37
37
- private static byte [ ] _defaultResponseBytes ;
38
+ private static byte [ ] ? _defaultResponseBytes ;
38
39
39
40
private VirtualCluster _cluster ;
40
41
private readonly TestableDateTimeProvider _dateTimeProvider ;
@@ -45,7 +46,9 @@ public class VirtualClusterRequestInvoker : IRequestInvoker
45
46
46
47
internal VirtualClusterRequestInvoker ( VirtualCluster cluster , TestableDateTimeProvider dateTimeProvider )
47
48
{
48
- UpdateCluster ( cluster ) ;
49
+ _cluster = cluster ;
50
+ _calls = cluster . Nodes . ToDictionary ( n => n . Uri . Port , v => new State ( ) ) ;
51
+ _productRegistration = cluster . ProductRegistration ;
49
52
_dateTimeProvider = dateTimeProvider ;
50
53
_productRegistration = cluster . ProductRegistration ;
51
54
_inMemoryRequestInvoker = new InMemoryRequestInvoker ( ) ;
@@ -100,52 +103,55 @@ private static object DefaultResponse
100
103
101
104
private void UpdateCluster ( VirtualCluster cluster )
102
105
{
103
- if ( cluster == null ) return ;
104
-
105
106
lock ( Lock )
106
107
{
107
108
_cluster = cluster ;
108
109
_calls = cluster . Nodes . ToDictionary ( n => n . Uri . Port , v => new State ( ) ) ;
109
110
_productRegistration = cluster . ProductRegistration ;
110
111
}
112
+
111
113
}
112
114
113
- private bool IsSniffRequest ( RequestData requestData ) => _productRegistration . IsSniffRequest ( requestData ) ;
115
+ private bool IsSniffRequest ( Endpoint endpoint ) => _productRegistration . IsSniffRequest ( endpoint ) ;
114
116
115
- private bool IsPingRequest ( RequestData requestData ) => _productRegistration . IsPingRequest ( requestData ) ;
117
+ private bool IsPingRequest ( Endpoint endpoint ) => _productRegistration . IsPingRequest ( endpoint ) ;
116
118
117
119
/// <inheritdoc cref="IRequestInvoker.RequestAsync{TResponse}"/>>
118
- public Task < TResponse > RequestAsync < TResponse > ( RequestData requestData , CancellationToken cancellationToken )
120
+ public Task < TResponse > RequestAsync < TResponse > ( Endpoint endpoint , RequestData requestData , PostData ? postData , CancellationToken cancellationToken )
119
121
where TResponse : TransportResponse , new ( ) =>
120
- Task . FromResult ( Request < TResponse > ( requestData ) ) ;
122
+ Task . FromResult ( Request < TResponse > ( endpoint , requestData , postData ) ) ;
121
123
122
124
/// <inheritdoc cref="IRequestInvoker.Request{TResponse}"/>>
123
- public TResponse Request < TResponse > ( RequestData requestData )
125
+ public TResponse Request < TResponse > ( Endpoint endpoint , RequestData requestData , PostData ? postData )
124
126
where TResponse : TransportResponse , new ( )
125
127
{
126
- if ( ! _calls . ContainsKey ( requestData . Uri . Port ) )
127
- throw new Exception ( $ "Expected a call to happen on port { requestData . Uri . Port } but received none") ;
128
+ if ( ! _calls . ContainsKey ( endpoint . Uri . Port ) )
129
+ throw new Exception ( $ "Expected a call to happen on port { endpoint . Uri . Port } but received none") ;
128
130
129
131
try
130
132
{
131
- var state = _calls [ requestData . Uri . Port ] ;
132
- if ( IsSniffRequest ( requestData ) )
133
+ var state = _calls [ endpoint . Uri . Port ] ;
134
+ if ( IsSniffRequest ( endpoint ) )
133
135
{
134
136
_ = Interlocked . Increment ( ref state . Sniffed ) ;
135
137
return HandleRules < TResponse , ISniffRule > (
138
+ endpoint ,
136
139
requestData ,
140
+ postData ,
137
141
nameof ( VirtualCluster . Sniff ) ,
138
142
_cluster . SniffingRules ,
139
143
requestData . RequestTimeout ,
140
144
( r ) => UpdateCluster ( r . NewClusterState ) ,
141
145
( r ) => _productRegistration . CreateSniffResponseBytes ( _cluster . Nodes , _cluster . ElasticsearchVersion , _cluster . PublishAddressOverride , _cluster . SniffShouldReturnFqnd )
142
146
) ;
143
147
}
144
- if ( IsPingRequest ( requestData ) )
148
+ if ( IsPingRequest ( endpoint ) )
145
149
{
146
150
_ = Interlocked . Increment ( ref state . Pinged ) ;
147
151
return HandleRules < TResponse , IRule > (
152
+ endpoint ,
148
153
requestData ,
154
+ postData ,
149
155
nameof ( VirtualCluster . Ping ) ,
150
156
_cluster . PingingRules ,
151
157
requestData . PingTimeout ,
@@ -155,7 +161,9 @@ public TResponse Request<TResponse>(RequestData requestData)
155
161
}
156
162
_ = Interlocked . Increment ( ref state . Called ) ;
157
163
return HandleRules < TResponse , IClientCallRule > (
164
+ endpoint ,
158
165
requestData ,
166
+ postData ,
159
167
nameof ( VirtualCluster . ClientCalls ) ,
160
168
_cluster . ClientCallRules ,
161
169
requestData . RequestTimeout ,
@@ -165,22 +173,23 @@ public TResponse Request<TResponse>(RequestData requestData)
165
173
}
166
174
catch ( TheException e )
167
175
{
168
- return requestData . ConnectionSettings . ProductRegistration . ResponseBuilder . ToResponse < TResponse > ( requestData , e , null , null , Stream . Null , null , - 1 , null , null ) ;
176
+ return requestData . ConnectionSettings . ProductRegistration . ResponseBuilder . ToResponse < TResponse > ( endpoint , requestData , postData , e , null , null , Stream . Null , null , - 1 , null , null ) ;
169
177
}
170
178
}
171
179
172
180
private TResponse HandleRules < TResponse , TRule > (
181
+ Endpoint endpoint ,
173
182
RequestData requestData ,
183
+ PostData ? postData ,
174
184
string origin ,
175
185
IList < TRule > rules ,
176
186
TimeSpan timeout ,
177
187
Action < TRule > beforeReturn ,
178
- Func < TRule , byte [ ] > successResponse
188
+ Func < TRule , byte [ ] ? > successResponse
179
189
)
180
190
where TResponse : TransportResponse , new ( )
181
191
where TRule : IRule
182
192
{
183
- requestData . MadeItToResponse = true ;
184
193
if ( rules . Count == 0 )
185
194
throw new Exception ( $ "No { origin } defined for the current VirtualCluster, so we do not know how to respond") ;
186
195
@@ -189,32 +198,31 @@ Func<TRule, byte[]> successResponse
189
198
var always = rule . Times . Match ( t => true , t => false ) ;
190
199
var times = rule . Times . Match ( t => - 1 , t => t ) ;
191
200
192
- if ( rule . OnPort == null || rule . OnPort . Value != requestData . Uri . Port ) continue ;
201
+ if ( rule . OnPort == null || rule . OnPort . Value != endpoint . Uri . Port ) continue ;
193
202
194
203
if ( always )
195
- return Always < TResponse , TRule > ( requestData , timeout , beforeReturn , successResponse , rule ) ;
204
+ return Always < TResponse , TRule > ( endpoint , requestData , postData , timeout , beforeReturn , successResponse , rule ) ;
196
205
197
206
if ( rule . ExecuteCount > times ) continue ;
198
207
199
- return Sometimes < TResponse , TRule > ( requestData , timeout , beforeReturn , successResponse , rule ) ;
208
+ return Sometimes < TResponse , TRule > ( endpoint , requestData , postData , timeout , beforeReturn , successResponse , rule ) ;
200
209
}
201
210
foreach ( var rule in rules . Where ( s => ! s . OnPort . HasValue ) )
202
211
{
203
212
var always = rule . Times . Match ( t => true , t => false ) ;
204
213
var times = rule . Times . Match ( t => - 1 , t => t ) ;
205
214
if ( always )
206
- return Always < TResponse , TRule > ( requestData , timeout , beforeReturn , successResponse , rule ) ;
215
+ return Always < TResponse , TRule > ( endpoint , requestData , postData , timeout , beforeReturn , successResponse , rule ) ;
207
216
208
217
if ( rule . ExecuteCount > times ) continue ;
209
218
210
- return Sometimes < TResponse , TRule > ( requestData , timeout , beforeReturn , successResponse , rule ) ;
219
+ return Sometimes < TResponse , TRule > ( endpoint , requestData , postData , timeout , beforeReturn , successResponse , rule ) ;
211
220
}
212
221
var count = _calls . Select ( kv => kv . Value . Called ) . Sum ( ) ;
213
- throw new Exception ( $@ "No global or port specific { origin } rule ({ requestData . Uri . Port } ) matches any longer after { count } calls in to the cluster") ;
222
+ throw new Exception ( $@ "No global or port specific { origin } rule ({ endpoint . Uri . Port } ) matches any longer after { count } calls in to the cluster") ;
214
223
}
215
224
216
- private TResponse Always < TResponse , TRule > ( RequestData requestData , TimeSpan timeout , Action < TRule > beforeReturn ,
217
- Func < TRule , byte [ ] > successResponse , TRule rule
225
+ private TResponse Always < TResponse , TRule > ( Endpoint endpoint , RequestData requestData , PostData ? postData , TimeSpan timeout , Action < TRule > beforeReturn , Func < TRule , byte [ ] ? > successResponse , TRule rule
218
226
)
219
227
where TResponse : TransportResponse , new ( )
220
228
where TRule : IRule
@@ -231,12 +239,12 @@ private TResponse Always<TResponse, TRule>(RequestData requestData, TimeSpan tim
231
239
}
232
240
233
241
return rule . Succeeds
234
- ? Success < TResponse , TRule > ( requestData , beforeReturn , successResponse , rule )
235
- : Fail < TResponse , TRule > ( requestData , rule ) ;
242
+ ? Success < TResponse , TRule > ( endpoint , requestData , postData , beforeReturn , successResponse , rule )
243
+ : Fail < TResponse , TRule > ( endpoint , requestData , postData , rule ) ;
236
244
}
237
245
238
246
private TResponse Sometimes < TResponse , TRule > (
239
- RequestData requestData , TimeSpan timeout , Action < TRule > beforeReturn , Func < TRule , byte [ ] > successResponse , TRule rule
247
+ Endpoint endpoint , RequestData requestData , PostData ? postData , TimeSpan timeout , Action < TRule > beforeReturn , Func < TRule , byte [ ] ? > successResponse , TRule rule
240
248
)
241
249
where TResponse : TransportResponse , new ( )
242
250
where TRule : IRule
@@ -253,16 +261,16 @@ private TResponse Sometimes<TResponse, TRule>(
253
261
}
254
262
255
263
if ( rule . Succeeds )
256
- return Success < TResponse , TRule > ( requestData , beforeReturn , successResponse , rule ) ;
264
+ return Success < TResponse , TRule > ( endpoint , requestData , postData , beforeReturn , successResponse , rule ) ;
257
265
258
- return Fail < TResponse , TRule > ( requestData , rule ) ;
266
+ return Fail < TResponse , TRule > ( endpoint , requestData , postData , rule ) ;
259
267
}
260
268
261
- private TResponse Fail < TResponse , TRule > ( RequestData requestData , TRule rule , RuleOption < Exception , int > returnOverride = null )
269
+ private TResponse Fail < TResponse , TRule > ( Endpoint endpoint , RequestData requestData , PostData ? postData , TRule rule , RuleOption < Exception , int > ? returnOverride = null )
262
270
where TResponse : TransportResponse , new ( )
263
271
where TRule : IRule
264
272
{
265
- var state = _calls [ requestData . Uri . Port ] ;
273
+ var state = _calls [ endpoint . Uri . Port ] ;
266
274
_ = Interlocked . Increment ( ref state . Failures ) ;
267
275
var ret = returnOverride ?? rule . Return ;
268
276
rule . RecordExecuted ( ) ;
@@ -271,25 +279,25 @@ private TResponse Fail<TResponse, TRule>(RequestData requestData, TRule rule, Ru
271
279
throw new TheException ( ) ;
272
280
273
281
return ret . Match (
274
- ( e ) => throw e ,
275
- ( statusCode ) => _inMemoryRequestInvoker . BuildResponse < TResponse > ( requestData , CallResponse ( rule ) ,
282
+ e => throw e ,
283
+ statusCode => _inMemoryRequestInvoker . BuildResponse < TResponse > ( endpoint , requestData , postData , CallResponse ( rule ) ,
276
284
//make sure we never return a valid status code in Fail responses because of a bad rule.
277
285
statusCode >= 200 && statusCode < 300 ? 502 : statusCode , rule . ReturnContentType )
278
286
) ;
279
287
}
280
288
281
- private TResponse Success < TResponse , TRule > ( RequestData requestData , Action < TRule > beforeReturn , Func < TRule , byte [ ] > successResponse ,
289
+ private TResponse Success < TResponse , TRule > ( Endpoint endpoint , RequestData requestData , PostData ? postData , Action < TRule > beforeReturn , Func < TRule , byte [ ] ? > successResponse ,
282
290
TRule rule
283
291
)
284
292
where TResponse : TransportResponse , new ( )
285
293
where TRule : IRule
286
294
{
287
- var state = _calls [ requestData . Uri . Port ] ;
295
+ var state = _calls [ endpoint . Uri . Port ] ;
288
296
_ = Interlocked . Increment ( ref state . Successes ) ;
289
297
rule . RecordExecuted ( ) ;
290
298
291
299
beforeReturn ? . Invoke ( rule ) ;
292
- return _inMemoryRequestInvoker . BuildResponse < TResponse > ( requestData , successResponse ( rule ) , contentType : rule . ReturnContentType ) ;
300
+ return _inMemoryRequestInvoker . BuildResponse < TResponse > ( endpoint , requestData , postData , successResponse ( rule ) , contentType : rule . ReturnContentType ) ;
293
301
}
294
302
295
303
private static byte [ ] CallResponse < TRule > ( TRule rule )
0 commit comments