Skip to content

Commit

Permalink
Add TCK validation to Single and Completable converter, update versio…
Browse files Browse the repository at this point in the history
…ns (#157)
  • Loading branch information
akarnokd authored Oct 17, 2016
1 parent daf3ecc commit 17cb52e
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 8 deletions.
21 changes: 18 additions & 3 deletions rxjava-reactive-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,26 @@ description = "Adapter between RxJava and ReactiveStreams"
apply plugin: 'java'

dependencies {
compile 'io.reactivex:rxjava:1.1.8'
compile 'io.reactivex:rxjava:1.2.1'
compile 'org.reactivestreams:reactive-streams:1.0.0'
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0'
testCompile group: 'org.testng', name: 'testng', version: '6.9.10'
}

test {
useTestNG()
}
useTestNG()
testLogging {
events=['passed', 'skipped', 'failed']
exceptionFormat="full"

debug.events = ['passed', 'skipped', 'failed']
debug.exceptionFormat="full"

info.events = ['passed', 'skipped', 'failed']
info.exceptionFormat="full"

warn.events = ['passed', 'skipped', 'failed']
warn.exceptionFormat="full"
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package rx.internal.reactivestreams;

import org.reactivestreams.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import rx.Completable;
import rx.*;

/**
* Wraps a Completable and exposes it as a Publisher.
Expand All @@ -35,11 +37,14 @@ public CompletableAsPublisher(Completable completable) {

@Override
public void subscribe(Subscriber<? super T> s) {
if (s == null) {
throw new NullPointerException();
}
completable.subscribe(new CompletableAsPublisherSubscriber<T>(s));
}

static final class CompletableAsPublisherSubscriber<T>
implements Completable.CompletableSubscriber, Subscription {
implements CompletableSubscriber, Subscription {

final Subscriber<? super T> actual;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import org.reactivestreams.*;

import rx.Completable.CompletableSubscriber;
import rx.CompletableSubscriber;

/**
* Wraps an arbitrary Publisher and exposes it as a Completable, ignoring any onNext events.
*/
public final class PublisherAsCompletable implements rx.Completable.CompletableOnSubscribe {
public final class PublisherAsCompletable implements rx.Completable.OnSubscribe {

final Publisher<?> publisher;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.reactivestreams;

import java.io.IOException;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.*;
import org.testng.annotations.Test;

import rx.*;
import rx.schedulers.Schedulers;

@Test
public class TckCompletableAsyncConversionTest extends PublisherVerification<Long> {

public TckCompletableAsyncConversionTest() {
super(new TestEnvironment(300L));
}

@Override
public Publisher<Long> createPublisher(long elements) {
return RxReactiveStreams.toPublisher(Completable.complete().observeOn(Schedulers.computation()));
}

@Override
public long maxElementsFromPublisher() {
return 0L;
}

@Override
public Publisher<Long> createFailedPublisher() {
return RxReactiveStreams.toPublisher(Completable.error(new IOException()).observeOn(Schedulers.computation()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.reactivestreams;

import java.io.IOException;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.*;
import org.testng.annotations.Test;

import rx.*;

@Test
public class TckCompletableConversionTest extends PublisherVerification<Long> {

public TckCompletableConversionTest() {
super(new TestEnvironment(300L));
}

@Override
public Publisher<Long> createPublisher(long elements) {
return RxReactiveStreams.toPublisher(Completable.complete());
}

@Override
public long maxElementsFromPublisher() {
return 0L;
}

@Override
public Publisher<Long> createFailedPublisher() {
return RxReactiveStreams.toPublisher(Completable.error(new IOException()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.reactivestreams;

import java.io.IOException;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.*;
import org.testng.annotations.Test;

import rx.*;
import rx.schedulers.Schedulers;

@Test
public class TckSingleAsyncConversionTest extends PublisherVerification<Long> {

public TckSingleAsyncConversionTest() {
super(new TestEnvironment(300L));
}

@Override
public Publisher<Long> createPublisher(long elements) {
return RxReactiveStreams.toPublisher(Single.just(1L).observeOn(Schedulers.computation()));
}

@Override
public long maxElementsFromPublisher() {
return 1L;
}

@Override
public Publisher<Long> createFailedPublisher() {
return RxReactiveStreams.toPublisher(Single.<Long>error(new IOException()).observeOn(Schedulers.computation()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.reactivestreams;

import java.io.IOException;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.*;
import org.testng.annotations.Test;

import rx.*;

@Test
public class TckSingleConversionTest extends PublisherVerification<Long> {

public TckSingleConversionTest() {
super(new TestEnvironment(300L));
}

@Override
public Publisher<Long> createPublisher(long elements) {
return RxReactiveStreams.toPublisher(Single.just(1L));
}

@Override
public long maxElementsFromPublisher() {
return 1L;
}

@Override
public Publisher<Long> createFailedPublisher() {
return RxReactiveStreams.toPublisher(Single.<Long>error(new IOException()));
}

}

0 comments on commit 17cb52e

Please sign in to comment.