Skip to content

Commit

Permalink
Added unit test singleJust() to expose the hanging subscriber. (#154) (
Browse files Browse the repository at this point in the history
…#156)

* Added unit test singleJust() to expose the hanging SingleAsPublisherSubscriber bug. (#154)

* Fixed the hanging SingleAsPublisherSubscriber bug. (#154)
  • Loading branch information
andrey-radchenko authored and akarnokd committed Oct 17, 2016
1 parent 65efd1f commit daf3ecc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

package rx.internal.reactivestreams;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import rx.Single;
import rx.SingleSubscriber;

import rx.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Wraps a Single and exposes it as a Publisher.
*
* @param <T> the value type
*/
public final class SingleAsPublisher<T> implements Publisher<T> {

final Single<T> single;

public SingleAsPublisher(Single<T> single) {
this.single = single;
}
Expand All @@ -41,31 +41,31 @@ public SingleAsPublisher(Single<T> single) {
public void subscribe(Subscriber<? super T> s) {
SingleAsPublisherSubscriber<T> parent = new SingleAsPublisherSubscriber<T>(s);
s.onSubscribe(parent);

single.subscribe(parent);
}

static final class SingleAsPublisherSubscriber<T> extends SingleSubscriber<T>
implements Subscription {

final Subscriber<? super T> actual;

final AtomicInteger state;

T value;

volatile boolean cancelled;

static final int NO_REQUEST_NO_VALUE = 0;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3;

public SingleAsPublisherSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
this.state = new AtomicInteger();
}

@Override
public void onSuccess(T value) {
if (cancelled) {
Expand All @@ -78,7 +78,7 @@ public void onSuccess(T value) {
}
for (;;) {
int s = state.get();

if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || cancelled) {
break;
} else
Expand All @@ -87,6 +87,7 @@ public void onSuccess(T value) {
if (!cancelled) {
actual.onComplete();
}
return;
} else {
this.value = value;
if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) {
Expand All @@ -95,7 +96,7 @@ public void onSuccess(T value) {
}
}
}

@Override
public void onError(Throwable error) {
if (cancelled) {
Expand All @@ -104,31 +105,34 @@ public void onError(Throwable error) {
state.lazySet(HAS_REQUEST_HAS_VALUE);
actual.onError(error);
}

@Override
public void request(long n) {
if (n > 0) {
for (;;) {
int s = state.get();
if (s == HAS_REQUEST_HAS_VALUE || s == HAS_REQUEST_NO_VALUE || cancelled) {
break;
} else
}
if (s == NO_REQUEST_HAS_VALUE) {
if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
T v = value;
value = null;

actual.onNext(v);
if (!cancelled) {
actual.onComplete();
}
}
break;
}
if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
break;
}
}
}
}

@Override
public void cancel() {
if (!cancelled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package rx.reactivestreams.test;

import static rx.RxReactiveStreams.*;

import java.util.NoSuchElementException;

import org.testng.Assert;
import org.testng.annotations.Test;

import rx.*;
import rx.Observable;
import rx.Single;
import rx.observers.TestSubscriber;
import rx.reactivestreams.test.PublisherAsCompletableTest.*;
import rx.reactivestreams.test.PublisherAsCompletableTest.PublisherEmpty;
import rx.reactivestreams.test.PublisherAsCompletableTest.PublisherFail;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import static rx.RxReactiveStreams.toPublisher;
import static rx.RxReactiveStreams.toSingle;

public class PublisherAsSingleTest {

@Test(expectedExceptions = { NullPointerException.class })
Expand All @@ -22,18 +26,31 @@ public void nullCheck() {
@Test
public void just() {
TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(toPublisher(Observable.just(1))).subscribe(ts);

ts.assertValue(1);
ts.assertCompleted();
ts.assertNoErrors();
}

@Test
public void singleJust() {
TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(toPublisher(Single.just(1))).subscribeOn(Schedulers.computation()).subscribe(ts);

ts.awaitTerminalEvent(3, TimeUnit.SECONDS);

ts.assertValue(1);
ts.assertCompleted();
ts.assertNoErrors();
}

@Test
public void empty() {
TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(new PublisherEmpty()).subscribe(ts);

ts.assertNoValues();
Expand All @@ -44,7 +61,7 @@ public void empty() {
@Test
public void range() {
TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(toPublisher(Observable.range(1, 2))).subscribe(ts);

ts.assertNoValues();
Expand All @@ -55,31 +72,31 @@ public void range() {
@Test
public void error() {
TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(new PublisherFail()).subscribe(ts);

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertError(RuntimeException.class);
Assert.assertEquals(ts.getOnErrorEvents().get(0).getMessage(), "Forced failure");
}

@Test
public void cancellation() {
PublishSubject<Object> ps = PublishSubject.create();

TestSubscriber<Object> ts = new TestSubscriber<Object>();

toSingle(toPublisher(ps)).subscribe(ts);

Assert.assertTrue(ps.hasObservers());

ts.unsubscribe();

ts.assertNoValues();
ts.assertNotCompleted();
ts.assertNoErrors();

Assert.assertFalse(ps.hasObservers());
}
}

0 comments on commit daf3ecc

Please sign in to comment.