34
34
import static io .rsocket .frame .FrameType .REQUEST_N ;
35
35
import static io .rsocket .frame .FrameType .REQUEST_RESPONSE ;
36
36
import static io .rsocket .frame .FrameType .REQUEST_STREAM ;
37
- import static org .hamcrest .MatcherAssert .assertThat ;
38
- import static org .hamcrest .Matchers .anyOf ;
39
- import static org .hamcrest .Matchers .empty ;
40
- import static org .hamcrest .Matchers .is ;
37
+ import static org .assertj .core .api .Assertions .assertThat ;
41
38
42
39
import io .netty .buffer .ByteBuf ;
43
40
import io .netty .buffer .ByteBufAllocator ;
76
73
import java .util .concurrent .atomic .AtomicBoolean ;
77
74
import java .util .concurrent .atomic .AtomicReference ;
78
75
import java .util .stream .Stream ;
79
- import org .assertj .core .api .Assertions ;
80
76
import org .assertj .core .api .Assumptions ;
81
77
import org .junit .jupiter .api .AfterEach ;
82
78
import org .junit .jupiter .api .BeforeEach ;
@@ -126,12 +122,13 @@ public void testHandleKeepAlive() {
126
122
rule .connection .addToReceivedBuffer (
127
123
KeepAliveFrameCodec .encode (rule .alloc (), true , 0 , Unpooled .EMPTY_BUFFER ));
128
124
ByteBuf sent = rule .connection .awaitFrame ();
129
- assertThat ("Unexpected frame sent." , frameType (sent ), is (FrameType .KEEPALIVE ));
125
+ assertThat (frameType (sent ))
126
+ .describedAs ("Unexpected frame sent." )
127
+ .isEqualTo (FrameType .KEEPALIVE );
130
128
/*Keep alive ack must not have respond flag else, it will result in infinite ping-pong of keep alive frames.*/
131
- assertThat (
132
- "Unexpected keep-alive frame respond flag." ,
133
- KeepAliveFrameCodec .respondFlag (sent ),
134
- is (false ));
129
+ assertThat (KeepAliveFrameCodec .respondFlag (sent ))
130
+ .describedAs ("Unexpected keep-alive frame respond flag." )
131
+ .isEqualTo (false );
135
132
}
136
133
137
134
@ Test
@@ -149,10 +146,9 @@ public Mono<Payload> requestResponse(Payload payload) {
149
146
});
150
147
rule .sendRequest (streamId , FrameType .REQUEST_RESPONSE );
151
148
testPublisher .complete ();
152
- assertThat (
153
- "Unexpected frame sent." ,
154
- frameType (rule .connection .awaitFrame ()),
155
- anyOf (is (FrameType .COMPLETE ), is (FrameType .NEXT_COMPLETE )));
149
+ assertThat (frameType (rule .connection .awaitFrame ()))
150
+ .describedAs ("Unexpected frame sent." )
151
+ .isIn (FrameType .COMPLETE , FrameType .NEXT_COMPLETE );
156
152
testPublisher .assertWasNotCancelled ();
157
153
}
158
154
@@ -162,8 +158,9 @@ public void testHandlerEmitsError() {
162
158
final int streamId = 4 ;
163
159
rule .prefetch = 1 ;
164
160
rule .sendRequest (streamId , FrameType .REQUEST_STREAM );
165
- assertThat (
166
- "Unexpected frame sent." , frameType (rule .connection .awaitFrame ()), is (FrameType .ERROR ));
161
+ assertThat (frameType (rule .connection .awaitFrame ()))
162
+ .describedAs ("Unexpected frame sent." )
163
+ .isEqualTo (FrameType .ERROR );
167
164
}
168
165
169
166
@ Test
@@ -182,12 +179,12 @@ public Mono<Payload> requestResponse(Payload payload) {
182
179
});
183
180
rule .sendRequest (streamId , FrameType .REQUEST_RESPONSE );
184
181
185
- assertThat ("Unexpected frame sent." , rule . connection . getSent (), is ( empty ()) );
182
+ assertThat (rule . connection . getSent ()). describedAs ( "Unexpected frame sent." ). isEmpty ( );
186
183
187
184
rule .connection .addToReceivedBuffer (CancelFrameCodec .encode (allocator , streamId ));
188
185
189
- assertThat ("Unexpected frame sent." , rule . connection . getSent (), is ( empty ()) );
190
- assertThat ("Subscription not cancelled." , cancelled . get (), is ( true ) );
186
+ assertThat (rule . connection . getSent ()). describedAs ( "Unexpected frame sent." ). isEmpty ( );
187
+ assertThat (cancelled . get ()). describedAs ( "Subscription not cancelled." ). isTrue ( );
191
188
rule .assertHasNoLeaks ();
192
189
}
193
190
@@ -243,7 +240,7 @@ protected void hookOnSubscribe(Subscription subscription) {
243
240
for (Runnable runnable : runnables ) {
244
241
rule .connection .clearSendReceiveBuffers ();
245
242
runnable .run ();
246
- Assertions . assertThat (rule .connection .getSent ())
243
+ assertThat (rule .connection .getSent ())
247
244
.hasSize (1 )
248
245
.first ()
249
246
.matches (bb -> FrameHeaderCodec .frameType (bb ) == FrameType .ERROR )
@@ -253,7 +250,7 @@ protected void hookOnSubscribe(Subscription subscription) {
253
250
.contains (String .format (INVALID_PAYLOAD_ERROR_MESSAGE , maxFrameLength )))
254
251
.matches (ReferenceCounted ::release );
255
252
256
- assertThat ("Subscription not cancelled." , cancelled . get (), is ( true ) );
253
+ assertThat (cancelled . get ()). describedAs ( "Subscription not cancelled." ). isTrue ( );
257
254
}
258
255
259
256
rule .assertHasNoLeaks ();
@@ -308,9 +305,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
308
305
sink .tryEmitEmpty ();
309
306
});
310
307
311
- Assertions . assertThat (assertSubscriber .values ()).allMatch (ReferenceCounted ::release );
308
+ assertThat (assertSubscriber .values ()).allMatch (ReferenceCounted ::release );
312
309
313
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
310
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
314
311
315
312
rule .assertHasNoLeaks ();
316
313
testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnComplete (1 ).expectNothing ();
@@ -353,7 +350,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
353
350
sink .complete ();
354
351
});
355
352
356
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
353
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
357
354
358
355
rule .assertHasNoLeaks ();
359
356
testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnCancel (1 ).expectNothing ();
@@ -398,7 +395,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
398
395
sink .complete ();
399
396
});
400
397
401
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
398
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
402
399
testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnCancel (1 ).expectNothing ();
403
400
rule .assertHasNoLeaks ();
404
401
}
@@ -483,13 +480,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
483
480
sink .error (new RuntimeException ());
484
481
});
485
482
486
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
483
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
487
484
488
485
assertSubscriber
489
486
.assertTerminated ()
490
487
.assertError (CancellationException .class )
491
488
.assertErrorMessage ("Outbound has terminated with an error" );
492
- Assertions . assertThat (assertSubscriber .values ())
489
+ assertThat (assertSubscriber .values ())
493
490
.allMatch (
494
491
msg -> {
495
492
ReferenceCountUtil .safeRelease (msg );
@@ -531,7 +528,7 @@ public Flux<Payload> requestStream(Payload payload) {
531
528
sink .next (ByteBufPayload .create ("d3" , "m3" ));
532
529
});
533
530
534
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
531
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
535
532
536
533
rule .assertHasNoLeaks ();
537
534
@@ -573,15 +570,15 @@ public void subscribe(CoreSubscriber<? super Payload> actual) {
573
570
sources [0 ].complete (ByteBufPayload .create ("d1" , "m1" ));
574
571
});
575
572
576
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
573
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
577
574
578
575
rule .assertHasNoLeaks ();
579
576
580
577
testRequestInterceptor
581
578
.expectOnStart (1 , REQUEST_RESPONSE )
582
579
.assertNext (
583
580
e ->
584
- Assertions . assertThat (e .eventType )
581
+ assertThat (e .eventType )
585
582
.isIn (
586
583
TestRequestInterceptor .EventType .ON_COMPLETE ,
587
584
TestRequestInterceptor .EventType .ON_CANCEL ))
@@ -614,7 +611,7 @@ public Flux<Payload> requestStream(Payload payload) {
614
611
sink .next (ByteBufPayload .create ("d3" , "m3" ));
615
612
rule .connection .addToReceivedBuffer (cancelFrame );
616
613
617
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
614
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
618
615
619
616
rule .assertHasNoLeaks ();
620
617
}
@@ -660,7 +657,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
660
657
661
658
rule .connection .addToReceivedBuffer (cancelFrame );
662
659
663
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
660
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
664
661
665
662
rule .assertHasNoLeaks ();
666
663
}
@@ -730,17 +727,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
730
727
}
731
728
732
729
if (responsesCnt > 0 ) {
733
- Assertions .assertThat (
734
- rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) != REQUEST_N ))
730
+ assertThat (rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) != REQUEST_N ))
735
731
.describedAs (
736
732
"Interaction Type :[%s]. Expected to observe %s frames sent" , frameType , responsesCnt )
737
733
.hasSize (responsesCnt )
738
734
.allMatch (bb -> !FrameHeaderCodec .hasMetadata (bb ));
739
735
}
740
736
741
737
if (framesCnt > 1 ) {
742
- Assertions .assertThat (
743
- rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) == REQUEST_N ))
738
+ assertThat (rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) == REQUEST_N ))
744
739
.describedAs (
745
740
"Interaction Type :[%s]. Expected to observe single RequestN(%s) frame" ,
746
741
frameType , framesCnt - 1 )
@@ -749,9 +744,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
749
744
.matches (bb -> RequestNFrameCodec .requestN (bb ) == (framesCnt - 1 ));
750
745
}
751
746
752
- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
747
+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
753
748
754
- Assertions . assertThat (assertSubscriber .awaitAndAssertNextValueCount (framesCnt ).values ())
749
+ assertThat (assertSubscriber .awaitAndAssertNextValueCount (framesCnt ).values ())
755
750
.hasSize (framesCnt )
756
751
.allMatch (p -> !p .hasMetadata ())
757
752
.allMatch (ReferenceCounted ::release );
@@ -796,7 +791,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
796
791
797
792
rule .sendRequest (1 , frameType );
798
793
799
- Assertions . assertThat (rule .connection .getSent ())
794
+ assertThat (rule .connection .getSent ())
800
795
.hasSize (1 )
801
796
.first ()
802
797
.matches (
@@ -837,13 +832,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
837
832
rule .connection .addToReceivedBuffer (
838
833
ErrorFrameCodec .encode (rule .alloc (), 1 , new RuntimeException ("test" )));
839
834
840
- Assertions . assertThat (rule .connection .getSent ())
835
+ assertThat (rule .connection .getSent ())
841
836
.hasSize (1 )
842
837
.first ()
843
838
.matches (bb -> FrameHeaderCodec .frameType (bb ) == REQUEST_N )
844
839
.matches (ReferenceCounted ::release );
845
840
846
- Assertions . assertThat (rule .socket .isDisposed ()).isFalse ();
841
+ assertThat (rule .socket .isDisposed ()).isFalse ();
847
842
testPublisher .assertWasCancelled ();
848
843
849
844
rule .assertHasNoLeaks ();
0 commit comments