Skip to content

Commit 608c9eb

Browse files
author
OlegDokuka
committed
fixes with UnboundedProcessor
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent a0440f4 commit 608c9eb

File tree

9 files changed

+214
-57
lines changed

9 files changed

+214
-57
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ subprojects {
3333
apply plugin: 'com.github.sherter.google-java-format'
3434
apply plugin: 'com.github.vlsi.gradle-extensions'
3535

36-
ext['reactor-bom.version'] = '2020.0.25-SNAPSHOT'
36+
ext['reactor-bom.version'] = '2020.0.25'
3737
ext['logback.version'] = '1.2.10'
3838
ext['netty-bom.version'] = '4.1.85.Final'
3939
ext['netty-boringssl.version'] = '2.0.54.Final'
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,70 @@ public void arbiter(LLL_Result r) {
895895
}
896896
}
897897

898+
@JCStressTest
899+
@Outcome(
900+
id = {
901+
"0, 1, 0, 5",
902+
"1, 1, 0, 5",
903+
"2, 1, 0, 5",
904+
"3, 1, 0, 5",
905+
"4, 1, 0, 5",
906+
"5, 1, 0, 5",
907+
},
908+
expect = Expect.ACCEPTABLE,
909+
desc = "onComplete()")
910+
@State
911+
public static class Smoke33StressTest extends UnboundedProcessorStressTest {
912+
913+
final StressSubscriber<ByteBuf> stressSubscriber =
914+
new StressSubscriber<>(Long.MAX_VALUE, Fuseable.NONE);
915+
final ByteBuf byteBuf1 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(1);
916+
final ByteBuf byteBuf2 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(2);
917+
final ByteBuf byteBuf3 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(3);
918+
final ByteBuf byteBuf4 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(4);
919+
final ByteBuf byteBuf5 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(5);
920+
921+
{
922+
unboundedProcessor.subscribe(stressSubscriber);
923+
}
924+
925+
@Actor
926+
public void next1() {
927+
unboundedProcessor.tryEmitNormal(byteBuf1);
928+
unboundedProcessor.tryEmitPrioritized(byteBuf2);
929+
}
930+
931+
@Actor
932+
public void next2() {
933+
unboundedProcessor.tryEmitPrioritized(byteBuf3);
934+
unboundedProcessor.tryEmitNormal(byteBuf4);
935+
}
936+
937+
@Actor
938+
public void complete() {
939+
unboundedProcessor.tryEmitFinal(byteBuf5);
940+
}
941+
942+
@Arbiter
943+
public void arbiter(LLLL_Result r) {
944+
r.r1 = stressSubscriber.onNextCalls;
945+
r.r2 =
946+
stressSubscriber.onCompleteCalls
947+
+ stressSubscriber.onErrorCalls * 2
948+
+ stressSubscriber.droppedErrors.size() * 3;
949+
950+
r.r4 = stressSubscriber.values.get(stressSubscriber.values.size() - 1).readByte();
951+
stressSubscriber.values.forEach(ByteBuf::release);
952+
953+
r.r3 =
954+
byteBuf1.refCnt()
955+
+ byteBuf2.refCnt()
956+
+ byteBuf3.refCnt()
957+
+ byteBuf4.refCnt()
958+
+ byteBuf5.refCnt();
959+
}
960+
}
961+
898962
@JCStressTest
899963
@Outcome(
900964
id = {

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424
public abstract class BaseDuplexConnection implements DuplexConnection {
2525

2626
protected final Sinks.Empty<Void> onClose = Sinks.empty();
27-
protected final UnboundedProcessor sender = new UnboundedProcessor(onClose::tryEmitEmpty);
27+
protected final UnboundedProcessor sender =
28+
new UnboundedProcessor(
29+
() -> {
30+
onClose.tryEmitEmpty();
31+
dispose();
32+
},
33+
(__) -> {});
2834

2935
public BaseDuplexConnection() {}
3036

3137
@Override
3238
public void sendFrame(int streamId, ByteBuf frame) {
3339
if (streamId == 0) {
34-
sender.onNextPrioritized(frame);
40+
sender.tryEmitPrioritized(frame);
3541
} else {
36-
sender.onNext(frame);
42+
sender.tryEmitNormal(frame);
3743
}
3844
}
3945

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 106 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2525
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2626
import java.util.function.Consumer;
27-
import java.util.function.Function;
2827
import java.util.stream.Stream;
29-
import org.reactivestreams.Suwipbscription;
28+
import org.reactivestreams.Subscription;
3029
import reactor.core.CoreSubscriber;
3130
import reactor.core.Disposable;
3231
import reactor.core.Exceptions;
@@ -93,10 +92,12 @@ public final class UnboundedProcessor extends Flux<ByteBuf>
9392
static final AtomicLongFieldUpdater<UnboundedProcessor> REQUESTED =
9493
AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");
9594

95+
ByteBuf last;
96+
9697
boolean outputFused;
9798

9899
public UnboundedProcessor() {
99-
this(() -> {});
100+
this(() -> {}, __ -> {});
100101
}
101102

102103
public UnboundedProcessor(Runnable onFinalizedHook, Consumer<ByteBuf> onValueDelivered) {
@@ -123,79 +124,122 @@ public Object scanUnsafe(Attr key) {
123124
return null;
124125
}
125126

126-
public boolean tryEmitNext
127-
128-
@Deprecated
129-
public void onNextPrioritized(ByteBuf t) {
127+
public boolean tryEmitPrioritized(ByteBuf t) {
130128
if (this.done || this.cancelled) {
131129
release(t);
132-
return;
130+
return false;
133131
}
134132

135133
if (!this.priorityQueue.offer(t)) {
136134
onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()));
137135
release(t);
138-
return;
136+
return false;
139137
}
140138

141139
final long previousState = markValueAdded(this);
142140
if (isFinalized(previousState)) {
143141
this.clearSafely();
144-
return;
142+
return false;
145143
}
146144

147145
if (isSubscriberReady(previousState)) {
148146
if (this.outputFused) {
149147
// fast path for fusion
150148
this.actual.onNext(null);
151-
return;
149+
return true;
152150
}
153151

154152
if (isWorkInProgress(previousState)) {
155-
return;
153+
return true;
156154
}
157155

158156
if (hasRequest(previousState)) {
159157
drainRegular(previousState);
160158
}
161159
}
160+
return true;
162161
}
163162

164-
@Override
165-
@Deprecated
166-
public void onNext(ByteBuf t) {
163+
public boolean tryEmitNormal(ByteBuf t) {
167164
if (this.done || this.cancelled) {
168165
release(t);
169-
return;
166+
return false;
170167
}
171168

172169
if (!this.queue.offer(t)) {
173170
onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()));
174171
release(t);
175-
return;
172+
return false;
176173
}
177174

178175
final long previousState = markValueAdded(this);
179176
if (isFinalized(previousState)) {
180177
this.clearSafely();
181-
return;
178+
return false;
182179
}
183180

184181
if (isSubscriberReady(previousState)) {
185182
if (this.outputFused) {
186183
// fast path for fusion
187184
this.actual.onNext(null);
188-
return;
185+
return true;
189186
}
190187

191188
if (isWorkInProgress(previousState)) {
192-
return;
189+
return true;
193190
}
194191

195192
if (hasRequest(previousState)) {
196193
drainRegular(previousState);
197194
}
198195
}
196+
197+
return true;
198+
}
199+
200+
public boolean tryEmitFinal(ByteBuf t) {
201+
if (this.done || this.cancelled) {
202+
release(t);
203+
return false;
204+
}
205+
206+
this.done = true;
207+
this.last = t;
208+
209+
final long previousState = markValueAddedAndTerminated(this);
210+
if (isFinalized(previousState)) {
211+
this.clearSafely();
212+
return false;
213+
}
214+
215+
if (isSubscriberReady(previousState)) {
216+
if (this.outputFused) {
217+
// fast path for fusion
218+
this.actual.onNext(null);
219+
return true;
220+
}
221+
222+
if (isWorkInProgress(previousState)) {
223+
return true;
224+
}
225+
226+
if (hasRequest(previousState)) {
227+
drainRegular(previousState);
228+
}
229+
}
230+
231+
return true;
232+
}
233+
234+
@Deprecated
235+
public void onNextPrioritized(ByteBuf t) {
236+
tryEmitPrioritized(t);
237+
}
238+
239+
@Override
240+
@Deprecated
241+
public void onNext(ByteBuf t) {
242+
tryEmitNormal(t);
199243
}
200244

201245
@Override
@@ -371,6 +415,11 @@ boolean checkTerminated(boolean done, boolean empty, CoreSubscriber<? super Byte
371415
}
372416

373417
if (done && empty) {
418+
final ByteBuf last = this.last;
419+
if (last != null) {
420+
this.last = null;
421+
a.onNext(last);
422+
}
374423
clearAndFinalize(this);
375424
Throwable e = this.error;
376425
if (e != null) {
@@ -523,6 +572,7 @@ public void cancel() {
523572
}
524573

525574
@Override
575+
@Deprecated
526576
public void dispose() {
527577
this.cancelled = true;
528578

@@ -606,6 +656,12 @@ void clearUnsafely() {
606656
final Queue<ByteBuf> queue = this.queue;
607657
final Queue<ByteBuf> priorityQueue = this.priorityQueue;
608658

659+
final ByteBuf last = this.last;
660+
661+
if (last != null) {
662+
release(last);
663+
}
664+
609665
ByteBuf byteBuf;
610666
while ((byteBuf = queue.poll()) != null) {
611667
release(byteBuf);
@@ -753,6 +809,36 @@ static long markValueAdded(UnboundedProcessor instance) {
753809
}
754810
}
755811

812+
/**
813+
* Sets {@link #FLAG_HAS_VALUE} flag if it was not set before and if flags {@link
814+
* #FLAG_FINALIZED}, {@link #FLAG_CANCELLED}, {@link #FLAG_DISPOSED} are unset. Also, this method
815+
* increments number of work in progress (WIP) if {@link #FLAG_HAS_REQUEST} is set
816+
*
817+
* @return previous state
818+
*/
819+
static long markValueAddedAndTerminated(UnboundedProcessor instance) {
820+
for (; ; ) {
821+
final long state = instance.state;
822+
823+
if (isFinalized(state)) {
824+
return state;
825+
}
826+
827+
long nextState = state;
828+
if (isWorkInProgress(state)) {
829+
nextState = addWork(state);
830+
} else if (isSubscriberReady(state) && !instance.outputFused) {
831+
if (hasRequest(state)) {
832+
nextState = addWork(state);
833+
}
834+
}
835+
836+
if (STATE.compareAndSet(instance, state, nextState | FLAG_HAS_VALUE | FLAG_TERMINATED)) {
837+
return state;
838+
}
839+
}
840+
}
841+
756842
/**
757843
* Sets {@link #FLAG_TERMINATED} flag if it was not set before and if flags {@link
758844
* #FLAG_FINALIZED}, {@link #FLAG_CANCELLED}, {@link #FLAG_DISPOSED} are unset. Also, this method

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ public Mono<DuplexConnection> connect() {
7979

8080
Sinks.One<Object> inSink = Sinks.one();
8181
Sinks.One<Object> outSink = Sinks.one();
82-
UnboundedProcessor in = new UnboundedProcessor(() -> inSink.tryEmitValue(inSink));
83-
UnboundedProcessor out = new UnboundedProcessor(() -> outSink.tryEmitValue(outSink));
82+
UnboundedProcessor in =
83+
new UnboundedProcessor(() -> inSink.tryEmitValue(inSink), (__) -> {});
84+
UnboundedProcessor out =
85+
new UnboundedProcessor(() -> outSink.tryEmitValue(outSink), (__) -> {});
8486

8587
Mono<Void> onClose = inSink.asMono().zipWith(outSink.asMono()).then();
8688

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,16 @@ public Flux<ByteBuf> receive() {
8989
@Override
9090
public void sendFrame(int streamId, ByteBuf frame) {
9191
if (streamId == 0) {
92-
out.onNextPrioritized(frame);
92+
out.tryEmitPrioritized(frame);
9393
} else {
94-
out.onNext(frame);
94+
out.tryEmitNormal(frame);
9595
}
9696
}
9797

9898
@Override
9999
public void sendErrorAndClose(RSocketErrorException e) {
100100
final ByteBuf errorFrame = ErrorFrameCodec.encode(allocator, 0, e);
101-
out.onNext(errorFrame);
102-
dispose();
101+
out.tryEmitFinal(errorFrame);
103102
}
104103

105104
@Override

0 commit comments

Comments
 (0)