Skip to content

Commit 4f13639

Browse files
authored
Add consumer flow control (#424)
* Add consumer flow control * Add rawConsumer.Credits to manually request Credits * Closes #420 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent de1ecf5 commit 4f13639

File tree

15 files changed

+451
-54
lines changed

15 files changed

+451
-54
lines changed

RabbitMQ.Stream.Client/Consts.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ internal static class Consts
1515
internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3);
1616
internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10);
1717
internal const ushort ConsumerInitialCredits = 2;
18+
1819
internal const byte Version1 = 1;
1920
internal const byte Version2 = 2;
2021
internal const string SubscriptionPropertyFilterPrefix = "filter.";

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,15 @@ public ushort InitialCredits
7474
// It is enabled by default. You can disable it by setting it to null.
7575
// It is recommended to keep it enabled. Disable it only for performance reasons.
7676
public ICrc32 Crc32 { get; set; } = new StreamCrc32();
77-
}
7877

79-
public class ConsumerInfo : Info
80-
{
81-
public string Reference { get; }
78+
public FlowControl FlowControl { get; set; } = new FlowControl();
79+
}
8280

83-
public ConsumerInfo(string stream, string reference, string identifier, List<string> partitions) : base(stream,
81+
public class ConsumerInfo(string stream, string reference, string identifier, List<string> partitions)
82+
: Info(stream,
8483
identifier, partitions)
85-
{
86-
Reference = reference;
87-
}
84+
{
85+
public string Reference { get; } = reference;
8886

8987
public override string ToString()
9088
{
@@ -93,3 +91,36 @@ public override string ToString()
9391
$"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
9492
}
9593
}
94+
95+
public enum ConsumerFlowStrategy
96+
{
97+
/// <summary>
98+
/// Request credits before parsing the chunk.
99+
/// Default strategy. The best for performance.
100+
/// </summary>
101+
CreditsBeforeParseChunk,
102+
103+
/// <summary>
104+
/// Request credits after parsing the chunk.
105+
/// It can be useful if the parsing is expensive and you want to avoid requesting credits too early.
106+
/// Useful for slow processing of chunks.
107+
/// </summary>
108+
CreditsAfterParseChunk,
109+
110+
/// <summary>
111+
/// The user manually requests credits with <see cref="RawConsumer.Credits()"/>
112+
/// Everything is done manually, so the user has full control over the flow of the consumer.
113+
/// </summary>
114+
ConsumerCredits
115+
}
116+
117+
/// <summary>
118+
/// FlowControl is used to control the flow of the consumer.
119+
/// See <see cref="ConsumerFlowStrategy"/> for the available strategies.
120+
/// Open for future extensions.
121+
/// </summary>ra
122+
public class FlowControl
123+
{
124+
public ConsumerFlowStrategy Strategy { get; set; } = ConsumerFlowStrategy.CreditsBeforeParseChunk;
125+
126+
}

RabbitMQ.Stream.Client/ICrc32.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ public interface ICrc32
3636
/// It is possible to add custom logic to handle the failure, such as logging.
3737
/// The code here should be safe
3838
/// </summary>
39-
Func<IConsumer, ChunkAction> FailAction { get; set; }
39+
Func<IConsumer, ChunkAction> FailAction { get; init; }
4040
}
4141
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
109109
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
110110
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
111111
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
112+
RabbitMQ.Stream.Client.ConsumerFlowStrategy
113+
RabbitMQ.Stream.Client.ConsumerFlowStrategy.ConsumerCredits = 2 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
114+
RabbitMQ.Stream.Client.ConsumerFlowStrategy.CreditsAfterParseChunk = 1 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
115+
RabbitMQ.Stream.Client.ConsumerFlowStrategy.CreditsBeforeParseChunk = 0 -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
112116
RabbitMQ.Stream.Client.ConsumerInfo
113117
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List<string> partitions) -> void
114118
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
@@ -137,6 +141,10 @@ RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string
137141
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
138142
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
139143
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
144+
RabbitMQ.Stream.Client.FlowControl
145+
RabbitMQ.Stream.Client.FlowControl.FlowControl() -> void
146+
RabbitMQ.Stream.Client.FlowControl.Strategy.get -> RabbitMQ.Stream.Client.ConsumerFlowStrategy
147+
RabbitMQ.Stream.Client.FlowControl.Strategy.set -> void
140148
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
141149
RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func<System.Threading.Tasks.ValueTask<bool>> sendHeartbeatFunc, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CloseResponse>> close, int heartbeat, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.HeartBeatHandler> logger = null) -> void
142150
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
@@ -173,11 +181,13 @@ RabbitMQ.Stream.Client.ICommandVersions.Command.get -> ushort
173181
RabbitMQ.Stream.Client.ICommandVersions.MaxVersion.get -> ushort
174182
RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
175183
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
184+
RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.get -> RabbitMQ.Stream.Client.FlowControl
185+
RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.set -> void
176186
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
177187
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
178188
RabbitMQ.Stream.Client.ICrc32
179189
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
180-
RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void
190+
RabbitMQ.Stream.Client.ICrc32.FailAction.init -> void
181191
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
182192
RabbitMQ.Stream.Client.Info
183193
RabbitMQ.Stream.Client.Info.Identifier.get -> string
@@ -228,6 +238,7 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
228238
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
229239
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
230240
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
241+
RabbitMQ.Stream.Client.RawConsumer.Credits() -> System.Threading.Tasks.Task
231242
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
232243
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
233244
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void
@@ -257,6 +268,8 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Clie
257268
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
258269
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
259270
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
271+
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.FlowControl.get -> RabbitMQ.Stream.Client.FlowControl
272+
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.FlowControl.set -> void
260273
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
261274
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
262275
RabbitMQ.Stream.Client.Reliable.ConsumerFactory._consumer -> RabbitMQ.Stream.Client.IConsumer
@@ -327,7 +340,7 @@ RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.Ro
327340
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
328341
RabbitMQ.Stream.Client.StreamCrc32
329342
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
330-
RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void
343+
RabbitMQ.Stream.Client.StreamCrc32.FailAction.init -> void
331344
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
332345
RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void
333346
RabbitMQ.Stream.Client.StreamStats

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ internal void Validate()
105105
case { Values.Count: 0 }:
106106
throw new ArgumentException("Values must be provided when Filter is set");
107107
}
108+
109+
FlowControl ??= new FlowControl();
110+
108111
}
109112

