17
17
18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
+ import reactor .core .publisher .SignalType ;
20
21
21
22
import java .nio .ByteBuffer ;
22
23
import java .nio .charset .StandardCharsets ;
23
24
import java .time .Duration ;
24
25
import java .util .concurrent .CompletableFuture ;
25
26
import java .util .concurrent .TimeUnit ;
26
27
import java .util .concurrent .atomic .AtomicLong ;
28
+ import java .util .function .Consumer ;
27
29
import java .util .function .Function ;
28
30
29
31
import org .springframework .dao .PessimisticLockingFailureException ;
37
39
import org .springframework .data .redis .util .ByteUtils ;
38
40
import org .springframework .lang .Nullable ;
39
41
import org .springframework .util .Assert ;
40
- import org .springframework .util .ClassUtils ;
41
- import org .springframework .util .ObjectUtils ;
42
42
43
43
/**
44
44
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
47
47
* <p>
48
48
* {@link DefaultRedisCacheWriter} can be used in
49
49
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
50
- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
51
- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
52
- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
53
- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
54
- * requests and potential command wait times.
50
+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
51
+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
52
+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
53
+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
54
+ * and potential command wait times.
55
55
*
56
56
* @author Christoph Strobl
57
57
* @author Mark Paluch
61
61
*/
62
62
class DefaultRedisCacheWriter implements RedisCacheWriter {
63
63
64
- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
65
- .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
64
+ private final AsyncCacheWriter asyncCacheWriter ;
66
65
67
66
private final BatchStrategy batchStrategy ;
68
67
@@ -74,33 +73,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
74
73
75
74
private final TtlFunction lockTtl ;
76
75
77
- private final AsyncCacheWriter asyncCacheWriter ;
78
-
79
- /**
80
- * @param connectionFactory must not be {@literal null}.
81
- * @param batchStrategy must not be {@literal null}.
82
- */
83
76
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , BatchStrategy batchStrategy ) {
84
77
this (connectionFactory , Duration .ZERO , batchStrategy );
85
78
}
86
79
87
80
/**
88
- * @param connectionFactory must not be {@literal null}.
89
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
90
- * to disable locking.
91
- * @param batchStrategy must not be {@literal null}.
81
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
82
+ * Use {@link Duration#ZERO} to disable locking.
92
83
*/
93
84
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , BatchStrategy batchStrategy ) {
94
85
this (connectionFactory , sleepTime , TtlFunction .persistent (), CacheStatisticsCollector .none (), batchStrategy );
95
86
}
96
87
97
88
/**
98
- * @param connectionFactory must not be {@literal null}.
99
- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
100
- * to disable locking.
101
- * @param lockTtl Lock TTL function must not be {@literal null}.
102
- * @param cacheStatisticsCollector must not be {@literal null}.
103
- * @param batchStrategy must not be {@literal null}.
89
+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
90
+ * Use {@link Duration#ZERO} to disable locking.
104
91
*/
105
92
DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , TtlFunction lockTtl ,
106
93
CacheStatisticsCollector cacheStatisticsCollector , BatchStrategy batchStrategy ) {
@@ -116,12 +103,12 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
116
103
this .lockTtl = lockTtl ;
117
104
this .statistics = cacheStatisticsCollector ;
118
105
this .batchStrategy = batchStrategy ;
106
+ this .asyncCacheWriter = isAsyncCacheSupportEnabled () ? new AsynchronousCacheWriterDelegate ()
107
+ : UnsupportedAsyncCacheWriter .INSTANCE ;
108
+ }
119
109
120
- if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this .connectionFactory instanceof ReactiveRedisConnectionFactory ) {
121
- asyncCacheWriter = new AsynchronousCacheWriterDelegate ();
122
- } else {
123
- asyncCacheWriter = UnsupportedAsyncCacheWriter .INSTANCE ;
124
- }
110
+ private boolean isAsyncCacheSupportEnabled () {
111
+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
125
112
}
126
113
127
114
@ Override
@@ -162,18 +149,19 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
162
149
Assert .notNull (key , "Key must not be null" );
163
150
164
151
return asyncCacheWriter .retrieve (name , key , ttl ) //
165
- .thenApply (cachedValue -> {
152
+ .thenApply (cachedValue -> {
166
153
167
- statistics .incGets (name );
154
+ statistics .incGets (name );
168
155
169
- if (cachedValue != null ) {
170
- statistics .incHits (name );
171
- } else {
172
- statistics .incMisses (name );
173
- }
156
+ if (cachedValue != null ) {
157
+ statistics .incHits (name );
158
+ }
159
+ else {
160
+ statistics .incMisses (name );
161
+ }
174
162
175
- return cachedValue ;
176
- });
163
+ return cachedValue ;
164
+ });
177
165
}
178
166
179
167
@ Override
@@ -186,8 +174,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
186
174
execute (name , connection -> {
187
175
188
176
if (shouldExpireWithin (ttl )) {
189
- connection .stringCommands ().set (key , value , Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ),
190
- SetOption .upsert ());
177
+ connection .stringCommands ().set (key , value , toExpiration (ttl ), SetOption .upsert ());
191
178
} else {
192
179
connection .stringCommands ().set (key , value );
193
180
}
@@ -224,16 +211,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
224
211
225
212
try {
226
213
227
- boolean put ;
228
-
229
- if (shouldExpireWithin (ttl )) {
230
- put = ObjectUtils .nullSafeEquals (
231
- connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ()), true );
232
- } else {
233
- put = ObjectUtils .nullSafeEquals (connection .stringCommands ().setNX (key , value ), true );
234
- }
214
+ Boolean wasSet = shouldExpireWithin (ttl )
215
+ ? connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ())
216
+ : connection .stringCommands ().setNX (key , value );
235
217
236
- if (put ) {
218
+ if (Boolean . TRUE . equals ( wasSet ) ) {
237
219
statistics .incPuts (name );
238
220
return null ;
239
221
}
@@ -378,19 +360,18 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
378
360
Thread .sleep (this .sleepTime .toMillis ());
379
361
}
380
362
} catch (InterruptedException cause ) {
381
-
382
363
// Re-interrupt current Thread to allow other participants to react.
383
364
Thread .currentThread ().interrupt ();
384
-
385
- throw new PessimisticLockingFailureException (String .format ("Interrupted while waiting to unlock cache %s" , name ),
386
- cause );
365
+ String message = "Interrupted while waiting to unlock cache %s" .formatted (name );
366
+ throw new PessimisticLockingFailureException (message , cause );
387
367
} finally {
388
368
this .statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
389
369
}
390
370
}
391
371
392
372
boolean doCheckLock (String name , RedisConnection connection ) {
393
- return ObjectUtils .nullSafeEquals (connection .keyCommands ().exists (createCacheLockKey (name )), true );
373
+ Boolean cacheLockExists = connection .keyCommands ().exists (createCacheLockKey (name ));
374
+ return Boolean .TRUE .equals (cacheLockExists );
394
375
}
395
376
396
377
byte [] createCacheLockKey (String name ) {
@@ -401,6 +382,14 @@ private static boolean shouldExpireWithin(@Nullable Duration ttl) {
401
382
return ttl != null && !ttl .isZero () && !ttl .isNegative ();
402
383
}
403
384
385
+ private Expiration toExpiration (Duration ttl ) {
386
+ return Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS );
387
+ }
388
+
389
+ private Expiration toExpiration (Object key , @ Nullable Object value ) {
390
+ return Expiration .from (this .lockTtl .getTimeToLive (key , value ));
391
+ }
392
+
404
393
/**
405
394
* Interface for asynchronous cache retrieval.
406
395
*
@@ -419,8 +408,8 @@ interface AsyncCacheWriter {
419
408
* @param name the cache name from which to retrieve the cache entry.
420
409
* @param key the cache entry key.
421
410
* @param ttl optional TTL to set for Time-to-Idle eviction.
422
- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
423
- * cache does not contain an entry.
411
+ * @return a future that completes either with a value if the value exists or completing with {@code null}
412
+ * if the cache does not contain an entry.
424
413
*/
425
414
CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl );
426
415
@@ -463,8 +452,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
463
452
}
464
453
465
454
/**
466
- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
467
- * {@link ReactiveRedisConnectionFactory}.
455
+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
456
+ * using {@link ReactiveRedisConnectionFactory}.
468
457
*
469
458
* @since 3.2
470
459
*/
@@ -481,11 +470,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
481
470
return doWithConnection (connection -> {
482
471
483
472
ByteBuffer wrappedKey = ByteBuffer .wrap (key );
473
+
484
474
Mono <?> cacheLockCheck = isLockingCacheWriter () ? waitForLock (connection , name ) : Mono .empty ();
475
+
485
476
ReactiveStringCommands stringCommands = connection .stringCommands ();
486
477
487
478
Mono <ByteBuffer > get = shouldExpireWithin (ttl )
488
- ? stringCommands .getEx (wrappedKey , Expiration . from (ttl ))
479
+ ? stringCommands .getEx (wrappedKey , toExpiration (ttl ))
489
480
: stringCommands .get (wrappedKey );
490
481
491
482
return cacheLockCheck .then (get ).map (ByteUtils ::getBytes ).toFuture ();
@@ -498,41 +489,44 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
498
489
return doWithConnection (connection -> {
499
490
500
491
Mono <?> mono = isLockingCacheWriter ()
501
- ? doStoreWithLocking (name , key , value , ttl , connection )
492
+ ? doLockStoreUnlock (name , key , value , ttl , connection )
502
493
: doStore (key , value , ttl , connection );
503
494
504
495
return mono .then ().toFuture ();
505
496
});
506
497
}
507
498
508
- private Mono <Boolean > doStoreWithLocking (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
509
- ReactiveRedisConnection connection ) {
510
-
511
- return Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
512
- unused -> doUnlock (name , connection ));
513
- }
514
-
515
499
private Mono <Boolean > doStore (byte [] cacheKey , byte [] value , @ Nullable Duration ttl ,
516
500
ReactiveRedisConnection connection ) {
517
501
518
502
ByteBuffer wrappedKey = ByteBuffer .wrap (cacheKey );
519
503
ByteBuffer wrappedValue = ByteBuffer .wrap (value );
520
504
521
- if (shouldExpireWithin (ttl )) {
522
- return connection .stringCommands ().set (wrappedKey , wrappedValue ,
523
- Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ), SetOption .upsert ());
524
- } else {
525
- return connection .stringCommands ().set (wrappedKey , wrappedValue );
526
- }
505
+ ReactiveStringCommands stringCommands = connection .stringCommands ();
506
+
507
+ return shouldExpireWithin (ttl )
508
+ ? stringCommands .set (wrappedKey , wrappedValue , toExpiration (ttl ), SetOption .upsert ())
509
+ : stringCommands .set (wrappedKey , wrappedValue );
527
510
}
528
511
512
+ private Mono <Boolean > doLockStoreUnlock (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
513
+ ReactiveRedisConnection connection ) {
514
+
515
+ Mono <Object > lock = doLock (name , key , value , connection );
516
+
517
+ Function <Object , Mono <Boolean >> store = unused -> doStore (key , value , ttl , connection );
518
+ Function <Object , Mono <Void >> unlock = unused -> doUnlock (name , connection );
519
+
520
+ return Mono .usingWhen (lock , store , unlock );
521
+ }
529
522
530
523
private Mono <Object > doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
531
524
ReactiveRedisConnection connection ) {
532
525
533
526
ByteBuffer key = ByteBuffer .wrap (createCacheLockKey (name ));
534
527
ByteBuffer value = ByteBuffer .wrap (new byte [0 ]);
535
- Expiration expiration = Expiration .from (lockTtl .getTimeToLive (contextualKey , contextualValue ));
528
+
529
+ Expiration expiration = toExpiration (contextualKey , contextualValue );
536
530
537
531
return connection .stringCommands ().set (key , value , expiration , SetOption .SET_IF_ABSENT ) //
538
532
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
@@ -545,28 +539,52 @@ private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
545
539
546
540
private Mono <Void > waitForLock (ReactiveRedisConnection connection , String cacheName ) {
547
541
548
- AtomicLong lockWaitTimeNs = new AtomicLong ();
549
- byte [] cacheLockKey = createCacheLockKey (cacheName );
542
+ AtomicLong lockWaitNanoTime = new AtomicLong ();
543
+
544
+ Consumer <org .reactivestreams .Subscription > setNanoTimeOnLockWait = subscription ->
545
+ lockWaitNanoTime .set (System .nanoTime ());
550
546
551
- Flux < Long > wait = Flux . interval ( Duration . ZERO , sleepTime );
552
- Mono < Boolean > exists = connection . keyCommands (). exists ( ByteBuffer . wrap ( cacheLockKey )). filter ( it -> ! it );
547
+ Consumer < SignalType > recordStatistics = signalType ->
548
+ statistics . incLockTime ( cacheName , System . nanoTime () - lockWaitNanoTime . get () );
553
549
554
- return wait .doOnSubscribe (subscription -> lockWaitTimeNs .set (System .nanoTime ())) //
555
- .flatMap (it -> exists ) //
556
- .doFinally (signalType -> statistics .incLockTime (cacheName , System .nanoTime () - lockWaitTimeNs .get ())) //
550
+ Function <Long , Mono <Boolean >> doWhileCacheLockExists = lockWaitTime -> connection .keyCommands ()
551
+ .exists (toCacheLockKey (cacheName )).filter (cacheLockKeyExists -> !cacheLockKeyExists );
552
+
553
+ return waitInterval (sleepTime ) //
554
+ .doOnSubscribe (setNanoTimeOnLockWait ) //
555
+ .flatMap (doWhileCacheLockExists ) //
556
+ .doFinally (recordStatistics ) //
557
557
.next () //
558
558
.then ();
559
559
}
560
560
561
+ private Flux <Long > waitInterval (Duration period ) {
562
+ return Flux .interval (Duration .ZERO , period );
563
+ }
564
+
565
+ private ByteBuffer toCacheLockKey (String cacheName ) {
566
+ return ByteBuffer .wrap (createCacheLockKey (cacheName ));
567
+ }
568
+
569
+ private ReactiveRedisConnectionFactory getReactiveConnectionFactory () {
570
+ return (ReactiveRedisConnectionFactory ) DefaultRedisCacheWriter .this .connectionFactory ;
571
+ }
572
+
573
+ private Mono <ReactiveRedisConnection > getReactiveConnection () {
574
+ return Mono .fromSupplier (getReactiveConnectionFactory ()::getReactiveConnection );
575
+ }
576
+
561
577
private <T > CompletableFuture <T > doWithConnection (
562
578
Function <ReactiveRedisConnection , CompletableFuture <T >> callback ) {
563
579
564
- ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory ) connectionFactory ;
580
+ Function <ReactiveRedisConnection , Mono <T >> commandExecution = connection ->
581
+ Mono .fromCompletionStage (callback .apply (connection ));
582
+
583
+ Function <ReactiveRedisConnection , Mono <Void >> connectionClose = ReactiveRedisConnection ::closeLater ;
584
+
585
+ Mono <T > result = Mono .usingWhen (getReactiveConnection (), commandExecution , connectionClose );
565
586
566
- return Mono .usingWhen (Mono .fromSupplier (cf ::getReactiveConnection ), //
567
- it -> Mono .fromCompletionStage (callback .apply (it )), //
568
- ReactiveRedisConnection ::closeLater ) //
569
- .toFuture ();
587
+ return result .toFuture ();
570
588
}
571
589
}
572
590
}
0 commit comments