Skip to content

Commit

Permalink
Implements a completable convert into a Mono with no values, using in…
Browse files Browse the repository at this point in the history
…ternal converts of Completable, such #toObservable(), so we can reuse the converts 'to' and 'from' publisher from DependencyUtils.java

Enabling RxCompletable only when it is present on classpath
  • Loading branch information
john-evangelist committed Mar 7, 2016
1 parent da92949 commit befa46d
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 149 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ext {
// Libraries
jacksonDatabindVersion = '2.5.1'
jsonPathVersion = '2.0.0'
rxJavaVersion = '1.0.14'
rxJavaVersion = '1.1.1'

// Testing
spockVersion = '1.0-groovy-2.4'
Expand Down
299 changes: 151 additions & 148 deletions src/main/java/reactor/core/converter/DependencyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,157 +23,160 @@
* Utility class related to the various composition libraries supported.
*
* @author Stephane Maldini
* @author Joao Pedro Evangelista
* @since 2.5
*/
public final class DependencyUtils {

static private final boolean HAS_REACTOR_CODEC;
static private final boolean HAS_REACTOR_NET;
static private final boolean HAS_REACTOR_BUS;

static private final FlowPublisherConverter FLOW_PUBLISHER_CONVERTER;
static private final RxJava1ObservableConverter RX_JAVA_1_OBSERVABLE_CONVERTER;
static private final RxJava1SingleConverter RX_JAVA_1_SINGLE_CONVERTER;

private DependencyUtils() {
}

static {
final int RXJAVA_1_OBSERVABLE = 0b000001;
final int RXJAVA_1_SINGLE = 0b000010;
final int RXJAVA_1_COMPLETABLE = 0b000100;
final int FLOW_PUBLISHER = 0b100000;
final int REACTOR_CODEC = 0b1000000;
final int REACTOR_BUS = 0b10000000;
final int REACTOR_NET = 0b100000000;

int detected = 0;
try {
Flux.class.getClassLoader().loadClass("rx.Observable");
detected = RXJAVA_1_OBSERVABLE;
Class.forName("rx.Single");
detected |= RXJAVA_1_SINGLE;
/*Class.forName("rx.Completable");
hasRxjava1Completable = true;*/
}
catch (ClassNotFoundException cnfe) {
//IGNORE
}
try {
Flux.class.getClassLoader().loadClass("reactor.io.codec.Codec");
detected |= REACTOR_CODEC;
}
catch (ClassNotFoundException cnfe) {
//IGNORE
}
try {
Flux.class.getClassLoader().loadClass("reactor.io.net.ReactiveChannel");
detected |= REACTOR_NET;
}
catch (ClassNotFoundException cnfe) {
//IGNORE
}
try {
Flux.class.getClassLoader().loadClass("reactor.bus.registry.Registry");
detected |= REACTOR_BUS;
}
catch (ClassNotFoundException cnfe) {
//IGNORE
}

if ((detected & RXJAVA_1_OBSERVABLE) == RXJAVA_1_OBSERVABLE) {
RX_JAVA_1_OBSERVABLE_CONVERTER = RxJava1ObservableConverter.INSTANCE;
}
else {
RX_JAVA_1_OBSERVABLE_CONVERTER = null;
}
if ((detected & RXJAVA_1_SINGLE) == RXJAVA_1_SINGLE) {
RX_JAVA_1_SINGLE_CONVERTER = RxJava1SingleConverter.INSTANCE;
}
else {
RX_JAVA_1_SINGLE_CONVERTER = null;
}
if ((detected & RXJAVA_1_COMPLETABLE) == RXJAVA_1_COMPLETABLE) {
//TBD
}
else {
//TBD
}
if ((detected & FLOW_PUBLISHER) == FLOW_PUBLISHER) {
FLOW_PUBLISHER_CONVERTER = FlowPublisherConverter.INSTANCE;
}
else {
FLOW_PUBLISHER_CONVERTER = null;
}
HAS_REACTOR_CODEC = (detected & REACTOR_CODEC) == REACTOR_CODEC;
HAS_REACTOR_BUS = (detected & REACTOR_BUS) == REACTOR_BUS;
HAS_REACTOR_NET = (detected & REACTOR_NET) == REACTOR_NET;

}

public static boolean hasRxJava1() {
return RX_JAVA_1_OBSERVABLE_CONVERTER != null;
}

public static boolean hasRxJava1Single() {
return RX_JAVA_1_SINGLE_CONVERTER != null;
}

public static boolean hasFlowPublisher() {
return FLOW_PUBLISHER_CONVERTER != null;
}

public static boolean hasReactorCodec() {
return HAS_REACTOR_CODEC;
}

public static boolean hasReactorBus() {
return HAS_REACTOR_BUS;
}

public static boolean hasReactorNet() {
return HAS_REACTOR_NET;
}

public static Publisher<?> convertToPublisher(Object source) {
if (source == null) {
throw new IllegalArgumentException("Cannot convert null sources");
}
if (hasRxJava1()) {
if (hasRxJava1Single() && RX_JAVA_1_SINGLE_CONVERTER.test(source)) {
return RX_JAVA_1_SINGLE_CONVERTER.apply(source);
}
else if (RX_JAVA_1_OBSERVABLE_CONVERTER.test(source)) {
return RX_JAVA_1_OBSERVABLE_CONVERTER.apply(source);
}
}

if (hasFlowPublisher() && FLOW_PUBLISHER_CONVERTER.test(source)) {
return FLOW_PUBLISHER_CONVERTER.apply(source);
}
throw new UnsupportedOperationException("Conversion to Publisher from " + source.getClass());
}

@SuppressWarnings("unchecked")
public static <T> T convertFromPublisher(Publisher<?> source, Class<T> to) {
if (source == null || to == null) {
throw new IllegalArgumentException("Cannot convert " + source + " source to " + to + " type");
}
if (hasRxJava1()) {
if (hasRxJava1Single() && RX_JAVA_1_SINGLE_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) RX_JAVA_1_SINGLE_CONVERTER.convertTo(source, to);
}
else if (RX_JAVA_1_OBSERVABLE_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) RX_JAVA_1_OBSERVABLE_CONVERTER.convertTo(source, to);
}
}
if (hasFlowPublisher() && FLOW_PUBLISHER_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) FLOW_PUBLISHER_CONVERTER.convertTo(source, to);
}
throw new UnsupportedOperationException("Cannot convert " + source.getClass() + " source to " + to.getClass() + " type");
}
static private final boolean HAS_REACTOR_CODEC;
static private final boolean HAS_REACTOR_NET;
static private final boolean HAS_REACTOR_BUS;

