Skip to content

Commit

Permalink
Completable#concatWith(Completable) remove atomic operation (#1000)
Browse files Browse the repository at this point in the history
Motivation:
Completable#concatWith(Completable) currently uses an atomic operation to transition subscribe() from the first Completable to the second. However this is done in the contex of a Subscriber and the events should be sequenced in a [serial](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.3) fashion.

Modifications:
- Completable#concatWith(Completable) to use a regular variable instead of volatile/atomic operation to switch subscribers

Result:
Less atomic operations in Completable#concatWith(Completable).
  • Loading branch information
Scottmitch authored Apr 3, 2020
1 parent dad967a commit 10ded56
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.SignalOffloader;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -63,14 +62,11 @@ protected void handleSubscribe(Subscriber subscriber, SignalOffloader offloader,
}

private static final class ConcatWithSubscriber implements Subscriber {
private static final AtomicIntegerFieldUpdater<ConcatWithSubscriber> subscribedToNextUpdater =
AtomicIntegerFieldUpdater.newUpdater(ConcatWithSubscriber.class, "subscribedToNext");
private final Subscriber target;
private final Completable next;
@Nullable
private SequentialCancellable sequentialCancellable;
@SuppressWarnings("unused")
private volatile int subscribedToNext;
private boolean nextSubscribed;

ConcatWithSubscriber(Subscriber target, Completable next) {
this.target = target;
Expand All @@ -89,7 +85,10 @@ public void onSubscribe(Cancellable cancellable) {

@Override
public void onComplete() {
if (subscribedToNextUpdater.compareAndSet(this, 0, 1)) {
if (nextSubscribed) {
target.onComplete();
} else {
nextSubscribed = true;
// Do not use the same SignalOffloader as used for original as that may cause deadlock.
// Using a regular subscribe helps us to inherit the threading model for this next source. However,
// since we always offload the original Subscriber (in handleSubscribe above) we are assured that this
Expand All @@ -98,8 +97,6 @@ public void onComplete() {
// This is an asynchronous boundary, and so we should recapture the AsyncContext instead of propagating
// it.
next.subscribeInternal(this);
} else {
target.onComplete();
}
}

Expand Down

0 comments on commit 10ded56

Please sign in to comment.