diff --git a/pom.xml b/pom.xml
index 177f166..ee88020 100644
--- a/pom.xml
+++ b/pom.xml
@@ -285,6 +285,11 @@
rxjava-async-util
0.21.0
+
+ javax.cache
+ cache-api
+ 1.0.0
+
diff --git a/src/main/java/com/hazelcast/rxjava/RxHazelcast.java b/src/main/java/com/hazelcast/rxjava/RxHazelcast.java
index c8f5c4f..664f8d1 100644
--- a/src/main/java/com/hazelcast/rxjava/RxHazelcast.java
+++ b/src/main/java/com/hazelcast/rxjava/RxHazelcast.java
@@ -16,16 +16,13 @@
package com.hazelcast.rxjava;
+import com.hazelcast.cache.ICache;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IAtomicReference;
import com.hazelcast.core.IMap;
import com.hazelcast.ringbuffer.Ringbuffer;
-import com.hazelcast.rxjava.impl.RxAtomicLongImpl;
-import com.hazelcast.rxjava.impl.RxAtomicReferenceImpl;
-import com.hazelcast.rxjava.impl.RxHazelcastInstanceImpl;
-import com.hazelcast.rxjava.impl.RxIMapImpl;
-import com.hazelcast.rxjava.impl.RxRingbufferImpl;
+import com.hazelcast.rxjava.impl.*;
import java.util.concurrent.Executor;
@@ -63,7 +60,7 @@ public static RxIMap from(IMap map) {
}
/**
- * @param map non-rx java object to wrap-around
+ * @param map non-rx java object to wrap-around
* @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
@@ -81,7 +78,7 @@ public static RxRingbuffer from(Ringbuffer ringbuffer) {
/**
* @param ringbuffer non-rx java object to wrap-around
- * @param executor executor to pass to rx-java for callback execution
+ * @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static RxRingbuffer from(Ringbuffer ringbuffer, Executor executor) {
@@ -98,7 +95,7 @@ public static RxAtomicLong from(IAtomicLong atomicLong) {
/**
* @param atomicLong non-rx java object to wrap-around
- * @param executor executor to pass to rx-java for callback execution
+ * @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static RxAtomicLong from(IAtomicLong atomicLong, Executor executor) {
@@ -115,11 +112,18 @@ public static RxAtomicReference from(IAtomicReference atomicReference)
/**
* @param atomicReference non-rx java object to wrap-around
- * @param executor executor to pass to rx-java for callback execution
+ * @param executor executor to pass to rx-java for callback execution
* @return a rx-java equivalent of the given input object
*/
public static RxAtomicReference from(IAtomicReference atomicReference, Executor executor) {
return RxAtomicReferenceImpl.from(atomicReference, executor);
}
+ public static RxICache from(ICache cache) {
+ return RxICacheImpl.from(cache);
+ }
+
+ public static RxICache from(ICache cache, Executor executor) {
+ return RxICacheImpl.from(cache, executor);
+ }
}
diff --git a/src/main/java/com/hazelcast/rxjava/RxICache.java b/src/main/java/com/hazelcast/rxjava/RxICache.java
new file mode 100644
index 0000000..bcb6ac2
--- /dev/null
+++ b/src/main/java/com/hazelcast/rxjava/RxICache.java
@@ -0,0 +1,30 @@
+package com.hazelcast.rxjava;
+
+
+
+import com.hazelcast.cache.ICache;
+import rx.Observable;
+
+import javax.cache.expiry.ExpiryPolicy;
+
+/**
+ * TODO
+ *
+ * @author Viktor Gamov on 7/29/16.
+ * Twitter: @gamussa
+ * @since 0.0.1
+ */
+public interface RxICache {
+
+ Observable get(K key);
+
+ Observable getAndPut(K key, V newValue);
+
+ Observable getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy);
+
+ Observable put(K key, V value);
+
+ Observable put(K key, V value, ExpiryPolicy expiryPolicy);
+
+ ICache getDelegate();
+}
diff --git a/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java b/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java
new file mode 100644
index 0000000..4d64bc8
--- /dev/null
+++ b/src/main/java/com/hazelcast/rxjava/impl/RxICacheImpl.java
@@ -0,0 +1,63 @@
+package com.hazelcast.rxjava.impl;
+
+import com.hazelcast.cache.ICache;
+import com.hazelcast.rxjava.RxICache;
+import rx.Observable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.concurrent.Executor;
+
+/**
+ * TODO
+ *
+ * @author Viktor Gamov on 7/29/16.
+ * Twitter: @gamussa
+ * @since 0.0.1
+ */
+public class RxICacheImpl implements RxICache {
+
+ private final ICache cache;
+ private final Executor executor;
+
+ public RxICacheImpl(ICache cache) {
+ this.cache = cache;
+ this.executor = null;
+ }
+
+ public RxICacheImpl(ICache cache, Executor executor) {
+ this.cache = cache;
+ this.executor = executor;
+ }
+
+ @Override public Observable get(K key) {
+ return RxIObservable.from(cache.getAsync(key), executor);
+ }
+
+ @Override public Observable getAndPut(K key, V newValue) {
+ return RxIObservable.from(cache.getAndPutAsync(key, newValue), executor);
+ }
+
+ @Override public Observable getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy) {
+ return RxIObservable.from(cache.getAndPutAsync(key, newValue, expiryPolicy), executor);
+ }
+
+ @Override public ICache getDelegate() {
+ return this.cache;
+ }
+
+ @Override public Observable put(K key, V value) {
+ return RxIObservable.from(cache.putAsync(key, value), executor);
+ }
+
+ @Override public Observable put(K key, V value, ExpiryPolicy expiryPolicy) {
+ return RxIObservable.from(cache.putAsync(key, value, expiryPolicy), executor);
+ }
+
+ public static RxICache from(ICache cache) {
+ return new RxICacheImpl(cache);
+ }
+
+ public static RxICache from(ICache cache, Executor executor) {
+ return new RxICacheImpl(cache, executor);
+ }
+}
diff --git a/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java b/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java
new file mode 100644
index 0000000..c5d2d1d
--- /dev/null
+++ b/src/test/java/com/hazelcast/rxjava/impl/RxICacheImplTest.java
@@ -0,0 +1,115 @@
+package com.hazelcast.rxjava.impl;
+
+import com.hazelcast.cache.ICache;
+import com.hazelcast.cache.impl.HazelcastServerCachingProvider;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.rxjava.RxHazelcast;
+import com.hazelcast.rxjava.RxICache;
+import com.hazelcast.test.HazelcastParallelClassRunner;
+import com.hazelcast.test.HazelcastTestSupport;
+import com.hazelcast.test.annotation.ParallelTest;
+import com.hazelcast.test.annotation.QuickTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import rx.observers.TestSubscriber;
+
+import javax.cache.configuration.MutableConfiguration;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO
+ *
+ * @author Viktor Gamov on 7/29/16.
+ * Twitter: @gamussa
+ * @since 0.0.1
+ */
+@RunWith(HazelcastParallelClassRunner.class)
+@Category({QuickTest.class, ParallelTest.class})
+public class RxICacheImplTest extends HazelcastTestSupport {
+
+ private ICache cache;
+ private RxICache rxCache;
+
+ @Before
+ public void before() {
+
+ final HazelcastInstance hazelcastInstance = createHazelcastInstance();
+ final MutableConfiguration configuration = new MutableConfiguration();
+ HazelcastServerCachingProvider.createCachingProvider(hazelcastInstance)
+ .getCacheManager().createCache("RxJava", configuration);
+
+ cache = hazelcastInstance.getCacheManager().getCache("RxJava");
+ cache.put("RxJava", "cool");
+ rxCache = RxHazelcast.from(cache);
+ }
+
+ @Test
+ public void fromICache() {
+ // WHEN
+ RxICache rxICache = RxHazelcast.from(cache);
+
+ // THEN
+ assertEquals(cache, rxICache.getDelegate());
+ }
+
+ @Test
+ public void get() {
+ // WHEN
+ TestSubscriber subscriber = new TestSubscriber();
+ rxCache.get("RxJava").subscribe(subscriber);
+
+ // THEN
+ RxTestUtils.assertSingleResult("cool", subscriber);
+ }
+
+ @Test
+ public void put() {
+ // WHEN
+ TestSubscriber subscriber = new TestSubscriber();
+ rxCache.put("Reactive", "rocks").subscribe(subscriber);
+
+ // THEN
+ RxTestUtils.assertVoidResult(subscriber);
+ }
+
+ @Test
+ public void putWithTtl() {
+ // WHEN
+ TestSubscriber subscriber = new TestSubscriber();
+ rxCache.put("Reactive", "rocks", new CreatedExpiryPolicy(new Duration(SECONDS, 120)))
+ .subscribe(subscriber);
+
+ // THEN
+ RxTestUtils.assertSingleResult(null, subscriber);
+ }
+
+ @Test
+ public void getAndPut() {
+ //WHEN
+ final TestSubscriber subscriber = new TestSubscriber();
+ rxCache.put("Reactive", "rocks").subscribe();
+ rxCache.getAndPut("Reactive", "rocks!!!").subscribe(subscriber);
+
+ // THEN
+ RxTestUtils.assertSingleResult("rocks", subscriber);
+ }
+
+ @Test
+ public void getAndPut_wthTtl() {
+ //WHEN
+ final TestSubscriber subscriber = new TestSubscriber();
+ rxCache.put("Reactive", "rocks").subscribe();
+ rxCache.getAndPut("Reactive", "rocks!!!", new CreatedExpiryPolicy(new Duration(SECONDS, 120))).subscribe
+ (subscriber);
+
+ // THEN
+ RxTestUtils.assertSingleResult("rocks", subscriber);
+ }
+
+}