110113
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
@@ -177,7 +180,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
177180
ProcessChunks();
178181
}
179182

180-
// if a user specify a custom offset
183+
// if a user specifies a custom offset,
181184
// the _client must filter messages
182185
// and dispatch only the messages starting from the
183186
// user offset.
@@ -196,17 +199,12 @@ protected override string GetStream()
196199
return _config.Stream;
197200
}
198201

199-
public async Task StoreOffset(ulong offset)
200-
{
201-
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
202-
}
203-
204202
////// *********************
205203
// IsPromotedAsActive is needed to understand if the consumer is active or not
206204
// by default is active
207205
// in case of single active consumer can be not active
208206
// it is important to skip the messages in the chunk that
209-
// it is in progress. In this way the promotion will be faster
207+
// it is in progress. In this way, the promotion will be faster
210208
// avoiding to block the consumer handler if the user put some
211209
// long task
212210
private bool IsPromotedAsActive { get; set; }
@@ -219,14 +217,14 @@ public async Task StoreOffset(ulong offset)
219217

220218
/// <summary>
221219
/// MaybeLockDispatch locks the dispatch of the messages
222-
/// it is needed only when the consumer is single active consumer
223-
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
220+
/// it is needed only when the consumer is single active consumer.
221+
/// MaybeLockDispatch is an optimization to avoid lock the dispatch
224222
/// when the consumer is not single active consumer
225223
/// </summary>
226224
private async Task MaybeLockDispatch()
227225
{
228226
if (_config.IsSingleActiveConsumer)
229-
await PromotionLock.WaitAsync(Token).ConfigureAwait(false);
227+
await PromotionLock.WaitAsync(TimeSpan.FromSeconds(5), Token).ConfigureAwait(false);
230228
}
231229

