Skip to content

Commit ac70130

Browse files
committed
Use non-blocking puts.
1 parent 46942de commit ac70130

File tree

5 files changed

+108
-72
lines changed

5 files changed

+108
-72
lines changed

src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,14 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
238238
Assert.notNull(key, "Key must not be null");
239239
Assert.notNull(value, "Value must not be null");
240240

241-
execute(name, connection -> {
242-
doPut(connection, name, key, value, ttl);
243-
return "OK";
244-
});
245-
241+
if (supportsAsyncRetrieve()) {
242+
asyncCacheWriter.store(name, key, value, ttl).thenRun(() -> statistics.incPuts(name));
243+
} else {
244+
execute(name, connection -> {
245+
doPut(connection, name, key, value, ttl);
246+
return "OK";
247+
});
248+
}
246249
}
247250

248251
@SuppressWarnings("NullAway")
@@ -620,7 +623,7 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
620623
Mono<ByteBuffer> get = shouldExpireWithin(ttl) ? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
621624
: stringCommands.get(wrappedKey);
622625

623-
return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
626+
return cacheLockCheck.then(get).map(ByteUtils::getBytes);
624627
});
625628
}
626629

@@ -631,7 +634,7 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
631634

632635
Mono<?> mono = doWithLocking(name, key, value, connection, () -> doStore(key, value, ttl, connection));
633636

634-
return mono.then().toFuture();
637+
return mono.then();
635638
});
636639
}
637640

@@ -654,21 +657,15 @@ private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration
654657
public CompletableFuture<Void> remove(String name, byte[] key) {
655658

656659
return doWithConnection(connection -> {
657-
658-
Mono<?> mono = doWithLocking(name, key, null, connection, () -> doRemove(key, connection));
659-
660-
return mono.then().toFuture();
660+
return doWithLocking(name, key, null, connection, () -> doRemove(key, connection)).then();
661661
});
662662
}
663663

664664
@Override
665665
public CompletableFuture<Long> clean(String name, byte[] pattern, BatchStrategy batchStrategy) {
666666

667667
return doWithConnection(connection -> {
668-
669-
Mono<Long> mono = doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection));
670-
671-
return mono.toFuture();
668+
return doWithLocking(name, pattern, null, connection, () -> doClean(pattern, connection));
672669
});
673670
}
674671

@@ -741,12 +738,12 @@ private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheN
741738
}
742739

743740
private <T> CompletableFuture<T> doWithConnection(
744-
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) {
741+
Function<ReactiveRedisConnection, Mono<T>> callback) {
745742

746743
ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
747744

748745
return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
749-
it -> Mono.fromCompletionStage(callback.apply(it)), //
746+
callback::apply, //
750747
ReactiveRedisConnection::closeLater) //
751748
.toFuture();
752749
}

src/main/java/org/springframework/data/redis/cache/RedisCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source)
493493
throw new IllegalArgumentException("Cannot convert cache key [%s] to String".formatted(key));
494494
}
495495

