Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completable#concatWith(Completable) remove atomic operation #1000

Merged
merged 1 commit into from
Apr 3, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.SignalOffloader;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -63,14 +62,11 @@ protected void handleSubscribe(Subscriber subscriber, SignalOffloader offloader,
}

private static final class ConcatWithSubscriber implements Subscriber {
private static final AtomicIntegerFieldUpdater<ConcatWithSubscriber> subscribedToNextUpdater =
AtomicIntegerFieldUpdater.newUpdater(ConcatWithSubscriber.class, "subscribedToNext");
private final Subscriber target;
private final Completable next;
@Nullable
private SequentialCancellable sequentialCancellable;
@SuppressWarnings("unused")
private volatile int subscribedToNext;
private boolean nextSubscribed;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting one.

We are using the same Subscriber instance across different sources. RS spec mentions that all methods of a Subscriber must be invoked serially(Rule 1.3) and a Subscriber should ensure delivery of signals happens-before processing of signals(Rule 2.11) however, there is no mention of a rule where subscribe() should happen-before delivery of signals to the Subscriber.

So, here there is nothing guaranteeing from the spec that their is a memory barrier between call to next.subscribeInternal(this) and onComplete() from the next source. So, it may be that we will see nextSubscribed as false and subscribe again.

Interestingly I had this conversation before but it wasn't clear whether a rule is required to be added to the spec or existing rules cover this case. So 🍿 😄

Copy link
Member Author

@Scottmitch Scottmitch Apr 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall discussing this previously as well. IIUC the "shared subscriber" scenario is equivalent to the following:

class MySubscriber<T> implements Subscriber<T> {
  // not final, so no constructor barriers
  Object externalState;
  int internalState;

  MySubscriber(Object state) {
    externalState = state;
    internalState = 5;
  }
  
  onNext(T) {
    // is externalState, internalState visible?
  }
}

Publisher<T> pub = ...;
pub.subscribe(new MySubscriber<>("outside"));

If there is no happens-before relationship all non-final state in MySubscriber may not be visible when its Subscriber methods are invoked, which doesn't seem like desirable semantics from RS.

Related considerations:

  • reactive-streams examples use non-final state. In this case the state happens to be set to the default value, but if there is no happens before the explicit initialization to false could happen at some later time and be visible after the Subscriber has set the value to true.
  • sequentialCancellable in this class would also have the same issue (e.g. non final state used across subscribes).
  • All our other concat operator implementations operate in the same way as this PR.

Suggested path forward:

  • use a consistent approach internally. This PR makes our approach consistent and is less change the the other direction which involves investigating all other local state (shared across subscribes or initialized in the construction, ...).
  • open another discussion with the RS folks to discuss the above. Revisit if we need to make more broad changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points about non-final state!

I have created this in RS:

reactive-streams/reactive-streams-jvm#486

And I agree we should go ahead with this change to make things consistent.


ConcatWithSubscriber(Subscriber target, Completable next) {
this.target = target;
Expand All @@ -89,7 +85,10 @@ public void onSubscribe(Cancellable cancellable) {

@Override
public void onComplete() {
if (subscribedToNextUpdater.compareAndSet(this, 0, 1)) {
if (nextSubscribed) {
target.onComplete();
} else {
nextSubscribed = true;
// Do not use the same SignalOffloader as used for original as that may cause deadlock.
// Using a regular subscribe helps us to inherit the threading model for this next source. However,
// since we always offload the original Subscriber (in handleSubscribe above) we are assured that this
Expand All @@ -98,8 +97,6 @@ public void onComplete() {
// This is an asynchronous boundary, and so we should recapture the AsyncContext instead of propagating
// it.
next.subscribeInternal(this);
} else {
target.onComplete();
}
}

Expand Down