Skip to content

Commit 59c04e2

Browse files
OlegDokukarstoyanchev
authored and
OlegDokuka
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Co-authored-by: Rossen Stoyanchev <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 7c968c3 commit 59c04e2

File tree

7 files changed

+114
-29
lines changed

7 files changed

+114
-29
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -656,8 +656,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
656656
requesterLeaseTracker = null;
657657
}
658658

659-
Sinks.Empty<Void> requesterSink = Sinks.empty();
660-
Sinks.Empty<Void> responderSink = Sinks.empty();
659+
final Sinks.Empty<Void> requesterOnGracefulShutdownSink =
660+
Sinks.unsafe().empty();
661+
final Sinks.Empty<Void> responderOnGracefulShutdownSink =
662+
Sinks.unsafe().empty();
663+
final Sinks.Empty<Void> requesterOnAllClosedSink =
664+
Sinks.unsafe().empty();
665+
final Sinks.Empty<Void> responderOnAllClosedSink =
666+
Sinks.unsafe().empty();
667+
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink =
668+
Sinks.unsafe().empty();
661669

662670
RSocket rSocketRequester =
663671
new RSocketRequester(
@@ -672,8 +680,15 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
672680
keepAliveHandler,
673681
interceptors::initRequesterRequestInterceptor,
674682
requesterLeaseTracker,
675-
requesterSink,
676-
requesterSink.asMono().and(responderSink.asMono()));
683+
requesterGracefulShutdownStartedSink,
684+
requesterOnGracefulShutdownSink,
685+
requesterOnAllClosedSink,
686+
Mono.whenDelayError(
687+
responderOnGracefulShutdownSink.asMono(),
688+
requesterOnGracefulShutdownSink.asMono()),
689+
Mono.whenDelayError(
690+
responderOnAllClosedSink.asMono(),
691+
requesterOnAllClosedSink.asMono()));
677692

678693
RSocket wrappedRSocketRequester =
679694
interceptors.initRequester(rSocketRequester);
@@ -721,7 +736,10 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
721736
(RequestInterceptor)
722737
leases.sender)
723738
: interceptors
724-
::initResponderRequestInterceptor);
739+
::initResponderRequestInterceptor,
740+
responderOnGracefulShutdownSink,
741+
responderOnAllClosedSink,
742+
requesterGracefulShutdownStartedSink.asMono());
725743

726744
return wrappedRSocketRequester;
727745
})

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.DuplexConnection;
2424
import io.rsocket.Payload;
2525
import io.rsocket.RSocket;
26+
import io.rsocket.exceptions.ConnectionCloseException;
2627
import io.rsocket.exceptions.ConnectionErrorException;
2728
import io.rsocket.exceptions.Exceptions;
2829
import io.rsocket.frame.ErrorFrameCodec;
@@ -66,7 +67,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
6667
RSocketRequester.class, Throwable.class, "terminationError");
6768

6869
@Nullable private final RequesterLeaseTracker requesterLeaseTracker;
69-
private Mono<Void> onAllClose;
70+
71+
private final Sinks.Empty<Void> onGracefulShutdownStartedSink;
72+
private final Sinks.Empty<Void> onThisSideClosedSink;
73+
private final Mono<Void> onAllClosed;
7074
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
7175

7276
RSocketRequester(
@@ -81,8 +85,11 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
8185
@Nullable KeepAliveHandler keepAliveHandler,
8286
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
8387
@Nullable RequesterLeaseTracker requesterLeaseTracker,
84-
Sinks.Empty<Void> onClose,
85-
Mono<Void> onAllClose) {
88+
Sinks.Empty<Void> onGracefulShutdownStartedSink,
89+
Sinks.Empty<Void> onGracefulShutdownSink,
90+
Sinks.Empty<Void> onThisSideClosedSink,
91+
Mono<Void> onGracefulShutdownDone,
92+
Mono<Void> onAllClosed) {
8693
super(
8794
mtu,
8895
maxFrameLength,
@@ -91,13 +98,16 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
9198
connection,
9299
streamIdSupplier,
93100
requestInterceptorFunction,
94-
onClose);
101+
onGracefulShutdownSink);
95102

96103
this.requesterLeaseTracker = requesterLeaseTracker;
97-
this.onAllClose = onAllClose;
104+
this.onGracefulShutdownStartedSink = onGracefulShutdownStartedSink;
105+
this.onThisSideClosedSink = onThisSideClosedSink;
106+
this.onAllClosed = onAllClosed;
98107

99108
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
100-
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
109+
connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);
110+
onGracefulShutdownDone.subscribe(null, null, connection::dispose);
101111

102112
connection.receive().subscribe(this::handleIncomingFrames, e -> {});
103113

