Skip to content

Commit a0440f4

Browse files
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 9722189 commit a0440f4

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.concurrent.CancellationException;
2424
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2525
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
26+
import java.util.function.Consumer;
27+
import java.util.function.Function;
2628
import java.util.stream.Stream;
27-
import org.reactivestreams.Subscription;
29+
import org.reactivestreams.Suwipbscription;
2830
import reactor.core.CoreSubscriber;
2931
import reactor.core.Disposable;
3032
import reactor.core.Exceptions;
@@ -97,7 +99,7 @@ public UnboundedProcessor() {
9799
this(() -> {});
98100
}
99101

100-
public UnboundedProcessor(Runnable onFinalizedHook) {
102+
public UnboundedProcessor(Runnable onFinalizedHook, Consumer<ByteBuf> onValueDelivered) {
101103
this.onFinalizedHook = onFinalizedHook;
102104
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
103105
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
@@ -121,6 +123,9 @@ public Object scanUnsafe(Attr key) {
121123
return null;
122124
}
123125

126+
public boolean tryEmitNext
127+
128+
@Deprecated
124129
public void onNextPrioritized(ByteBuf t) {
125130
if (this.done || this.cancelled) {
126131
release(t);
@@ -157,6 +162,7 @@ public void onNextPrioritized(ByteBuf t) {
157162
}
158163

159164
@Override
165+
@Deprecated
160166
public void onNext(ByteBuf t) {
161167
if (this.done || this.cancelled) {
162168
release(t);
@@ -193,6 +199,7 @@ public void onNext(ByteBuf t) {
193199
}
194200

195201
@Override
202+
@Deprecated
196203
public void onError(Throwable t) {
197204
if (this.done || this.cancelled) {
198205
Operators.onErrorDropped(t, currentContext());
@@ -235,6 +242,7 @@ public void onError(Throwable t) {
235242
}
236243

237244
@Override
245+
@Deprecated
238246
public void onComplete() {
239247
if (this.done || this.cancelled) {
240248
return;

0 commit comments

Comments
 (0)