232230
/// <summary>
@@ -309,9 +307,14 @@ async Task DispatchMessage(Message message, ulong i)
309307
{
310308
if (!Token.IsCancellationRequested)
311309
{
310+
// we need to lock the dispatch only if the consumer is single active consumer
311+
await MaybeLockDispatch().ConfigureAwait(false);
312+
var lockedIsPromotedAsActive = IsPromotedAsActive;
313+
MaybeReleaseLock();
314+
312315
// it is usually active
313316
// it is useful only in single active consumer
314-
if (IsPromotedAsActive)
317+
if (lockedIsPromotedAsActive)
315318
{
316319
if (_status != EntityStatus.Open)
317320
{
@@ -423,15 +426,7 @@ await _config.MessageHandler(this,
423426
for (ulong z = 0; z < subEntryChunk.NumRecordsInBatch; z++)
424427
{
425428
var message = MessageFromSequence(ref unCompressedData, ref compressOffset);
426-
await MaybeLockDispatch().ConfigureAwait(false);
427-
try
428-
{
429-
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
430-
}
431-
finally
432-
{
433-
MaybeReleaseLock();
434-
}
429+
await DispatchMessage(message, messageOffset++).ConfigureAwait(false);
435430
}
436431

437432
numRecords -= subEntryChunk.NumRecordsInBatch;
@@ -479,13 +474,23 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
479474
{
480475
if (Token.IsCancellationRequested)
481476
break;
482-
await _client.Credit(EntityId, 1).ConfigureAwait(false);
477+
// Request the credit to the server
478+
if (_config.FlowControl.Strategy ==
479+
ConsumerFlowStrategy.CreditsBeforeParseChunk)
480+
{
481+
// Request the credit before processing the chunk
482+
// this is the default behavior
483+
// it is useful to keep the network busy
484+
// and avoid to wait for the next chunk
485+
await _client.Credit(EntityId, 1)
486+
.ConfigureAwait(false);
487+
}
483488
}
484489
catch (InvalidOperationException)
485490
{
486491
// The client has been closed
487492
// Suppose a scenario where the client is closed and the ProcessChunks task is still running
488-
// we remove the the subscriber from the client and we close the client
493+
// we remove the subscriber from the client and we close the client
489494
// The ProcessChunks task will try to send the credit to the server
490495
// The client will throw an InvalidOperationException
491496
// since the connection is closed
@@ -514,6 +519,14 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
514519
case ChunkAction.TryToProcess:
515520
// That's what happens most of the time, and this is the default action
516521
await ParseChunk(chunk).ConfigureAwait(false);
522+
523+
if (_config.FlowControl.Strategy == ConsumerFlowStrategy.CreditsAfterParseChunk)
524+
{
525+
// it avoids flooding the network with credits
526+
await _client.Credit(EntityId, 1)
527+
.ConfigureAwait(false);
528+
}
529+
517530
break;
518531
default:
519532
throw new ArgumentOutOfRangeException();
@@ -598,7 +611,7 @@ private async Task Init()
598611
chunkConsumed++;
599612
// Send the chunk to the _chunksBuffer
600613
// in this way the chunks are processed in a separate thread
601-
// this wont' block the socket thread
614+
// this won't block the socket thread
602615
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
603616
if (Token.IsCancellationRequested)
604617
{
@@ -642,7 +655,7 @@ await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token)
642655
{
643656
// The consumer is closing from the user but some chunks are still in the buffer
644657
// simply skip the chunk since the Token.IsCancellationRequested is true
645-
// the catch is needed to avoid to propagate the exception to the socket thread.
658+
// the catch is needed to avoid propagating the exception to the socket thread.
646659
Logger?.LogWarning(
647660
"OperationCanceledException. {EntityInfo} has been closed while consuming messages. " +
648661
"Token.IsCancellationRequested: {IsCancellationRequested}",
@@ -719,7 +732,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
719732
// at this point the server has removed the consumer from the list
720733
// and the unsubscribe is not needed anymore (ignoreIfClosed = true)
721734
// we call the Close to re-enter to the standard behavior
722-
// ignoreIfClosed is an optimization to avoid to send the unsubscribe
735+
// ignoreIfClosed is an optimization to avoid sending the unsubscribe
723736
_config.Pool.RemoveConsumerEntityFromStream(_client.ClientId, EntityId, _config.Stream);
724737
await Shutdown(_config, true).ConfigureAwait(false);
725738
_config.MetadataHandler?.Invoke(metaDataUpdate);
@@ -770,7 +783,7 @@ protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignor
770783
public override async Task<ResponseCode> Close()
771784
{
772785
// when the consumer is closed we must be sure that the
773-
// the subscription is completed to avoid problems with the connection
786+
// subscription is completed to avoid problems with the connection
774787
// It could happen when the closing is called just after the creation
775788
_completeSubscription.Task.Wait();
776789
return await Shutdown(_config).ConfigureAwait(false);
@@ -790,5 +803,36 @@ public void Dispose()
790803
}
791804

792805
public ConsumerInfo Info { get; }
806+
807+
public async Task StoreOffset(ulong offset)
808+
{
809+
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
810+
}
811+
812+
/// <summary>
813+
/// Request credits from the server.
814+
/// Valid only if the ConsumerFlowStrategy is set to ConsumerFlowStrategy.ConsumerCredits.
815+
/// </summary>
816+
public async Task Credits()
817+
{
818+
await Credits(1).ConfigureAwait(false);
819+
}
820+
821+
private async Task Credits(ushort credits)
822+
{
823+
if (credits < 1)
824+
{
825+
throw new ArgumentException(
826+
$"Credits must be greater than 0");
827+
}
828+
829+
if (_config.FlowControl.Strategy != ConsumerFlowStrategy.ConsumerCredits)
830+
{
831+
throw new InvalidOperationException(
832+
"RequestCredits can be used only with ConsumerFlowStrategy.ManualRequestCredit.");
833+
}
834+
835+
await _client.Credit(EntityId, credits).ConfigureAwait(false);
836+
}
793837
}
794838
}

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,10 @@ public record ConsumerConfig : ReliableConfig
115115
/// <summary>
116116
/// Enable the check of the crc on the delivery when set to an implementation
117117
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
118-
/// The server will send the crc for each chunk and the client will check it.
119-
/// It is not enabled by default. In some case it is could reduce the performance.
120-
/// ICrc32 is an interface that can be implemented by the user with the desired implementation.
121-
/// The client is tested with the System.IO.Hashing.Crc32 implementation, like:
122-
///<c>
123-
/// private class UserCrc32 : ICrc32
124-
/// {
125-
/// public byte[] Hash(byte[] data)
126-
/// {
127-
/// return System.IO.Hashing.Crc32.Hash(data);
128-
/// }
129-
/// }
130-
/// </c>
131118
/// </summary>
132-
public ICrc32 Crc32 { get; set; } = null;
119+
public ICrc32 Crc32 { get; set; } = new StreamCrc32();
120+
121+
public FlowControl FlowControl { get; set; } = new FlowControl();
133122

134123
public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
135124
{

0 commit comments

Comments
 (0)