static private final FlowPublisherConverter FLOW_PUBLISHER_CONVERTER;
static private final RxJava1ObservableConverter RX_JAVA_1_OBSERVABLE_CONVERTER;
static private final RxJava1SingleConverter RX_JAVA_1_SINGLE_CONVERTER;
static private final RxJava1CompletableConverter RX_JAVA_1_COMPLETABLE_CONVERTER;

private DependencyUtils() {
}

static {
final int RXJAVA_1_OBSERVABLE = 0b000001;
final int RXJAVA_1_SINGLE = 0b000010;
final int RXJAVA_1_COMPLETABLE = 0b000100;
final int FLOW_PUBLISHER = 0b100000;
final int REACTOR_CODEC = 0b1000000;
final int REACTOR_BUS = 0b10000000;
final int REACTOR_NET = 0b100000000;

int detected = 0;
try {
Flux.class.getClassLoader().loadClass("rx.Observable");
detected = RXJAVA_1_OBSERVABLE;
Class.forName("rx.Single");
detected |= RXJAVA_1_SINGLE;
Class.forName("rx.Completable");
detected |= RXJAVA_1_COMPLETABLE;
} catch (ClassNotFoundException ignore) {

}

try {
Flux.class.getClassLoader().loadClass("reactor.io.codec.Codec");
detected |= REACTOR_CODEC;
} catch (ClassNotFoundException ignore) {

}

try {
Flux.class.getClassLoader().loadClass("reactor.io.net.ReactiveChannel");
detected |= REACTOR_NET;
} catch (ClassNotFoundException ignore) {

}
try {
Flux.class.getClassLoader().loadClass("reactor.bus.registry.Registry");
detected |= REACTOR_BUS;
} catch (ClassNotFoundException ignore) {

}

if ((detected & RXJAVA_1_OBSERVABLE) == RXJAVA_1_OBSERVABLE) {
RX_JAVA_1_OBSERVABLE_CONVERTER = RxJava1ObservableConverter.INSTANCE;
} else {
RX_JAVA_1_OBSERVABLE_CONVERTER = null;
}
if ((detected & RXJAVA_1_SINGLE) == RXJAVA_1_SINGLE) {
RX_JAVA_1_SINGLE_CONVERTER = RxJava1SingleConverter.INSTANCE;
} else {
RX_JAVA_1_SINGLE_CONVERTER = null;
}
if ((detected & RXJAVA_1_COMPLETABLE) == RXJAVA_1_COMPLETABLE) {
RX_JAVA_1_COMPLETABLE_CONVERTER = RxJava1CompletableConverter.INSTANCE;
} else {
RX_JAVA_1_COMPLETABLE_CONVERTER = null;
}
if ((detected & FLOW_PUBLISHER) == FLOW_PUBLISHER) {
FLOW_PUBLISHER_CONVERTER = FlowPublisherConverter.INSTANCE;
} else {
FLOW_PUBLISHER_CONVERTER = null;
}
HAS_REACTOR_CODEC = (detected & REACTOR_CODEC) == REACTOR_CODEC;
HAS_REACTOR_BUS = (detected & REACTOR_BUS) == REACTOR_BUS;
HAS_REACTOR_NET = (detected & REACTOR_NET) == REACTOR_NET;

}

