Skip to content

Commit de1ecf5

Browse files
authored
Improve parse chunk consumer side (#423)
- Improve the CRC32 validation: 1- Crc32 is enabled by default 2- Add System.IO.Hashing dependency 3- A function is called when CRC32 fails, the user can decide to ChunkAction.Skip skip the chunk or TryToParse 4- Inside the function, the user can add custom code - Pin CI .NET versions - Remove the tests for .NET 9 and leave only for .NET 8 to increase the speed --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent b5d784f commit de1ecf5

23 files changed

+352
-89
lines changed

.github/workflows/build-test.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ jobs:
88
name: build/test on windows-latest
99
runs-on: windows-latest
1010
steps:
11+
- name: Clone repository
12+
uses: actions/checkout@v4
13+
- name: Setup .NET SDK
14+
uses: actions/setup-dotnet@v4
15+
with:
16+
global-json-file: global.json
1117
- uses: actions/checkout@v4
1218
- uses: actions/cache@v4
1319
with:
@@ -35,6 +41,12 @@ jobs:
3541
name: build/test on ubuntu-latest
3642
runs-on: ubuntu-latest
3743
steps:
44+
- name: Clone repository
45+
uses: actions/checkout@v4
46+
- name: Setup .NET SDK
47+
uses: actions/setup-dotnet@v4
48+
with:
49+
global-json-file: global.json
3850
- uses: actions/checkout@v4
3951
- uses: actions/setup-dotnet@v4
4052
with:

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
<PackageVersion Include="K4os.Compression.LZ4.Streams" Version="1.2.16" />
2121
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
2222
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
23+
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="9.0.6" />
2324
</ItemGroup>
2425
<ItemGroup Label="net8.0 specific" Condition="'$(TargetFramework)' == 'net8.0'">
2526
<!-- RabbitMQ.Stream.Client -->

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,6 @@ public UnknownCommandException(string s)
177177
}
178178
}
179179

180-
public class CrcException : Exception
181-
{
182-
public CrcException(string s)
183-
: base(s)
184-
{
185-
}
186-
}
187-
188180
public class TooManyConnectionsException : Exception
189181
{
190182
public TooManyConnectionsException(string s)

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,9 @@ public ushort InitialCredits
7171
}
7272
}
7373

74-
// enables the check of the crc on the delivery.
75-
// the server will send the crc for each chunk and the client will check it.
76-
// It is not enabled by default because it is could reduce the performance.
77-
public ICrc32 Crc32 { get; set; } = null;
74+
// It is enabled by default. You can disable it by setting it to null.
75+
// It is recommended to keep it enabled. Disable it only for performance reasons.
76+
public ICrc32 Crc32 { get; set; } = new StreamCrc32();
7877
}
7978

8079
public class ConsumerInfo : Info
@@ -90,6 +89,7 @@ public ConsumerInfo(string stream, string reference, string identifier, List<str
9089
public override string ToString()
9190
{
9291
var partitions = Partitions ?? [];
93-
return $"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
92+
return
93+
$"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier}, Partitions={string.Join(",", partitions)})";
9494
}
9595
}