496-
private byte[] createAndConvertCacheKey(Object key) {
496+
byte[] createAndConvertCacheKey(Object key) {
497497
return serializeCacheKey(createCacheKey(key));
498498
}
499499

src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import java.util.function.Consumer;
3333

3434
import org.awaitility.Awaitility;
35+
import org.awaitility.core.ConditionFactory;
3536
import org.jspecify.annotations.Nullable;
37+
import org.junit.jupiter.api.Assumptions;
3638
import org.junit.jupiter.api.BeforeEach;
3739
import org.junit.jupiter.api.Disabled;
3840
import org.junit.jupiter.api.Test;
@@ -60,6 +62,8 @@ public class DefaultRedisCacheWriterTests {
6062

6163
private static final String CACHE_NAME = "default-redis-cache-writer-tests";
6264

65+
static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread();
66+
6367
private String key = "key-1";
6468
private String cacheKey = CACHE_NAME + "::" + key;
6569

@@ -88,7 +92,7 @@ void putShouldAddEternalEntry() {
8892
RedisCacheWriter writer = nonLockingRedisCacheWriter(connectionFactory)
8993
.withStatisticsCollector(CacheStatisticsCollector.create());
9094

91-
writer.put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO);
95+
writer.putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO);
9296

9397
doWithConnection(connection -> {
9498
assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue);
@@ -102,7 +106,7 @@ void putShouldAddEternalEntry() {
102106
@Test // DATAREDIS-481
103107
void putShouldAddExpiringEntry() {
104108

105-
nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue,
109+
nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue,
106110
Duration.ofSeconds(1));
107111

108112
doWithConnection(connection -> {
@@ -119,6 +123,9 @@ void putShouldOverwriteExistingEternalEntry() {
119123
nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO);
120124

121125
doWithConnection(connection -> {
126+
127+
await.until(() -> connection.ttl(binaryCacheKey) == -1);
128+
122129
assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue);
123130
assertThat(connection.ttl(binaryCacheKey)).isEqualTo(-1);
124131
});
@@ -134,6 +141,7 @@ void putShouldOverwriteExistingExpiringEntryAndResetTtl() {
134141
Duration.ofSeconds(5));
135142

136143
doWithConnection(connection -> {
144+
await.until(() -> connection.ttl(binaryCacheKey) < 50);
137145
assertThat(connection.get(binaryCacheKey)).isEqualTo(binaryCacheValue);
138146
assertThat(connection.ttl(binaryCacheKey)).isGreaterThan(3).isLessThan(6);
139147
});
@@ -274,7 +282,7 @@ void removeShouldDeleteEntry() throws InterruptedException {
274282

275283
if (writer.supportsAsyncRetrieve()) {
276284
doWithConnection(connection -> {
277-
Awaitility.await().until(() -> !connection.exists(binaryCacheKey));
285+
await.until(() -> !connection.exists(binaryCacheKey));
278286
});
279287
}
280288

@@ -313,7 +321,7 @@ void cleanShouldRemoveAllKeysByPattern() {
313321

314322
if (writer.supportsAsyncRetrieve()) {
315323
doWithConnection(connection -> {
316-
Awaitility.await().until(() -> !connection.exists(binaryCacheKey));
324+
await.until(() -> !connection.exists(binaryCacheKey));
317325
});
318326
}
319327

@@ -351,7 +359,8 @@ void nonLockingCacheWriterShouldIgnoreExistingLock() {
351359

352360
((DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory)).lock(CACHE_NAME);
353361

354-
nonLockingRedisCacheWriter(connectionFactory).put(CACHE_NAME, binaryCacheKey, binaryCacheValue, Duration.ZERO);
362+
nonLockingRedisCacheWriter(connectionFactory).putIfAbsent(CACHE_NAME, binaryCacheKey, binaryCacheValue,
363+
Duration.ZERO);
355364

356365
doWithConnection(connection -> {
357366
assertThat(connection.exists(binaryCacheKey)).isTrue();
@@ -367,6 +376,7 @@ void lockingCacheWriterShouldIgnoreExistingLockOnDifferenceCache() {
367376
Duration.ZERO);
368377

369378
doWithConnection(connection -> {
379+
await.until(() -> connection.exists(binaryCacheKey));
370380
assertThat(connection.exists(binaryCacheKey)).isTrue();
371381
});
372382
}
@@ -393,8 +403,6 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException {
393403

394404
beforeWrite.await();
395405

396-
Thread.sleep(200);
397-
398406
doWithConnection(connection -> {
399407
assertThat(connection.exists(binaryCacheKey)).isFalse();
400408
});
@@ -403,6 +411,7 @@ void lockingCacheWriterShouldWaitForLockRelease() throws InterruptedException {
403411
afterWrite.await();
404412

405413
doWithConnection(connection -> {
414+
await.until(() -> connection.exists(binaryCacheKey));
406415
assertThat(connection.exists(binaryCacheKey)).isTrue();
407416
});
408417

@@ -419,6 +428,8 @@ void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() throws Inte
419428
DefaultRedisCacheWriter cw = (DefaultRedisCacheWriter) lockingRedisCacheWriter(connectionFactory);
420429
cw.lock(CACHE_NAME);
421430

431+
Assumptions.assumeFalse(cw.supportsAsyncRetrieve());
432+
422433
CountDownLatch beforeWrite = new CountDownLatch(1);
423434
CountDownLatch afterWrite = new CountDownLatch(1);
424435
AtomicReference<Exception> exceptionRef = new AtomicReference<>();

src/test/java/org/springframework/data/redis/cache/LegacyRedisCacheTests.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
2727
import java.util.Collection;
28+
import java.util.List;
2829
import java.util.concurrent.Callable;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132

3233
import org.awaitility.Awaitility;
34+
import org.awaitility.core.ConditionFactory;
35+
import org.junit.jupiter.api.Assumptions;
3336
import org.junit.jupiter.api.Disabled;
3437
import org.junit.jupiter.api.Test;
3538
import org.junit.jupiter.params.ParameterizedClass;
@@ -58,9 +61,10 @@
5861
public class LegacyRedisCacheTests {
5962

6063
private static final String CACHE_NAME = "testCache";
61-
6264
private final boolean allowCacheNullValues;
6365

66+
static ConditionFactory await = Awaitility.with().pollDelay(Duration.ZERO).pollInSameThread();
67+
6468
private ObjectFactory<Object> keyFactory;
6569
private ObjectFactory<Object> valueFactory;
6670

@@ -80,9 +84,10 @@ public LegacyRedisCacheTests(RedisTemplate template, ObjectFactory<Object> keyFa
8084

8185
public static Collection<Object[]> testParams() {
8286

87+
System.out.println("tp");
8388
Collection<Object[]> params = AbstractOperationsTestParams.testParams();
8489

85-
Collection<Object[]> target = new ArrayList<>();
90+
List<Object[]> target = new ArrayList<>();
8691
for (Object[] source : params) {
8792

8893
Object[] cacheNullDisabled = Arrays.copyOf(source, source.length + 1);
@@ -127,7 +132,7 @@ void testCachePut() {
127132

128133
assertThat(value).isNotNull();
129134
assertThat(cache.get(key)).isNull();
130-
cache.put(key, value);
135+
putNow(key, value);
131136
ValueWrapper valueWrapper = cache.get(key);
132137
if (valueWrapper != null) {
133138
assertThat(valueWrapper.get()).isEqualTo(value);
@@ -149,8 +154,7 @@ void testCacheClear() {
149154
cache.put(key2, value2);
150155
cache.clear();
151156

152-
Awaitility.await().until(() -> cache.get(key1) == null);
153-
Awaitility.await().until(() -> cache.get(key2) == null);
157+
await.until(() -> cache.get(key1) == null && cache.get(key2) == null);
154158

155159
assertThat(cache.get(key2)).isNull();
156160
assertThat(cache.get(key1)).isNull();
@@ -178,7 +182,7 @@ void testConcurrentRead() throws Exception {
178182
Thread th = new Thread(() -> {
179183
cache.clear();
180184
cache.put(k1, v1);
181-
cache.put(k2, v2);
185+
putNow(k2, v2);
182186
failed.set(v1.equals(cache.get(k1)));
183187

184188
}, "concurrent-cache-access");
@@ -193,7 +197,7 @@ void testConcurrentRead() throws Exception {
193197
final Object value4 = getValue();
194198

195199
cache.put(key3, value3);
196-
cache.put(key4, value4);
200+
putNow(key4, value4);
197201

198202
assertThat(cache.get(key1)).isNull();
199203
assertThat(cache.get(key2)).isNull();
@@ -233,7 +237,7 @@ void testCacheGetShouldReturnCachedInstance() {
233237

234238
Object key = getKey();
235239
Object value = getValue();
236-
cache.put(key, value);
240+
putNow(key, value);
237241

238242
assertThat(value).isEqualTo(cache.get(key, Object.class));
239243
}
@@ -243,7 +247,7 @@ void testCacheGetShouldRetunInstanceOfCorrectType() {
243247

244248
Object key = getKey();
245249
Object value = getValue();
246-
cache.put(key, value);
250+
putNow(key, value);
247251

248252
assertThat(cache.get(key, value.getClass())).isInstanceOf(value.getClass());
249253
}
@@ -253,7 +257,7 @@ void testCacheGetShouldThrowExceptionOnInvalidType() {
253257

254258
Object key = getKey();
255259
Object value = getValue();
256-
cache.put(key, value);
260+
putNow(key, value);
257261

258262
assertThatIllegalStateException().isThrownBy(() -> cache.get(key, Cache.class));
259263
}
@@ -263,7 +267,7 @@ void testCacheGetShouldReturnNullIfNoCachedValueFound() {
263267

264268
Object key = getKey();
265269
Object value = getValue();
266-
cache.put(key, value);
270+
putNow(key, value);
267271

268272
Object invalidKey = "spring-data-redis".getBytes();
269273
assertThat(cache.get(invalidKey, value.getClass())).isNull();
@@ -305,7 +309,7 @@ void cachePutWithNullShouldErrorAndLeaveExistingKeyUntouched() {
305309
Object key = getKey();
306310
Object value = getValue();
307311

308-
cache.put(key, value);
312+
putNow(key, value);
309313

310314
assertThat(cache.get(key).get()).isEqualTo(value);
311315

@@ -330,15 +334,15 @@ void cachePutWithNullShouldAddStuffToRedisWhenCachingNullIsEnabled() {
330334
Object key = getKey();
331335
Object value = getValue();
332336

333-
cache.put(key, null);
337+
putNow(key, null);
334338

335339
assertThat(cache.get(key, String.class)).isNull();
336340
}
337341

338342
@Test // DATAREDIS-553
339343
void testCacheGetSynchronizedNullAllowingNull() {
340344

341-
assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values.").isTrue();
345+
Assumptions.assumeTrue(allowCacheNullValues, "Only suitable when cache does allow null values.");
342346

343347
Object key = getKey();
344348
Object value = cache.get(key, () -> null);
@@ -374,13 +378,26 @@ void testCacheGetSynchronizedNullWithStoredNull() {
374378
assumeThat(allowCacheNullValues).as("Only suitable when cache does allow null values").isTrue();
375379

376380
Object key = getKey();
377-
cache.put(key, null);
381+
putNow(key, null);
378382

379383
Object cachedValue = cache.get(key, () -> null);
380384

381385
assertThat(cachedValue).isNull();
382386
}
383387

388+
private void putNow(Object key, Object value) {
389+
putNow(cache, key, value);
390+
}
391+
392+
private void putNow(RedisCache cache, Object key, Object value) {
393+
394+
cache.put(key, value);
395+
396+
if (cache.getCacheWriter().supportsAsyncRetrieve()) {
397+
await.until(() -> cache.get(key) != null);
398+
}
399+
}
400+
384401
@SuppressWarnings("unused")
385402
private static class CacheGetWithValueLoaderIsThreadSafe extends MultithreadedTestCase {
386403

0 commit comments

Comments
 (0)