public static boolean hasRxJava1() {
return RX_JAVA_1_OBSERVABLE_CONVERTER != null;
}

public static boolean hasRxJava1Single() {
return RX_JAVA_1_SINGLE_CONVERTER != null;
}

public static boolean hasFlowPublisher() {
return FLOW_PUBLISHER_CONVERTER != null;
}

public static boolean hasRxJava1Completable() {
return RX_JAVA_1_COMPLETABLE_CONVERTER != null;
}

public static boolean hasReactorCodec() {
return HAS_REACTOR_CODEC;
}

public static boolean hasReactorBus() {
return HAS_REACTOR_BUS;
}

public static boolean hasReactorNet() {
return HAS_REACTOR_NET;
}

public static Publisher<?> convertToPublisher(Object source) {
if (source == null) {
throw new IllegalArgumentException("Cannot convert null sources");
}

if (hasRxJava1()) {
if (hasRxJava1Single() && RX_JAVA_1_SINGLE_CONVERTER.test(source)) {
return RX_JAVA_1_SINGLE_CONVERTER.apply(source);
} else if (RX_JAVA_1_OBSERVABLE_CONVERTER.test(source)) {
return RX_JAVA_1_OBSERVABLE_CONVERTER.apply(source);
} else if (hasRxJava1Completable() && RX_JAVA_1_COMPLETABLE_CONVERTER.test(source)) {
return RX_JAVA_1_COMPLETABLE_CONVERTER.apply(source);
}
}

if (hasFlowPublisher() && FLOW_PUBLISHER_CONVERTER.test(source)) {
return FLOW_PUBLISHER_CONVERTER.apply(source);
}
throw new UnsupportedOperationException("Conversion to Publisher from " + source.getClass());
}

@SuppressWarnings("unchecked")
public static <T> T convertFromPublisher(Publisher<?> source, Class<T> to) {
if (source == null || to == null) {
throw new IllegalArgumentException("Cannot convert " + source + " source to " + to + " type");
}
if (hasRxJava1()) {
if (hasRxJava1Single() && RX_JAVA_1_SINGLE_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) RX_JAVA_1_SINGLE_CONVERTER.convertTo(source, to);
} else if (RX_JAVA_1_OBSERVABLE_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) RX_JAVA_1_OBSERVABLE_CONVERTER.convertTo(source, to);
} else if (RX_JAVA_1_COMPLETABLE_CONVERTER.get().isAssignableFrom(to)) {
return (T) RX_JAVA_1_COMPLETABLE_CONVERTER.convertTo(source, to);
}
}
if (hasFlowPublisher() && FLOW_PUBLISHER_CONVERTER.get()
.isAssignableFrom(to)) {
return (T) FLOW_PUBLISHER_CONVERTER.convertTo(source, to);
}
throw new UnsupportedOperationException("Cannot convert " + source.getClass() + " source to " + to.getClass() + " type");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package reactor.core.converter;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Single;

/**
* Convert a {@link Publisher Publisher} into a Completable
*
* @author Joao Pedro Evangelista
* @since 2.5
*/
public class RxJava1CompletableConverter extends PublisherConverter<Completable> {

static final RxJava1CompletableConverter INSTANCE = new RxJava1CompletableConverter();

@SuppressWarnings("TypeMayBeWeakened")
static Completable from(Mono<Void> noValue) {
return INSTANCE.fromPublisher(noValue);
}

static Mono<Void> from(Completable completable) {
return INSTANCE.toPublisher(completable);
}


@SuppressWarnings("unchecked")
@Override
protected Mono<Void> toPublisher(Object o) {
if (o instanceof Completable) {
Publisher<?> publisher = DependencyUtils.<Void>convertToPublisher(((Completable) o).<Void>toObservable());
return Mono.from((Publisher<? extends Void>) publisher);
} else {
return null;
}
}

@Override
protected Completable fromPublisher(Publisher<?> source) {
return Completable.fromSingle(DependencyUtils.convertFromPublisher(source, Single.class));
}

@Override
public Class<Completable> get() {
return Completable.class;
}
}
Loading

0 comments on commit befa46d

Please sign in to comment.