RabbitMQ.Stream.Client/ICrc32.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,24 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
6+
57
namespace RabbitMQ.Stream.Client
68
{
9+
public enum ChunkAction
10+
{
11+
/// <summary>
12+
/// The consumer will try to process the Chunk.
13+
/// </summary>
14+
TryToProcess,
15+
16+
/// <summary>
17+
/// The consumer will skip the Chunk and continue processing the next Chunk.
18+
/// All the messages in the Chunk will be skipped.
19+
/// </summary>
20+
Skip
21+
}
22+
723
/// <summary>
824
/// ICrc32 defines an interface for implementing crc32 hashing.
925
/// Library users who wish to perform crc32 checks on data from RabbitMQ
@@ -13,5 +29,13 @@ namespace RabbitMQ.Stream.Client
1329
public interface ICrc32
1430
{
1531
byte[] Hash(byte[] data);
32+
33+
/// <summary>
34+
/// FailAction is called when the Crc32 check fails.
35+
/// The user can assign a function that returns a <see cref="ChunkAction"/>.
36+
/// It is possible to add custom logic to handle the failure, such as logging.
37+
/// The code here should be safe
38+
/// </summary>
39+
Func<IConsumer, ChunkAction> FailAction { get; set; }
1640
}
1741
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ RabbitMQ.Stream.Client.BindingsSuperStreamSpec.BindingsSuperStreamSpec(string Na
3535
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
3636
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
3737
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
38+
RabbitMQ.Stream.Client.ChunkAction
39+
RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction
40+
RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction
3841
RabbitMQ.Stream.Client.Client.ClientId.get -> string
3942
RabbitMQ.Stream.Client.Client.ClientId.init -> void
4043
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
@@ -109,8 +112,6 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
109112
RabbitMQ.Stream.Client.ConsumerInfo
110113
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier, System.Collections.Generic.List<string> partitions) -> void
111114
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
112-
RabbitMQ.Stream.Client.CrcException
113-
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
114115
RabbitMQ.Stream.Client.CreateConsumerException.CreateConsumerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
115116
RabbitMQ.Stream.Client.CreateException
116117
RabbitMQ.Stream.Client.CreateException.CreateException(string s) -> void
@@ -175,6 +176,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
175176
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
176177
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
177178
RabbitMQ.Stream.Client.ICrc32
179+
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
180+
RabbitMQ.Stream.Client.ICrc32.FailAction.set -> void
178181
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
179182
RabbitMQ.Stream.Client.Info
180183
RabbitMQ.Stream.Client.Info.Identifier.get -> string
@@ -322,6 +325,11 @@ RabbitMQ.Stream.Client.RouteQueryResponse.Write(System.Span<byte> span) -> int
322325
RabbitMQ.Stream.Client.RoutingStrategyType
323326
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
324327
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
328+
RabbitMQ.Stream.Client.StreamCrc32
329+
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
330+
RabbitMQ.Stream.Client.StreamCrc32.FailAction.set -> void
331+
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
332+
RabbitMQ.Stream.Client.StreamCrc32.StreamCrc32() -> void
325333
RabbitMQ.Stream.Client.StreamStats
326334
RabbitMQ.Stream.Client.StreamStats.CommittedChunkId() -> ulong
327335
RabbitMQ.Stream.Client.StreamStats.FirstOffset() -> ulong

RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" PrivateAssets="all" />
4040
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="all" />
4141
<PackageReference Include="MinVer" PrivateAssets="all" />
42+
<PackageReference Include="System.IO.Hashing" />
4243
<PackageReference Include="System.IO.Pipelines" />
4344
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
4445
</ItemGroup>

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,16 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
130130
{
131131
private readonly RawConsumerConfig _config;
132132

133-
private readonly Channel<Chunk> _chunksBuffer;
133+
private readonly Channel<(Chunk, ChunkAction)> _chunksBuffer;
134+
134135
private readonly ushort _initialCredits;
135136

136137
// _completeSubscription is used to notify the ProcessChunks task
137138
// that the subscription is completed and so it can start to process the chunks
138139
// this is needed because the socket starts to receive the chunks before the subscription_id is
139140
// assigned.
140-
private readonly TaskCompletionSource _completeSubscription = new();
141+
private readonly TaskCompletionSource _completeSubscription =
142+
new(TaskCreationOptions.RunContinuationsAsynchronously);
141143

142144
protected sealed override string DumpEntityConfiguration()
143145
{
@@ -161,7 +163,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
161163
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
162164
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null);
163165
// _chunksBuffer is a channel that is used to buffer the chunks
164-
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
166+
167+
_chunksBuffer = Channel.CreateBounded<(Chunk, ChunkAction)>(new BoundedChannelOptions(_initialCredits)
165168
{
166169
AllowSynchronousContinuations = false,
167170
SingleReader = true,
@@ -220,7 +223,6 @@ public async Task StoreOffset(ulong offset)
220223
/// MaybeLockDispatch is an optimization to avoid to lock the dispatch
221224
/// when the consumer is not single active consumer
222225
/// </summary>
223-
224226
private async Task MaybeLockDispatch()
225227
{
226228
if (_config.IsSingleActiveConsumer)
@@ -266,16 +268,16 @@ Message MessageFromSequence(ref ReadOnlySequence<byte> unCompressedData, ref int
266268
var slice = unCompressedData.Slice(compressOffset, 4);
267269
compressOffset += WireFormatting.ReadUInt32(ref slice, out var len);
268270
Debug.Assert(len > 0);
269-
slice = unCompressedData.Slice(compressOffset, len);
270-
Debug.Assert(slice.Length >= len);
271+
var sliceMsg = unCompressedData.Slice(compressOffset, len);
272+
Debug.Assert(sliceMsg.Length == len);
271273
compressOffset += (int)len;
272274

273275
// Here we use the Message.From(ref ReadOnlySequence<byte> seq ..) method to parse the message
274276
// instead of the Message From(ref SequenceReader<byte> reader ..) method
275277
// Since the ParseChunk is async and we cannot use the ref SequenceReader<byte> reader
276278
// See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details
277279

278-
var message = Message.From(ref slice, len);
280+
var message = Message.From(ref sliceMsg, len);
279281
return message;
280282
}
281283
catch (Exception e)
@@ -461,15 +463,18 @@ private void ProcessChunks()
461463
// need to wait the subscription is completed
462464
// else the _subscriberId could be incorrect
463465
_completeSubscription.Task.Wait();
466+
464467
try
465468
{
466469
while (!Token.IsCancellationRequested &&
467470
await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
468471
{
469-
while (_chunksBuffer.Reader.TryRead(out var chunk))
472+
while (_chunksBuffer.Reader.TryRead(out var chunkWithAction))
470473
{
471474
// We send the credit to the server to allow the server to send more messages
472475
// we request the credit before process the check to keep the network busy
476+
477+
var (chunk, action) = chunkWithAction;
473478
try
474479
{
475480
if (Token.IsCancellationRequested)
@@ -498,8 +503,21 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
498503
// and close the task
499504
if (Token.IsCancellationRequested)
500505
break;
501-
502-
await ParseChunk(chunk).ConfigureAwait(false);
506+
switch (action)
507+
{
508+
case ChunkAction.Skip:
509+
// the chunk will be skipped due of CRC32 fail
510+
Logger?.LogWarning(
511+
"The chunk {ChunkId} will be skipped for {EntityInfo}",
512+
chunk.ChunkId, DumpEntityConfiguration());
513+
continue; // skip the chunk
514+
case ChunkAction.TryToProcess:
515+
// That's what happens most of the time, and this is the default action
516+
await ParseChunk(chunk).ConfigureAwait(false);
517+
break;
518+
default:
519+
throw new ArgumentOutOfRangeException();
520+
}
503521
}
504522
}
505523

@@ -593,6 +611,8 @@ private async Task Init()
593611
return;
594612
}
595613

614+
var chunkAction = ChunkAction.TryToProcess;
615+
596616
if (_config.Crc32 is not null)
597617
{
598618
var crcCalculated = BitConverter.ToUInt32(
@@ -606,13 +626,17 @@ private async Task Init()
606626
DumpEntityConfiguration(),
607627
chunkConsumed);
608628

609-
throw new CrcException(
610-
$"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
611-
$"Chunk Consumed {chunkConsumed}");
629+
// if the user has set the FailAction, we call it
630+
// to allow the user to handle the chunk action
631+
// if the FailAction is not set, we skip the chunk
632+
chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip;
612633
}
613634
}
614635

615-
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
636+
// The chunkAction is passed to the _chunksBuffer because the ProcessChunks task
637+
// asks for the credits in a Task. If we skip the chunk here no more credits will be requested
638+
await _chunksBuffer.Writer.WriteAsync((deliver.Chunk, chunkAction), Token)
639+
.ConfigureAwait(false);
616640
}
617641
catch (OperationCanceledException)
618642
{

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6161
ConnectionClosedHandler = async (closeReason) =>
6262
{
6363
if (IsClosedNormally(closeReason))
64+
{
65+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
66+
[_consumerConfig.Stream]);
6467
return;
68+
}
6569

6670
try
6771
{
@@ -153,7 +157,12 @@ private async Task<IConsumer> SuperConsumer(bool boot)
153157
ConnectionClosedHandler = async (closeReason, partitionStream) =>
154158
{
155159
if (IsClosedNormally(closeReason))
160+
{
161+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
162+
[partitionStream]);
156163
return;
164+
}
165+
157166
await RandomWait().ConfigureAwait(false);
158167
try
159168
{

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
5050
{
5151
await RandomWait().ConfigureAwait(false);
5252
if (IsClosedNormally(closeReason))
53+
{
54+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
55+
[partitionStream]);
5356
return;
57+
}
58+
5459
try
5560
{
5661
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
@@ -121,7 +126,11 @@ private async Task<IProducer> StandardProducer()
121126
{
122127
await RandomWait().ConfigureAwait(false);
123128
if (IsClosedNormally())
129+
{
130+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser,
131+
[_producerConfig.Stream]);
124132
return;
133+
}
125134

126135
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
127136
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);

0 commit comments

Comments
 (0)