From 82f330e4b8fd65846082a2f5fd177d74ec736d1e Mon Sep 17 00:00:00 2001 From: Viktor Gamov Date: Sat, 30 Jul 2016 00:29:24 -0400 Subject: [PATCH] initial commit of RxICache fixes #2 --- pom.xml | 5 + .../com/hazelcast/rxjava/RxHazelcast.java | 22 ++-- .../java/com/hazelcast/rxjava/RxICache.java | 30 +++++ .../hazelcast/rxjava/impl/RxICacheImpl.java | 63 ++++++++++ .../rxjava/impl/RxICacheImplTest.java | 115 ++++++++++++++++++ 5 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/hazelcast/rxjava/RxICache.java create mode 100644 src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java create mode 100644 src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java diff --git a/pom.xml b/pom.xml index 177f166..ee88020 100644 --- a/pom.xml +++ b/pom.xml @@ -285,6 +285,11 @@ rxjava-async-util 0.21.0 + + javax.cache + cache-api + 1.0.0 + diff --git a/src/main/java/com/hazelcast/rxjava/RxHazelcast.java b/src/main/java/com/hazelcast/rxjava/RxHazelcast.java index c8f5c4f..664f8d1 100644 --- a/src/main/java/com/hazelcast/rxjava/RxHazelcast.java +++ b/src/main/java/com/hazelcast/rxjava/RxHazelcast.java @@ -16,16 +16,13 @@ package com.hazelcast.rxjava; +import com.hazelcast.cache.ICache; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IAtomicLong; import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IMap; import com.hazelcast.ringbuffer.Ringbuffer; -import com.hazelcast.rxjava.impl.RxAtomicLongImpl; -import com.hazelcast.rxjava.impl.RxAtomicReferenceImpl; -import com.hazelcast.rxjava.impl.RxHazelcastInstanceImpl; -import com.hazelcast.rxjava.impl.RxIMapImpl; -import com.hazelcast.rxjava.impl.RxRingbufferImpl; +import com.hazelcast.rxjava.impl.*; import java.util.concurrent.Executor; @@ -63,7 +60,7 @@ public static RxIMap from(IMap map) { } /** - * @param map non-rx java object to wrap-around + * @param map non-rx java object to wrap-around * @param executor executor to pass to rx-java for callback execution * @return a rx-java equivalent of the given input object */ @@ -81,7 +78,7 @@ public static RxRingbuffer from(Ringbuffer ringbuffer) { /** * @param ringbuffer non-rx java object to wrap-around - * @param executor executor to pass to rx-java for callback execution + * @param executor executor to pass to rx-java for callback execution * @return a rx-java equivalent of the given input object */ public static RxRingbuffer from(Ringbuffer ringbuffer, Executor executor) { @@ -98,7 +95,7 @@ public static RxAtomicLong from(IAtomicLong atomicLong) { /** * @param atomicLong non-rx java object to wrap-around - * @param executor executor to pass to rx-java for callback execution + * @param executor executor to pass to rx-java for callback execution * @return a rx-java equivalent of the given input object */ public static RxAtomicLong from(IAtomicLong atomicLong, Executor executor) { @@ -115,11 +112,18 @@ public static RxAtomicReference from(IAtomicReference atomicReference) /** * @param atomicReference non-rx java object to wrap-around - * @param executor executor to pass to rx-java for callback execution + * @param executor executor to pass to rx-java for callback execution * @return a rx-java equivalent of the given input object */ public static RxAtomicReference from(IAtomicReference atomicReference, Executor executor) { return RxAtomicReferenceImpl.from(atomicReference, executor); } + public static RxICache from(ICache cache) { + return RxICacheImpl.from(cache); + } + + public static RxICache from(ICache cache, Executor executor) { + return RxICacheImpl.from(cache, executor); + } } diff --git a/src/main/java/com/hazelcast/rxjava/RxICache.java b/src/main/java/com/hazelcast/rxjava/RxICache.java new file mode 100644 index 0000000..bcb6ac2 --- /dev/null +++ b/src/main/java/com/hazelcast/rxjava/RxICache.java @@ -0,0 +1,30 @@ +package com.hazelcast.rxjava; + + + +import com.hazelcast.cache.ICache; +import rx.Observable; + +import javax.cache.expiry.ExpiryPolicy; + +/** + * TODO + * + * @author Viktor Gamov on 7/29/16. + * Twitter: @gamussa + * @since 0.0.1 + */ +public interface RxICache { + + Observable get(K key); + + Observable getAndPut(K key, V newValue); + + Observable getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy); + + Observable put(K key, V value); + + Observable put(K key, V value, ExpiryPolicy expiryPolicy); + + ICache getDelegate(); +} diff --git a/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java b/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java new file mode 100644 index 0000000..4d64bc8 --- /dev/null +++ b/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java @@ -0,0 +1,63 @@ +package com.hazelcast.rxjava.impl; + +import com.hazelcast.cache.ICache; +import com.hazelcast.rxjava.RxICache; +import rx.Observable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.concurrent.Executor; + +/** + * TODO + * + * @author Viktor Gamov on 7/29/16. + * Twitter: @gamussa + * @since 0.0.1 + */ +public class RxICacheImpl implements RxICache { + + private final ICache cache; + private final Executor executor; + + public RxICacheImpl(ICache cache) { + this.cache = cache; + this.executor = null; + } + + public RxICacheImpl(ICache cache, Executor executor) { + this.cache = cache; + this.executor = executor; + } + + @Override public Observable get(K key) { + return RxIObservable.from(cache.getAsync(key), executor); + } + + @Override public Observable getAndPut(K key, V newValue) { + return RxIObservable.from(cache.getAndPutAsync(key, newValue), executor); + } + + @Override public Observable getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy) { + return RxIObservable.from(cache.getAndPutAsync(key, newValue, expiryPolicy), executor); + } + + @Override public ICache getDelegate() { + return this.cache; + } + + @Override public Observable put(K key, V value) { + return RxIObservable.from(cache.putAsync(key, value), executor); + } + + @Override public Observable put(K key, V value, ExpiryPolicy expiryPolicy) { + return RxIObservable.from(cache.putAsync(key, value, expiryPolicy), executor); + } + + public static RxICache from(ICache cache) { + return new RxICacheImpl(cache); + } + + public static RxICache from(ICache cache, Executor executor) { + return new RxICacheImpl(cache, executor); + } +} diff --git a/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java b/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java new file mode 100644 index 0000000..c5d2d1d --- /dev/null +++ b/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java @@ -0,0 +1,115 @@ +package com.hazelcast.rxjava.impl; + +import com.hazelcast.cache.ICache; +import com.hazelcast.cache.impl.HazelcastServerCachingProvider; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.rxjava.RxHazelcast; +import com.hazelcast.rxjava.RxICache; +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.HazelcastTestSupport; +import com.hazelcast.test.annotation.ParallelTest; +import com.hazelcast.test.annotation.QuickTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import rx.observers.TestSubscriber; + +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; + +/** + * TODO + * + * @author Viktor Gamov on 7/29/16. + * Twitter: @gamussa + * @since 0.0.1 + */ +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelTest.class}) +public class RxICacheImplTest extends HazelcastTestSupport { + + private ICache cache; + private RxICache rxCache; + + @Before + public void before() { + + final HazelcastInstance hazelcastInstance = createHazelcastInstance(); + final MutableConfiguration configuration = new MutableConfiguration(); + HazelcastServerCachingProvider.createCachingProvider(hazelcastInstance) + .getCacheManager().createCache("RxJava", configuration); + + cache = hazelcastInstance.getCacheManager().getCache("RxJava"); + cache.put("RxJava", "cool"); + rxCache = RxHazelcast.from(cache); + } + + @Test + public void fromICache() { + // WHEN + RxICache rxICache = RxHazelcast.from(cache); + + // THEN + assertEquals(cache, rxICache.getDelegate()); + } + + @Test + public void get() { + // WHEN + TestSubscriber subscriber = new TestSubscriber(); + rxCache.get("RxJava").subscribe(subscriber); + + // THEN + RxTestUtils.assertSingleResult("cool", subscriber); + } + + @Test + public void put() { + // WHEN + TestSubscriber subscriber = new TestSubscriber(); + rxCache.put("Reactive", "rocks").subscribe(subscriber); + + // THEN + RxTestUtils.assertVoidResult(subscriber); + } + + @Test + public void putWithTtl() { + // WHEN + TestSubscriber subscriber = new TestSubscriber(); + rxCache.put("Reactive", "rocks", new CreatedExpiryPolicy(new Duration(SECONDS, 120))) + .subscribe(subscriber); + + // THEN + RxTestUtils.assertSingleResult(null, subscriber); + } + + @Test + public void getAndPut() { + //WHEN + final TestSubscriber subscriber = new TestSubscriber(); + rxCache.put("Reactive", "rocks").subscribe(); + rxCache.getAndPut("Reactive", "rocks!!!").subscribe(subscriber); + + // THEN + RxTestUtils.assertSingleResult("rocks", subscriber); + } + + @Test + public void getAndPut_wthTtl() { + //WHEN + final TestSubscriber subscriber = new TestSubscriber(); + rxCache.put("Reactive", "rocks").subscribe(); + rxCache.getAndPut("Reactive", "rocks!!!", new CreatedExpiryPolicy(new Duration(SECONDS, 120))).subscribe + (subscriber); + + // THEN + RxTestUtils.assertSingleResult("rocks", subscriber); + } + +}