22// Licensed under the MIT License.
33
44using System ;
5+ using System . Buffers ;
56using System . Collections . Generic ;
67using System . Threading ;
78using System . Threading . Tasks ;
@@ -19,6 +20,25 @@ public class HttpPipeline
1920
2021 private readonly ReadOnlyMemory < HttpPipelinePolicy > _pipeline ;
2122
23+ /// <summary>
24+ /// Indicates whether or not the pipeline was created using its internal constructor.
25+ /// If it was, we know the indices where we can add per-request policies at positions
26+ /// <see cref="HttpPipelinePosition.PerCall"/> and <see cref="HttpPipelinePosition.PerRetry"/>.
27+ /// </summary>
28+ private readonly bool _internallyConstructed ;
29+
30+ /// <summary>
31+ /// The pipeline index where <see cref="HttpPipelinePosition.PerCall"/> policies will be added,
32+ /// if any are specified using <see cref="RequestContext.AddPolicy(HttpPipelinePolicy, HttpPipelinePosition)"/>.
33+ /// </summary>
34+ private readonly int _perCallIndex ;
35+
36+ /// <summary>
37+ /// The pipeline index where <see cref="HttpPipelinePosition.PerRetry"/> policies will be added,
38+ /// if any are specified using <see cref="RequestContext.AddPolicy(HttpPipelinePolicy, HttpPipelinePosition)"/>.
39+ /// </summary>
40+ private readonly int _perRetryIndex ;
41+
2242 /// <summary>
2343 /// Creates a new instance of <see cref="HttpPipeline"/> with the provided transport, policies and response classifier.
2444 /// </summary>
@@ -39,6 +59,13 @@ public HttpPipeline(HttpPipelineTransport transport, HttpPipelinePolicy[]? polic
3959 _pipeline = all ;
4060 }
4161
62+ internal HttpPipeline ( HttpPipelineTransport transport , int perCallIndex , int perRetryIndex , HttpPipelinePolicy [ ] ? policies = null , ResponseClassifier ? responseClassifier = null ) : this ( transport , policies , responseClassifier )
63+ {
64+ _perCallIndex = perCallIndex ;
65+ _perRetryIndex = perRetryIndex ;
66+ _internallyConstructed = true ;
67+ }
68+
4269 /// <summary>
4370 /// Creates a new <see cref="Request"/> instance.
4471 /// </summary>
@@ -55,6 +82,18 @@ public HttpMessage CreateMessage()
5582 return new HttpMessage ( CreateRequest ( ) , ResponseClassifier ) ;
5683 }
5784
85+ /// <summary>
86+ /// Creates a new <see cref="HttpMessage"/> instance.
87+ /// </summary>
88+ /// <param name="context">Context specifying the message options.</param>
89+ /// <returns>The message.</returns>
90+ public HttpMessage CreateMessage ( RequestContext context )
91+ {
92+ var message = CreateMessage ( ) ;
93+ message . AddPolicies ( context ) ;
94+ return message ;
95+ }
96+
5897 /// <summary>
5998 /// The <see cref="ResponseClassifier"/> instance used in this pipeline invocations.
6099 /// </summary>
@@ -70,7 +109,28 @@ public ValueTask SendAsync(HttpMessage message, CancellationToken cancellationTo
70109 {
71110 message . CancellationToken = cancellationToken ;
72111 AddHttpMessageProperties ( message ) ;
73- return _pipeline . Span [ 0 ] . ProcessAsync ( message , _pipeline . Slice ( 1 ) ) ;
112+
113+ if ( message . Policies == null || message . Policies . Count == 0 )
114+ {
115+ return _pipeline . Span [ 0 ] . ProcessAsync ( message , _pipeline . Slice ( 1 ) ) ;
116+ }
117+
118+ return SendAsync ( message ) ;
119+ }
120+
121+ private async ValueTask SendAsync ( HttpMessage message )
122+ {
123+ var length = _pipeline . Length + message . Policies ! . Count ;
124+ var policies = ArrayPool < HttpPipelinePolicy > . Shared . Rent ( length ) ;
125+ try
126+ {
127+ var pipeline = CreateRequestPipeline ( policies , message . Policies ) ;
128+ await pipeline . Span [ 0 ] . ProcessAsync ( message , pipeline . Slice ( 1 ) ) . ConfigureAwait ( false ) ;
129+ }
130+ finally
131+ {
132+ ArrayPool < HttpPipelinePolicy > . Shared . Return ( policies ) ;
133+ }
74134 }
75135
76136 /// <summary>
@@ -82,8 +142,27 @@ public void Send(HttpMessage message, CancellationToken cancellationToken)
82142 {
83143 message . CancellationToken = cancellationToken ;
84144 AddHttpMessageProperties ( message ) ;
85- _pipeline . Span [ 0 ] . Process ( message , _pipeline . Slice ( 1 ) ) ;
145+
146+ if ( message . Policies == null || message . Policies . Count == 0 )
147+ {
148+ _pipeline . Span [ 0 ] . Process ( message , _pipeline . Slice ( 1 ) ) ;
149+ }
150+ else
151+ {
152+ var length = _pipeline . Length + message . Policies . Count ;
153+ var policies = ArrayPool < HttpPipelinePolicy > . Shared . Rent ( length ) ;
154+ try
155+ {
156+ var pipeline = CreateRequestPipeline ( policies , message . Policies ) ;
157+ pipeline . Span [ 0 ] . Process ( message , pipeline . Slice ( 1 ) ) ;
158+ }
159+ finally
160+ {
161+ ArrayPool < HttpPipelinePolicy > . Shared . Return ( policies ) ;
162+ }
163+ }
86164 }
165+
87166 /// <summary>
88167 /// Invokes the pipeline asynchronously with the provided request.
89168 /// </summary>
@@ -144,6 +223,60 @@ public static IDisposable CreateHttpMessagePropertiesScope(IDictionary<string, o
144223 return CurrentHttpMessagePropertiesScope . Value ;
145224 }
146225
226+ private ReadOnlyMemory < HttpPipelinePolicy > CreateRequestPipeline ( HttpPipelinePolicy [ ] policies , List < ( HttpPipelinePosition Position , HttpPipelinePolicy Policy ) > customPolicies )
227+ {
228+ if ( ! _internallyConstructed )
229+ {
230+ throw new InvalidOperationException ( "Cannot send messages with per-request policies if the pipeline wasn't constructed with HttpPipelineBuilder." ) ;
231+ }
232+
233+ // Copy over client policies and splice in custom policies at designated indices
234+ var pipeline = _pipeline . Span ;
235+ int transportIndex = pipeline . Length - 1 ;
236+
237+ pipeline . Slice ( 0 , _perCallIndex ) . CopyTo ( policies ) ;
238+
239+ int index = _perCallIndex ;
240+ int count = AddCustomPolicies ( customPolicies , policies , HttpPipelinePosition . PerCall , index ) ;
241+
242+ index += count ;
243+ count = _perRetryIndex - _perCallIndex ;
244+ pipeline . Slice ( _perCallIndex , count ) . CopyTo ( policies . AsSpan ( index , count ) ) ;
245+
246+ index += count ;
247+ count = AddCustomPolicies ( customPolicies , policies , HttpPipelinePosition . PerRetry , index ) ;
248+
249+ index += count ;
250+ count = transportIndex - _perRetryIndex ;
251+ pipeline . Slice ( _perRetryIndex , count ) . CopyTo ( policies . AsSpan ( index , count ) ) ;
252+
253+ index += count ;
254+ count = AddCustomPolicies ( customPolicies , policies , HttpPipelinePosition . BeforeTransport , index ) ;
255+
256+ index += count ;
257+ policies [ index ] = pipeline [ transportIndex ] ;
258+
259+ return new ReadOnlyMemory < HttpPipelinePolicy > ( policies , 0 , index + 1 ) ;
260+ }
261+
262+ private static int AddCustomPolicies ( List < ( HttpPipelinePosition Position , HttpPipelinePolicy Policy ) > source , HttpPipelinePolicy [ ] target , HttpPipelinePosition position , int start )
263+ {
264+ int count = 0 ;
265+ if ( source != null )
266+ {
267+ foreach ( var policy in source )
268+ {
269+ if ( policy . Position == position )
270+ {
271+ target [ start + count ] = policy . Policy ;
272+ count ++ ;
273+ }
274+ }
275+ }
276+
277+ return count ;
278+ }
279+
147280 private static void AddHttpMessageProperties ( HttpMessage message )
148281 {
149282 if ( CurrentHttpMessagePropertiesScope . Value != null )
0 commit comments