17
17
18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
+ import reactor .core .publisher .SignalType ;
20
21
21
22
import java .nio .ByteBuffer ;
22
23
import java .nio .charset .StandardCharsets ;
23
24
import java .time .Duration ;
24
25
import java .util .concurrent .CompletableFuture ;
25
26
import java .util .concurrent .TimeUnit ;
26
27
import java .util .concurrent .atomic .AtomicLong ;
28
+ import java .util .function .Consumer ;
27
29
import java .util .function .Function ;
28
30
29
31
import org .springframework .dao .PessimisticLockingFailureException ;
38
40
import org .springframework .lang .Nullable ;
39
41
import org .springframework .util .Assert ;
40
42
import org .springframework .util .ClassUtils ;
41
- import org . springframework . util . ObjectUtils ;
43
+
42
44
43
45
/**
44
46
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
47
49
* <p>
48
50
* {@link DefaultRedisCacheWriter} can be used in
49
51
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
50
- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
51
- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
52
- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
53
- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
54
- * requests and potential command wait times.
52
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
53
+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
54
+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
55
+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
56
+ * and potential command wait times.
55
57
*
56
58
* @author Christoph Strobl
57
59
* @author Mark Paluch
61
63
*/
62
64
class DefaultRedisCacheWriter implements RedisCacheWriter {
63
65
64
- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
65
- .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
66
+ public static final boolean FLUX_PRESENT = ClassUtils .isPresent ("reactor.core.publisher.Flux" , null );
67
+
68
+ private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
69
+ ClassUtils .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
70
+
71
+ private final AsyncCacheWriter asyncCacheWriter ;
66
72
67
73
private final BatchStrategy batchStrategy ;
68
74
@@ -74,8 +80,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
74
80
75
81
private final TtlFunction lockTtl ;
76
82
77
- private final AsyncCacheWriter asyncCacheWriter ;
78
-
79
83
/**
80
84
* @param connectionFactory must not be {@literal null}.
81
85
* @param batchStrategy must not be {@literal null}.
@@ -86,8 +90,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
86
90
87
91
/**
88
92
* @param connectionFactory must not be {@literal null}.
89
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
90
- * to disable locking.
93
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
94
+ * Use {@link Duration#ZERO} to disable locking.
91
95
* @param batchStrategy must not be {@literal null}.
92
96
*/
93
97
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , BatchStrategy batchStrategy ) {
@@ -96,8 +100,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
96
100
97
101
/**
98
102
* @param connectionFactory must not be {@literal null}.
99
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
100
- * to disable locking.
103
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
104
+ * Use {@link Duration#ZERO} to disable locking.
101
105
* @param lockTtl Lock TTL function must not be {@literal null}.
102
106
* @param cacheStatisticsCollector must not be {@literal null}.
103
107
* @param batchStrategy must not be {@literal null}.
@@ -116,12 +120,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
116
120
this .lockTtl = lockTtl ;
117
121
this .statistics = cacheStatisticsCollector ;
118
122
this .batchStrategy = batchStrategy ;
123
+ this .asyncCacheWriter = isAsyncCacheSupportEnabled () ? new AsynchronousCacheWriterDelegate ()
124
+ : UnsupportedAsyncCacheWriter .INSTANCE ;
125
+ }
119
126
120
- if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this .connectionFactory instanceof ReactiveRedisConnectionFactory ) {
121
- asyncCacheWriter = new AsynchronousCacheWriterDelegate ();
122
- } else {
123
- asyncCacheWriter = UnsupportedAsyncCacheWriter .INSTANCE ;
124
- }
127
+ private boolean isAsyncCacheSupportEnabled () {
128
+ return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT
129
+ && this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
125
130
}
126
131
127
132
@ Override
@@ -168,7 +173,8 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
168
173
169
174
if (cachedValue != null ) {
170
175
statistics .incHits (name );
171
- } else {
176
+ }
177
+ else {
172
178
statistics .incMisses (name );
173
179
}
174
180
@@ -186,8 +192,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
186
192
execute (name , connection -> {
187
193
188
194
if (shouldExpireWithin (ttl )) {
189
- connection .stringCommands ().set (key , value , Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ),
190
- SetOption .upsert ());
195
+ connection .stringCommands ().set (key , value , toExpiration (ttl ), SetOption .upsert ());
191
196
} else {
192
197
connection .stringCommands ().set (key , value );
193
198
}
@@ -224,16 +229,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
224
229
225
230
try {
226
231
227
- boolean put ;
228
-
229
- if (shouldExpireWithin (ttl )) {
230
- put = ObjectUtils .nullSafeEquals (
231
- connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ()), true );
232
- } else {
233
- put = ObjectUtils .nullSafeEquals (connection .stringCommands ().setNX (key , value ), true );
234
- }
232
+ Boolean wasSet = shouldExpireWithin (ttl )
233
+ ? connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ())
234
+ : connection .stringCommands ().setNX (key , value );
235
235
236
- if (put ) {
236
+ if (Boolean . TRUE . equals ( wasSet ) ) {
237
237
statistics .incPuts (name );
238
238
return null ;
239
239
}
@@ -322,9 +322,11 @@ void lock(String name) {
322
322
private Boolean doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
323
323
RedisConnection connection ) {
324
324
325
- Expiration expiration = Expiration . from ( this . lockTtl . getTimeToLive ( contextualKey , contextualValue ) );
325
+ byte [] cacheLockKey = createCacheLockKey ( name );
326
326
327
- return connection .stringCommands ().set (createCacheLockKey (name ), new byte [0 ], expiration , SetOption .SET_IF_ABSENT );
327
+ Expiration expiration = toExpiration (contextualKey , contextualValue );
328
+
329
+ return connection .stringCommands ().set (cacheLockKey , new byte [0 ], expiration , SetOption .SET_IF_ABSENT );
328
330
}
329
331
330
332
/**
@@ -378,29 +380,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
378
380
Thread .sleep (this .sleepTime .toMillis ());
379
381
}
380
382
} catch (InterruptedException cause ) {
381
-
382
383
// Re-interrupt current Thread to allow other participants to react.
383
384
Thread .currentThread ().interrupt ();
384
-
385
- throw new PessimisticLockingFailureException (String .format ("Interrupted while waiting to unlock cache %s" , name ),
386
- cause );
385
+ String message = "Interrupted while waiting to unlock cache %s" .formatted (name );
386
+ throw new PessimisticLockingFailureException (message , cause );
387
387
} finally {
388
388
this .statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
389
389
}
390
390
}
391
391
392
392
boolean doCheckLock (String name , RedisConnection connection ) {
393
- return ObjectUtils .nullSafeEquals (connection .keyCommands ().exists (createCacheLockKey (name )), true );
393
+ Boolean cacheLockExists = connection .keyCommands ().exists (createCacheLockKey (name ));
394
+ return Boolean .TRUE .equals (cacheLockExists );
394
395
}
395
396
396
397
byte [] createCacheLockKey (String name ) {
397
398
return (name + "~lock" ).getBytes (StandardCharsets .UTF_8 );
398
399
}
399
400
401
+ private ReactiveRedisConnectionFactory getReactiveConnectionFactory () {
402
+ return (ReactiveRedisConnectionFactory ) this .connectionFactory ;
403
+ }
404
+
400
405
private static boolean shouldExpireWithin (@ Nullable Duration ttl ) {
401
406
return ttl != null && !ttl .isZero () && !ttl .isNegative ();
402
407
}
403
408
409
+ private Expiration toExpiration (Duration ttl ) {
410
+ return Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS );
411
+ }
412
+
413
+ private Expiration toExpiration (Object key , @ Nullable Object value ) {
414
+ return Expiration .from (this .lockTtl .getTimeToLive (key , value ));
415
+ }
416
+
404
417
/**
405
418
* Interface for asynchronous cache retrieval.
406
419
*
@@ -419,8 +432,8 @@ interface AsyncCacheWriter {
419
432
* @param name the cache name from which to retrieve the cache entry.
420
433
* @param key the cache entry key.
421
434
* @param ttl optional TTL to set for Time-to-Idle eviction.
422
- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
423
- * cache does not contain an entry.
435
+ * @return a future that completes either with a value if the value exists or completing with {@code null}
436
+ * if the cache does not contain an entry.
424
437
*/
425
438
CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl );
426
439
@@ -463,8 +476,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
463
476
}
464
477
465
478
/**
466
- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
467
- * {@link ReactiveRedisConnectionFactory}.
479
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
480
+ * using {@link ReactiveRedisConnectionFactory}.
468
481
*
469
482
* @since 3.2
470
483
*/
@@ -481,11 +494,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
481
494
return doWithConnection (connection -> {
482
495
483
496
ByteBuffer wrappedKey = ByteBuffer .wrap (key );
497
+
484
498
Mono <?> cacheLockCheck = isLockingCacheWriter () ? waitForLock (connection , name ) : Mono .empty ();
499
+
485
500
ReactiveStringCommands stringCommands = connection .stringCommands ();
486
501
487
502
Mono <ByteBuffer > get = shouldExpireWithin (ttl )
488
- ? stringCommands .getEx (wrappedKey , Expiration . from (ttl ))
503
+ ? stringCommands .getEx (wrappedKey , toExpiration (ttl ))
489
504
: stringCommands .get (wrappedKey );
490
505
491
506
return cacheLockCheck .then (get ).map (ByteUtils ::getBytes ).toFuture ();
@@ -498,75 +513,97 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
498
513
return doWithConnection (connection -> {
499
514
500
515
Mono <?> mono = isLockingCacheWriter ()
501
- ? doStoreWithLocking (name , key , value , ttl , connection )
516
+ ? doLockStoreUnlock (name , key , value , ttl , connection )
502
517
: doStore (key , value , ttl , connection );
503
518
504
519
return mono .then ().toFuture ();
505
520
});
506
521
}
507
522
508
- private Mono <Boolean > doStoreWithLocking (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
509
- ReactiveRedisConnection connection ) {
510
-
511
- return Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
512
- unused -> doUnlock (name , connection ));
513
- }
514
-
515
523
private Mono <Boolean > doStore (byte [] cacheKey , byte [] value , @ Nullable Duration ttl ,
516
524
ReactiveRedisConnection connection ) {
517
525
518
526
ByteBuffer wrappedKey = ByteBuffer .wrap (cacheKey );
519
527
ByteBuffer wrappedValue = ByteBuffer .wrap (value );
520
528
521
- if (shouldExpireWithin (ttl )) {
522
- return connection .stringCommands ().set (wrappedKey , wrappedValue ,
523
- Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ), SetOption .upsert ());
524
- } else {
525
- return connection .stringCommands ().set (wrappedKey , wrappedValue );
526
- }
529
+ ReactiveStringCommands stringCommands = connection .stringCommands ();
530
+
531
+ return shouldExpireWithin (ttl )
532
+ ? stringCommands .set (wrappedKey , wrappedValue , toExpiration (ttl ), SetOption .upsert ())
533
+ : stringCommands .set (wrappedKey , wrappedValue );
527
534
}
528
535
536
+ private Mono <Boolean > doLockStoreUnlock (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
537
+ ReactiveRedisConnection connection ) {
538
+
539
+ Mono <Object > lock = doLock (name , key , value , connection );
540
+
541
+ Function <Object , Mono <Boolean >> store = unused -> doStore (key , value , ttl , connection );
542
+ Function <Object , Mono <Void >> unlock = unused -> doUnlock (name , connection );
543
+
544
+ return Mono .usingWhen (lock , store , unlock );
545
+ }
529
546
530
547
private Mono <Object > doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
531
548
ReactiveRedisConnection connection ) {
532
549
533
- ByteBuffer key = ByteBuffer . wrap ( createCacheLockKey ( name ) );
550
+ ByteBuffer key = toCacheLockKey ( name );
534
551
ByteBuffer value = ByteBuffer .wrap (new byte [0 ]);
535
- Expiration expiration = Expiration .from (lockTtl .getTimeToLive (contextualKey , contextualValue ));
552
+
553
+ Expiration expiration = toExpiration (contextualKey , contextualValue );
536
554
537
555
return connection .stringCommands ().set (key , value , expiration , SetOption .SET_IF_ABSENT ) //
538
556
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
539
557
.thenReturn (Boolean .TRUE );
540
558
}
541
559
542
560
private Mono <Void > doUnlock (String name , ReactiveRedisConnection connection ) {
543
- return connection .keyCommands ().del (ByteBuffer . wrap ( createCacheLockKey ( name ) )).then ();
561
+ return connection .keyCommands ().del (toCacheLockKey ( name )).then ();
544
562
}
545
563
546
564
private Mono <Void > waitForLock (ReactiveRedisConnection connection , String cacheName ) {
547
565
548
- AtomicLong lockWaitTimeNs = new AtomicLong ();
549
- byte [] cacheLockKey = createCacheLockKey (cacheName );
566
+ AtomicLong lockWaitNanoTime = new AtomicLong ();
567
+
568
+ Consumer <org .reactivestreams .Subscription > setNanoTimeOnLockWait = subscription ->
569
+ lockWaitNanoTime .set (System .nanoTime ());
550
570
551
- Flux < Long > wait = Flux . interval ( Duration . ZERO , sleepTime );
552
- Mono < Boolean > exists = connection . keyCommands (). exists ( ByteBuffer . wrap ( cacheLockKey )). filter ( it -> ! it );
571
+ Consumer < SignalType > recordStatistics = signalType ->
572
+ statistics . incLockTime ( cacheName , System . nanoTime () - lockWaitNanoTime . get () );
553
573
554
- return wait .doOnSubscribe (subscription -> lockWaitTimeNs .set (System .nanoTime ())) //
555
- .flatMap (it -> exists ) //
556
- .doFinally (signalType -> statistics .incLockTime (cacheName , System .nanoTime () - lockWaitTimeNs .get ())) //
574
+ Function <Long , Mono <Boolean >> doWhileCacheLockExists = lockWaitTime -> connection .keyCommands ()
575
+ .exists (toCacheLockKey (cacheName )).filter (cacheLockKeyExists -> !cacheLockKeyExists );
576
+
577
+ return waitInterval (sleepTime ) //
578
+ .doOnSubscribe (setNanoTimeOnLockWait ) //
579
+ .flatMap (doWhileCacheLockExists ) //
580
+ .doFinally (recordStatistics ) //
557
581
.next () //
558
582
.then ();
559
583
}
560
584
585
+ private Flux <Long > waitInterval (Duration period ) {
586
+ return Flux .interval (Duration .ZERO , period );
587
+ }
588
+
589
+ private ByteBuffer toCacheLockKey (String cacheName ) {
590
+ return ByteBuffer .wrap (createCacheLockKey (cacheName ));
591
+ }
592
+
561
593
private <T > CompletableFuture <T > doWithConnection (
562
594
Function <ReactiveRedisConnection , CompletableFuture <T >> callback ) {
563
595
564
- ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory ) connectionFactory ;
596
+ Mono <ReactiveRedisConnection > reactiveConnection =
597
+ Mono .fromSupplier (getReactiveConnectionFactory ()::getReactiveConnection );
598
+
599
+ Function <ReactiveRedisConnection , Mono <T >> commandExecution = connection ->
600
+ Mono .fromCompletionStage (callback .apply (connection ));
601
+
602
+ Function <ReactiveRedisConnection , Mono <Void >> connectionClose = ReactiveRedisConnection ::closeLater ;
603
+
604
+ Mono <T > result = Mono .usingWhen (reactiveConnection , commandExecution , connectionClose );
565
605
566
- return Mono .usingWhen (Mono .fromSupplier (cf ::getReactiveConnection ), //
567
- it -> Mono .fromCompletionStage (callback .apply (it )), //
568
- ReactiveRedisConnection ::closeLater ) //
569
- .toFuture ();
606
+ return result .toFuture ();
570
607
}
571
608
}
572
609
}
0 commit comments