16
16
17
17
import static com .rabbitmq .stream .impl .Assertions .assertThat ;
18
18
import static com .rabbitmq .stream .impl .LoadBalancerClusterTest .LOAD_BALANCER_ADDRESS ;
19
+ import static com .rabbitmq .stream .impl .TestUtils .BrokerVersion .RABBITMQ_4_1_2 ;
19
20
import static com .rabbitmq .stream .impl .TestUtils .newLoggerLevel ;
20
21
import static com .rabbitmq .stream .impl .TestUtils .sync ;
22
+ import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
21
23
import static com .rabbitmq .stream .impl .ThreadUtils .threadFactory ;
22
24
import static com .rabbitmq .stream .impl .Tuples .pair ;
23
25
import static java .util .stream .Collectors .toList ;
24
26
import static java .util .stream .IntStream .range ;
25
27
import static org .assertj .core .api .Assertions .assertThat ;
28
+ import static org .assertj .core .api .InstanceOfAssertFactories .stream ;
26
29
27
30
import ch .qos .logback .classic .Level ;
28
31
import com .google .common .collect .Streams ;
29
32
import com .google .common .util .concurrent .RateLimiter ;
30
33
import com .rabbitmq .stream .*;
34
+ import com .rabbitmq .stream .impl .TestUtils .BrokerVersionAtLeast ;
31
35
import com .rabbitmq .stream .impl .TestUtils .DisabledIfNotCluster ;
32
36
import com .rabbitmq .stream .impl .TestUtils .Sync ;
33
37
import com .rabbitmq .stream .impl .Tuples .Pair ;
41
45
import java .util .LinkedHashMap ;
42
46
import java .util .List ;
43
47
import java .util .Map ;
48
+ import java .util .concurrent .Callable ;
49
+ import java .util .concurrent .ConcurrentHashMap ;
44
50
import java .util .concurrent .Executors ;
45
51
import java .util .concurrent .ScheduledExecutorService ;
46
52
import java .util .concurrent .ThreadFactory ;
47
53
import java .util .concurrent .atomic .AtomicBoolean ;
48
54
import java .util .concurrent .atomic .AtomicInteger ;
55
+ import java .util .concurrent .atomic .AtomicLong ;
49
56
import java .util .concurrent .atomic .AtomicReference ;
57
+ import java .util .stream .IntStream ;
50
58
import org .junit .jupiter .api .*;
51
59
import org .junit .jupiter .params .ParameterizedTest ;
52
60
import org .junit .jupiter .params .provider .CsvSource ;
61
+ import org .junit .jupiter .params .provider .ValueSource ;
53
62
import org .slf4j .Logger ;
54
63
import org .slf4j .LoggerFactory ;
55
64
@@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
201
210
syncs = consumers .stream ().map (c -> c .waitForNewMessages (100 )).collect (toList ());
202
211
syncs .forEach (s -> assertThat (s ).completes ());
203
212
204
- nodes .forEach (
205
- n -> {
206
- LOGGER .info ("Restarting node {}..." , n );
207
- Cli .restartNode (n );
208
- LOGGER .info ("Restarted node {}." , n );
209
- });
210
- LOGGER .info ("Rebalancing..." );
211
- Cli .rebalance ();
212
- LOGGER .info ("Rebalancing over." );
213
+ restartCluster ();
213
214
214
215
Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
215
216
@@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
291
292
}
292
293
}
293
294
295
+ @ ParameterizedTest
296
+ @ ValueSource (booleans = {true , false })
297
+ @ BrokerVersionAtLeast (RABBITMQ_4_1_2 )
298
+ void sacWithClusterRestart (boolean superStream ) throws Exception {
299
+ environment =
300
+ environmentBuilder
301
+ .uris (URIS )
302
+ .netty ()
303
+ .bootstrapCustomizer (
304
+ b -> {
305
+ b .option (
306
+ ChannelOption .CONNECT_TIMEOUT_MILLIS ,
307
+ (int ) BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
308
+ })
309
+ .environmentBuilder ()
310
+ .maxConsumersByConnection (1 )
311
+ .build ();
312
+
313
+ int consumerCount = 3 ;
314
+ AtomicLong lastOffset = new AtomicLong (0 );
315
+ String app = "app-name" ;
316
+ String s = TestUtils .streamName (testInfo );
317
+ ProducerState pState = null ;
318
+ List <ConsumerState > consumers = Collections .emptyList ();
319
+ try {
320
+ StreamCreator sCreator = environment .streamCreator ().stream (s );
321
+ if (superStream ) {
322
+ sCreator = sCreator .superStream ().partitions (1 ).creator ();
323
+ }
324
+ sCreator .create ();
325
+
326
+ pState = new ProducerState (s , true , superStream , environment );
327
+ pState .start ();
328
+
329
+ Map <Integer , Boolean > consumerStatus = new ConcurrentHashMap <>();
330
+ consumers =
331
+ IntStream .range (0 , consumerCount )
332
+ .mapToObj (
333
+ i ->
334
+ new ConsumerState (
335
+ s ,
336
+ environment ,
337
+ b -> {
338
+ b .singleActiveConsumer ()
339
+ .name (app )
340
+ .noTrackingStrategy ()
341
+ .consumerUpdateListener (
342
+ ctx -> {
343
+ consumerStatus .put (i , ctx .isActive ());
344
+ return OffsetSpecification .offset (lastOffset .get ());
345
+ });
346
+ if (superStream ) {
347
+ b .superStream (s );
348
+ } else {
349
+ b .stream (s );
350
+ }
351
+ },
352
+ (ctx , m ) -> lastOffset .set (ctx .offset ())))
353
+ .collect (toList ());
354
+
355
+ Sync sync = pState .waitForNewMessages (100 );
356
+ assertThat (sync ).completes ();
357
+ sync = consumers .get (0 ).waitForNewMessages (100 );
358
+ assertThat (sync ).completes ();
359
+
360
+ String streamArg = superStream ? s + "-0" : s ;
361
+
362
+ Callable <Void > checkConsumers =
363
+ () -> {
364
+ waitAtMost (
365
+ () -> {
366
+ List <Cli .SubscriptionInfo > subscriptions = Cli .listGroupConsumers (streamArg , app );
367
+ LOGGER .info ("Group consumers: {}" , subscriptions );
368
+ return subscriptions .size () == consumerCount
369
+ && subscriptions .stream ()
370
+ .filter (sub -> sub .state ().startsWith ("active" ))
371
+ .count ()
372
+ == 1
373
+ && subscriptions .stream ()
374
+ .filter (sub -> sub .state ().startsWith ("waiting" ))
375
+ .count ()
376
+ == 2 ;
377
+ },
378
+ () ->
379
+ "Group consumers not in expected state: "
380
+ + Cli .listGroupConsumers (streamArg , app ));
381
+ return null ;
382
+ };
383
+
384
+ checkConsumers .call ();
385
+
386
+ restartCluster ();
387
+
388
+ Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
389
+
390
+ sync = pState .waitForNewMessages (100 );
391
+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
392
+ int activeIndex =
393
+ consumerStatus .entrySet ().stream ()
394
+ .filter (Map .Entry ::getValue )
395
+ .map (Map .Entry ::getKey )
396
+ .findFirst ()
397
+ .orElseThrow (() -> new IllegalStateException ("No active consumer found" ));
398
+
399
+ sync = consumers .get (activeIndex ).waitForNewMessages (100 );
400
+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
401
+
402
+ checkConsumers .call ();
403
+
404
+ } finally {
405
+ if (pState != null ) {
406
+ pState .close ();
407
+ }
408
+ consumers .forEach (ConsumerState ::close );
409
+ if (superStream ) {
410
+ environment .deleteSuperStream (s );
411
+ } else {
412
+ environment .deleteStream (s );
413
+ }
414
+ }
415
+ }
416
+
294
417
private static class ProducerState implements AutoCloseable {
295
418
419
+ private static final AtomicLong MSG_ID_SEQ = new AtomicLong (0 );
420
+
296
421
private static final byte [] BODY = "hello" .getBytes (StandardCharsets .UTF_8 );
297
422
298
423
private final String stream ;
@@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable {
306
431
final AtomicReference <Instant > lastExceptionInstant = new AtomicReference <>();
307
432
308
433
private ProducerState (String stream , boolean dynamicBatch , Environment environment ) {
434
+ this (stream , dynamicBatch , false , environment );
435
+ }
436
+
437
+ private ProducerState (
438
+ String stream , boolean dynamicBatch , boolean superStream , Environment environment ) {
309
439
this .stream = stream ;
310
- this .producer =
311
- environment .producerBuilder ().stream (stream ).dynamicBatch (dynamicBatch ).build ();
440
+ ProducerBuilder builder = environment .producerBuilder ().dynamicBatch (dynamicBatch );
441
+ if (superStream ) {
442
+ builder .superStream (stream ).routing (m -> m .getProperties ().getMessageIdAsString ());
443
+ } else {
444
+ builder .stream (stream );
445
+ }
446
+ this .producer = builder .build ();
312
447
}
313
448
314
449
void start () {
@@ -327,7 +462,14 @@ void start() {
327
462
try {
328
463
this .limiter .acquire (1 );
329
464
this .producer .send (
330
- producer .messageBuilder ().addData (BODY ).build (), confirmationHandler );
465
+ producer
466
+ .messageBuilder ()
467
+ .properties ()
468
+ .messageId (MSG_ID_SEQ .getAndIncrement ())
469
+ .messageBuilder ()
470
+ .addData (BODY )
471
+ .build (),
472
+ confirmationHandler );
331
473
} catch (Throwable e ) {
332
474
this .lastException .set (e );
333
475
this .lastExceptionInstant .set (Instant .now ());
@@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable {
380
522
final AtomicReference <Runnable > postHandle = new AtomicReference <>(() -> {});
381
523
382
524
private ConsumerState (String stream , Environment environment ) {
525
+ this (stream , environment , b -> b .stream (stream ), (ctx , m ) -> {});
526
+ }
527
+
528
+ private ConsumerState (
529
+ String stream ,
530
+ Environment environment ,
531
+ java .util .function .Consumer <ConsumerBuilder > customizer ,
532
+ MessageHandler delegateHandler ) {
383
533
this .stream = stream ;
384
- this .consumer =
385
- environment .consumerBuilder ().stream (stream )
534
+ ConsumerBuilder builder =
535
+ environment
536
+ .consumerBuilder ()
386
537
.offset (OffsetSpecification .first ())
387
538
.messageHandler (
388
539
(ctx , m ) -> {
540
+ delegateHandler .handle (ctx , m );
389
541
receivedCount .incrementAndGet ();
390
542
postHandle .get ().run ();
391
- })
392
- .build ();
543
+ });
544
+ customizer .accept (builder );
545
+ this .consumer = builder .build ();
393
546
}
394
547
395
548
Sync waitForNewMessages (int messageCount ) {
@@ -414,4 +567,16 @@ public void close() {
414
567
this .consumer .close ();
415
568
}
416
569
}
570
+
571
+ private static void restartCluster () {
572
+ nodes .forEach (
573
+ n -> {
574
+ LOGGER .info ("Restarting node {}..." , n );
575
+ Cli .restartNode (n );
576
+ LOGGER .info ("Restarted node {}." , n );
577
+ });
578
+ LOGGER .info ("Rebalancing..." );
579
+ Cli .rebalance ();
580
+ LOGGER .info ("Rebalancing over." );
581
+ }
417
582
}
0 commit comments