@@ -196,6 +206,7 @@ public void dispose() {
196206

197207
@Override
198208
public void disposeGracefully() {
209+
this.onGracefulShutdownStartedSink.tryEmitEmpty();
199210
super.terminate();
200211
}
201212

@@ -206,7 +217,7 @@ public boolean isDisposed() {
206217

207218
@Override
208219
public Mono<Void> onClose() {
209-
return onAllClose;
220+
return onAllClosed;
210221
}
211222

212223
private void handleIncomingFrames(ByteBuf frame) {
@@ -313,8 +324,12 @@ private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
313324
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
314325
}
315326

316-
private void tryTerminateOnConnectionError(Throwable e) {
317-
tryTerminate(() -> e);
327+
private void tryShutdown(Throwable e) {
328+
if (terminationError == null) {
329+
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
330+
terminate(CLOSED_CHANNEL_EXCEPTION);
331+
}
332+
}
318333
}
319334

320335
private void tryTerminateOnZeroError(ByteBuf errorFrame) {
@@ -324,6 +339,12 @@ private void tryTerminateOnZeroError(ByteBuf errorFrame) {
324339
private void tryTerminate(Supplier<Throwable> errorSupplier) {
325340
if (terminationError == null) {
326341
Throwable e = errorSupplier.get();
342+
343+
if (e instanceof ConnectionCloseException) {
344+
this.onGracefulShutdownStartedSink.tryEmitEmpty();
345+
super.terminate();
346+
return;
347+
}
327348
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
328349
terminate(e);
329350
}
@@ -368,9 +389,9 @@ private void terminate(Throwable e) {
368389
}
369390

370391
if (e == CLOSED_CHANNEL_EXCEPTION) {
371-
onClose.tryEmitEmpty();
392+
onThisSideClosedSink.tryEmitEmpty();
372393
} else {
373-
onClose.tryEmitError(e);
394+
onThisSideClosedSink.tryEmitError(e);
374395
}
375396
}
376397
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.slf4j.LoggerFactory;
4545
import reactor.core.publisher.Flux;
4646
import reactor.core.publisher.Mono;
47+
import reactor.core.publisher.Sinks;
4748
import reactor.util.annotation.Nullable;
4849

4950
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
@@ -54,6 +55,8 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
5455
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
5556

5657
private final RSocket requestHandler;
58+
private final Sinks.Empty<Void> onThisSideClosedSink;
59+
private final Mono<Void> onRequesterGracefullShutdownStarted;
5760

5861
@Nullable private final ResponderLeaseTracker leaseHandler;
5962

@@ -70,25 +73,38 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
7073
int mtu,
7174
int maxFrameLength,
7275
int maxInboundPayloadSize,
73-
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
76+
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
77+
Sinks.Empty<Void> onGracefulShutdownSink,
78+
Sinks.Empty<Void> onThisSideClosedSink,
79+
Mono<Void> onRequesterGracefulShutdownStarted) {
7480
super(
7581
mtu,
7682
maxFrameLength,
7783
maxInboundPayloadSize,
7884
payloadDecoder,
7985
connection,
8086
null,
81-
requestInterceptorFunction);
87+
requestInterceptorFunction,
88+
onGracefulShutdownSink);
8289

8390
this.requestHandler = requestHandler;
8491

8592
this.leaseHandler = leaseHandler;
86-
87-
connection.receive().subscribe(this::handleFrame, e -> {});
93+
this.onThisSideClosedSink = onThisSideClosedSink;
94+
this.onRequesterGracefullShutdownStarted = onRequesterGracefulShutdownStarted;
8895

8996
connection
9097
.onClose()
9198
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
99+
100+
onRequesterGracefulShutdownStarted.subscribe(null, null, this::onGracefulShutdownStarted);
101+
102+
connection.receive().subscribe(this::handleFrame, e -> {});
103+
}
104+
105+
private void onGracefulShutdownStarted() {
106+
super.terminate();
107+
requestHandler.disposeGracefully();
92108
}
93109

94110
private void tryTerminateOnConnectionError(Throwable e) {
@@ -183,6 +199,7 @@ final void doOnDispose() {
183199
}
184200

185201
requestHandler.dispose();
202+
onThisSideClosedSink.tryEmitEmpty();
186203
}
187204

188205
private void cleanUpSendingSubscriptions() {

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.function.Consumer;
4747
import java.util.function.Supplier;
4848
import reactor.core.publisher.Mono;
49+
import reactor.core.publisher.Sinks;
4950

5051
/**
5152
* The main class for starting an RSocket server.
@@ -437,6 +438,12 @@ private Mono<Void> acceptSetup(
437438
requesterLeaseTracker = null;
438439
}
439440

441+
final Sinks.Empty<Void> requesterOnGracefulShutdownSink = Sinks.unsafe().empty();
442+
final Sinks.Empty<Void> responderOnGracefulShutdownSink = Sinks.unsafe().empty();
443+
final Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
444+
final Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();
445+
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink = Sinks.unsafe().empty();
446+
440447
RSocket rSocketRequester =
441448
new RSocketRequester(
442449
multiplexer.asServerConnection(),
@@ -449,7 +456,15 @@ private Mono<Void> acceptSetup(
449456
setupPayload.keepAliveMaxLifetime(),
450457
keepAliveHandler,
451458
interceptors::initRequesterRequestInterceptor,
452-
requesterLeaseTracker);
459+
requesterLeaseTracker,
460+
requesterGracefulShutdownStartedSink,
461+
requesterOnGracefulShutdownSink,
462+
requesterOnAllClosedSink,
463+
Mono.whenDelayError(
464+
responderOnGracefulShutdownSink.asMono(),
465+
requesterOnGracefulShutdownSink.asMono()),
466+
Mono.whenDelayError(
467+
responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));
453468

454469
RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);
455470

@@ -481,7 +496,10 @@ private Mono<Void> acceptSetup(
481496
? rSocket ->
482497
interceptors.initResponderRequestInterceptor(
483498
rSocket, (RequestInterceptor) leases.sender)
484-
: interceptors::initResponderRequestInterceptor);
499+
: interceptors::initResponderRequestInterceptor,
500+
responderOnGracefulShutdownSink,
501+
responderOnAllClosedSink,
502+
requesterGracefulShutdownStartedSink.asMono());
485503
})
486504
.doFinally(signalType -> setupPayload.release())
487505
.then();

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class RequesterResponderSupport {
2121
private final PayloadDecoder payloadDecoder;
2222
private final ByteBufAllocator allocator;
2323
private final DuplexConnection connection;
24-
private final Sinks.Empty<Void> onClose;
24+
private final Sinks.Empty<Void> onGracefulShutdownSink;
2525
@Nullable private final RequestInterceptor requestInterceptor;
2626

2727
@Nullable final StreamIdSupplier streamIdSupplier;
@@ -38,7 +38,7 @@ public RequesterResponderSupport(
3838
DuplexConnection connection,
3939
@Nullable StreamIdSupplier streamIdSupplier,
4040
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
41-
Sinks.Empty<Void> onClose) {
41+
Sinks.Empty<Void> onGracefulShutdownSink) {
4242

4343
this.activeStreams = new IntObjectHashMap<>();
4444
this.mtu = mtu;
@@ -48,7 +48,7 @@ public RequesterResponderSupport(
4848
this.allocator = connection.alloc();
4949
this.streamIdSupplier = streamIdSupplier;
5050
this.connection = connection;
51-
this.onClose = onClose;
51+
this.onGracefulShutdownSink = onGracefulShutdownSink;
5252
this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
5353
}
5454

@@ -186,7 +186,7 @@ public boolean remove(int streamId, FrameHandler frameHandler) {
186186
}
187187

188188
if (terminated) {
189-
onClose.tryEmitEmpty();
189+
onGracefulShutdownSink.tryEmitEmpty();
190190
}
191191
return true;
192192
}
@@ -205,7 +205,7 @@ public void terminate() {
205205
}
206206

207207
if (terminated) {
208-
onClose.tryEmitEmpty();
208+
onGracefulShutdownSink.tryEmitEmpty();
209209
}
210210
}
211211
}

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,12 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
643643
protected Runnable delayer;
644644
protected Sinks.One<RSocket> producer;
645645

646+
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
647+
648+
protected Sinks.Empty<Void> otherGracefulShutdownSink;
649+
650+
protected Sinks.Empty<Void> thisGracefulShutdownSink;
651+
646652
@Override
647653
protected void doInit() {
648654
super.doInit();
@@ -671,7 +677,10 @@ protected RSocket newRSocket() {
671677
Integer.MAX_VALUE,
672678
null,
673679
__ -> null,
674-
null);
680+
null,
681+
onGracefulShutdownStartedSink,
682+
thisGracefulShutdownSink,
683+
);
675684
}
676685
}
677686
}

rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.ThreadLocalRandom;
3535
import org.assertj.core.api.Assertions;
3636
import reactor.core.Exceptions;
37+
import reactor.core.publisher.Sinks;
3738
import reactor.util.annotation.Nullable;
3839

3940
final class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
@@ -58,7 +59,8 @@ final class TestRequesterResponderSupport extends RequesterResponderSupport impl
5859
PayloadDecoder.ZERO_COPY,
5960
connection,
6061
streamIdSupplier,
61-
(__) -> requestInterceptor);
62+
(__) -> requestInterceptor,
63+
Sinks.empty());
6264
this.error = error;
6365
}
6466

0 commit comments

Comments
 (0)