From d0248b4c06974f23cd8e6a4c06f72e94aa5ce6eb Mon Sep 17 00:00:00 2001 From: joongsoo Date: Sat, 21 Mar 2020 20:52:23 +0900 Subject: [PATCH 1/3] DATAREDIS-1117 - Improve doLock method to atomic. Related tickets: DATAREDIS-1117. --- .../redis/cache/DefaultRedisCacheWriter.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 698f147788..1f2bd0829a 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -45,6 +45,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Joongsoo Park * @since 2.0 */ class DefaultRedisCacheWriter implements RedisCacheWriter { @@ -170,13 +171,10 @@ public void clean(String name, byte[] pattern) { execute(name, connection -> { - boolean wasLocked = false; - try { if (isLockingCacheWriter()) { doLock(name, connection); - wasLocked = true; } byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) @@ -187,7 +185,7 @@ public void clean(String name, byte[] pattern) { } } finally { - if (wasLocked && isLockingCacheWriter()) { + if (isLockingCacheWriter()) { doUnlock(name, connection); } } @@ -202,7 +200,12 @@ public void clean(String name, byte[] pattern) { * @param name the name of the cache to lock. */ void lock(String name) { - execute(name, connection -> doLock(name, connection)); + + execute(name, connection -> { + + doLock(name, connection); + return null; + }); } /** @@ -214,8 +217,25 @@ void unlock(String name) { executeLockFree(connection -> doUnlock(name, connection)); } - private Boolean doLock(String name, RedisConnection connection) { - return connection.setNX(createCacheLockKey(name), new byte[0]); + private void doLock(String name, RedisConnection connection) { + + if (!isLockingCacheWriter()) { + return; + } + + try { + + while (!connection.setNX(createCacheLockKey(name), new byte[0])) { + Thread.sleep(sleepTime.toMillis()); + } + } catch (InterruptedException ex) { + + // Re-interrupt current thread, to allow other participants to react. + Thread.currentThread().interrupt(); + + throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to acquire lock cache %s", name), + ex); + } } private Long doUnlock(String name, RedisConnection connection) { From 176782e57cca62c99f530122c5f749220ab97eab Mon Sep 17 00:00:00 2001 From: joongsoo Date: Sun, 25 Oct 2020 01:08:26 +0900 Subject: [PATCH 2/3] DATAREDIS-1117 - Improve doLock method to atomic. Related tickets: DATAREDIS-1117. --- .../redis/cache/DefaultRedisCacheWriter.java | 120 +++++++----------- .../cache/DefaultRedisCacheWriterTests.java | 7 +- 2 files changed, 48 insertions(+), 79 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 953ef44fac..38e41af515 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -150,32 +150,20 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat return execute(name, connection -> { - if (isLockingCacheWriter()) { - doLock(name, connection); - } - - try { - - boolean put; - - if (shouldExpireWithin(ttl)) { - put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent()); - } else { - put = connection.setNX(key, value); - } - - if (put) { - statistics.incPuts(name); - return null; - } + boolean put; - return connection.get(key); - } finally { + if (shouldExpireWithin(ttl)) { + put = connection.set(key, value, Expiration.from(ttl), SetOption.ifAbsent()); + } else { + put = connection.setNX(key, value); + } - if (isLockingCacheWriter()) { - doUnlock(name, connection); - } + if (put) { + statistics.incPuts(name); + return null; } + + return connection.get(key); }); } @@ -205,24 +193,12 @@ public void clean(String name, byte[] pattern) { execute(name, connection -> { - try { - - if (isLockingCacheWriter()) { - doLock(name, connection); - } - - byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) - .toArray(new byte[0][]); - - if (keys.length > 0) { - statistics.incDeletesBy(name, keys.length); - connection.del(keys); - } - } finally { + byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet()) + .toArray(new byte[0][]); - if (isLockingCacheWriter()) { - doUnlock(name, connection); - } + if (keys.length > 0) { + statistics.incDeletesBy(name, keys.length); + connection.del(keys); } return "OK"; @@ -263,11 +239,7 @@ public RedisCacheWriter withStatisticsCollector(CacheStatisticsCollector cacheSt */ void lock(String name) { - execute(name, connection -> { - - doLock(name, connection); - return null; - }); + executeLockFree(connection -> doLock(name, connection)); } /** @@ -279,15 +251,26 @@ void unlock(String name) { executeLockFree(connection -> doUnlock(name, connection)); } + /** + * Explicitly try set a write lock on a cache. + * + * @param name the name of the cache to unlock. + * @param connection must not be {@literal null}. + */ + boolean tryLock(String name, RedisConnection connection) { + return connection.setNX(createCacheLockKey(name), new byte[0]); + } + private void doLock(String name, RedisConnection connection) { if (!isLockingCacheWriter()) { return; } + long lockWaitTimeNs = System.nanoTime(); try { - while (!connection.setNX(createCacheLockKey(name), new byte[0])) { + while (!tryLock(name, connection)) { Thread.sleep(sleepTime.toMillis()); } } catch (InterruptedException ex) { @@ -297,6 +280,8 @@ private void doLock(String name, RedisConnection connection) { throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to acquire lock cache %s", name), ex); + } finally { + statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); } } @@ -304,10 +289,6 @@ private Long doUnlock(String name, RedisConnection connection) { return connection.del(createCacheLockKey(name)); } - boolean doCheckLock(String name, RedisConnection connection) { - return connection.exists(createCacheLockKey(name)); - } - /** * @return {@literal true} if {@link RedisCacheWriter} uses locks. */ @@ -320,8 +301,19 @@ private T execute(String name, Function callback) { RedisConnection connection = connectionFactory.getConnection(); try { - checkAndPotentiallyWaitUntilUnlocked(name, connection); - return callback.apply(connection); + try { + + if (isLockingCacheWriter()) { + doLock(name, connection); + } + + return callback.apply(connection); + } finally { + + if (isLockingCacheWriter()) { + doUnlock(name, connection); + } + } } finally { connection.close(); } @@ -338,30 +330,6 @@ private void executeLockFree(Consumer callback) { } } - private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) { - - if (!isLockingCacheWriter()) { - return; - } - - long lockWaitTimeNs = System.nanoTime(); - try { - - while (doCheckLock(name, connection)) { - Thread.sleep(sleepTime.toMillis()); - } - } catch (InterruptedException ex) { - - // Re-interrupt current thread, to allow other participants to react. - Thread.currentThread().interrupt(); - - throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name), - ex); - } finally { - statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs); - } - } - private static boolean shouldExpireWithin(@Nullable Duration ttl) { return ttl != null && !ttl.isZero() && !ttl.isNegative(); } diff --git a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java index c8b8da8338..1d2a784aff 100644 --- a/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java +++ b/src/test/java/org/springframework/data/redis/cache/DefaultRedisCacheWriterTests.java @@ -45,6 +45,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Joongsoo Park */ @RunWith(Parameterized.class) public class DefaultRedisCacheWriterTests { @@ -320,9 +321,9 @@ public void lockingCacheWriterShouldExitWhenInterruptedWaitForLockRelease() thro DefaultRedisCacheWriter writer = new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50)) { @Override - boolean doCheckLock(String name, RedisConnection connection) { + boolean tryLock(String name, RedisConnection connection) { beforeWrite.countDown(); - return super.doCheckLock(name, connection); + return super.tryLock(name, connection); } }; @@ -342,7 +343,7 @@ boolean doCheckLock(String name, RedisConnection connection) { afterWrite.await(); - assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to unlock") + assertThat(exceptionRef.get()).hasMessageContaining("Interrupted while waiting to acquire lock cache") .hasCauseInstanceOf(InterruptedException.class); } From 4b667493d8d40f917c64ef47b365c36ca0d62e92 Mon Sep 17 00:00:00 2001 From: joongsoo Date: Sun, 25 Oct 2020 01:14:53 +0900 Subject: [PATCH 3/3] DATAREDIS-1117 - Improve doLock method to atomic. Related tickets: DATAREDIS-1117. --- .../data/redis/cache/DefaultRedisCacheWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java index 38e41af515..2f121ab748 100644 --- a/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java +++ b/src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java @@ -254,7 +254,7 @@ void unlock(String name) { /** * Explicitly try set a write lock on a cache. * - * @param name the name of the cache to unlock. + * @param name the name of the cache to lock. * @param connection must not be {@literal null}. */ boolean tryLock(String name, RedisConnection connection) {