From 6b657b5120a351034a2db7b6175ab5d4fe6824c4 Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Sat, 8 Mar 2025 16:47:53 +0900 Subject: [PATCH 1/6] Add IDLE argument to XPENDING command. Original pull request #3116 Closes #2046 Signed-off-by: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> --- .../DefaultStringRedisConnection.java | 15 + .../connection/ReactiveStreamCommands.java | 98 +++++- .../redis/connection/RedisStreamCommands.java | 113 ++++++- .../connection/StringRedisConnection.java | 107 +++++++ .../jedis/JedisClusterStreamCommands.java | 5 + .../connection/jedis/StreamConverters.java | 4 + .../LettuceReactiveStreamCommands.java | 16 +- .../lettuce/LettuceStreamCommands.java | 16 +- .../redis/core/DefaultStreamOperations.java | 16 + .../data/redis/core/StreamOperations.java | 32 ++ .../AbstractConnectionIntegrationTests.java | 278 +++++++++++++++++- .../ReactiveStreamCommandsUnitTests.java | 11 + .../RedisStreamCommandsUnitTests.java | 8 + .../jedis/StreamConvertersUnitTest.java | 30 ++ ...eactiveStreamCommandsIntegrationTests.java | 144 +++++++++ 15 files changed, 862 insertions(+), 31 deletions(-) create mode 100644 src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index 831a46ece2..cf687e647a 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -82,6 +82,7 @@ * @author ihaohong * @author Dennis Neufeld * @author Shyngys Sapraliyev + * @author Jeonggyu Choi */ @SuppressWarnings({ "ConstantConditions", "deprecation" }) public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection { @@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer, Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration idle) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, idle), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count) { return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration idle) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idle), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, XPendingOptions options) { return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter()); diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 2860dc691c..e59370e933 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -60,6 +60,7 @@ * @author Dengliming * @author Mark John Moreno * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ public interface ReactiveStreamCommands { @@ -747,6 +748,25 @@ default Mono xPending(ByteBuffer key, String groupName, RangeRedis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, Duration idle) { + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next() + .map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -763,6 +783,23 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, Duration idle) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -783,6 +820,28 @@ default Mono xPending(ByteBuffer key, String groupName, String .next().map(CommandResponse::getOutput); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, + Long count, Duration idle) { + return xPending( + Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).idle(idle))) + .next().map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -798,6 +857,7 @@ default Mono xPending(ByteBuffer key, String groupName, String * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ class PendingRecordsCommand extends KeyCommand { @@ -806,9 +866,10 @@ class PendingRecordsCommand extends KeyCommand { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Duration idle; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count) { + @Nullable Long count, @Nullable Duration idle) { super(key); @@ -816,6 +877,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String this.consumerName = consumerName; this.range = range; this.count = count; + this.idle = idle; } /** @@ -826,7 +888,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String * @return new instance of {@link PendingRecordsCommand}. */ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { - return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null); + return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null); } /** @@ -841,7 +903,7 @@ public PendingRecordsCommand range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null); } /** @@ -851,7 +913,20 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle); + } + + /** + * Append given idle time. + * + * @param idle must not be {@literal null}. + * @return new instance of {@link PendingRecordsCommand}. + */ + public PendingRecordsCommand idle(Duration idle) { + + Assert.notNull(idle, "Idle must not be null"); + + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle); } public String getGroupName() { @@ -881,6 +956,14 @@ public Long getCount() { return count; } + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getIdle() { + return idle; + } + /** * @return {@literal true} if a consumer name is present. */ @@ -894,6 +977,13 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null; } + + /** + * @return {@literal true} if idle is set. + */ + public boolean hasIdle() { + return idle != null; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 8385d70d34..a0a7750e58 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -41,6 +41,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @see Redis Documentation - Streams * @since 2.2 */ @@ -706,6 +707,25 @@ default PendingMessages xPending(byte[] key, String groupName, Range range, L return xPending(key, groupName, XPendingOptions.range(range, count)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Duration idle) { + return xPending(key, groupName, XPendingOptions.range(range, count).idle(idle)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -723,6 +743,24 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range range, return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Duration idle) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -742,6 +780,27 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, + Duration idle) { + return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).idle(idle)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -761,6 +820,7 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ class XPendingOptions { @@ -768,12 +828,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Duration idle; - private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count) { + private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, + @Nullable Duration idle) { this.range = range; this.count = count; this.consumerName = consumerName; + this.idle = idle; } /** @@ -782,7 +845,7 @@ private XPendingOptions(@Nullable String consumerName, Range range, @Nullable * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded() { - return new XPendingOptions(null, Range.unbounded(), null); + return new XPendingOptions(null, Range.unbounded(), null, null); } /** @@ -795,7 +858,7 @@ public static XPendingOptions unbounded(Long count) { Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, Range.unbounded(), count); + return new XPendingOptions(null, Range.unbounded(), count, null); } /** @@ -810,7 +873,7 @@ public static XPendingOptions range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, range, count); + return new XPendingOptions(null, range, count, null); } /** @@ -820,7 +883,20 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count); + return new XPendingOptions(consumerName, range, count, idle); + } + + /** + * Append given idle time. + * + * @param idle must not be {@literal null}. + * @return new instance of {@link} XPendingOptions}. + */ + public XPendingOptions idle(Duration idle) { + + Assert.notNull(idle, "Idle must not be null"); + + return new XPendingOptions(consumerName, range, count, idle); } /** @@ -846,6 +922,26 @@ public String getConsumerName() { return consumerName; } + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getIdle() { + return idle; + } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Long getIdleMillis() { + if (idle == null) { + return null; + } + + return idle.toMillis(); + } + /** * @return {@literal true} if a consumer name is present. */ @@ -859,6 +955,13 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null; } + + /** + * @return {@literal true} if idle time is set. + */ + public boolean hasIdle() { + return idle != null; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index 1069e430c8..44234c9867 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -71,6 +71,7 @@ * @author Andrey Shlykov * @author ihaohong * @author Shyngys Sapraliyev + * @author Jeonggyu Choi * @see RedisCallback * @see RedisSerializer * @see StringRedisTemplate @@ -3134,6 +3135,35 @@ default Long xDel(String key, String... entryIds) { @Nullable PendingMessagesSummary xPending(String key, String groupName); + // /** + // * Obtained detailed information about all pending messages for a given {@link Consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, Consumer consumer) { + // return xPending(key, consumer.getGroup(), consumer.getName()); + // } + + // /** + // * Obtained detailed information about all pending messages for a given {@literal consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + // * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, String groupName, String consumerName) { + // return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName)); + // } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3152,6 +3182,64 @@ default Long xDel(String key, String... entryIds) { PendingMessages xPending(String key, String groupName, String consumerName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@literal consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration idle); + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count); + } + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count, Duration idle) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3169,6 +3257,25 @@ PendingMessages xPending(String key, String groupName, String consumerName, PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group} and over a given {@link Duration} + * of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration idle); + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 9d26a6cd8d..c99ceb3c54 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -48,6 +48,7 @@ /** * @author Dengliming + * @author Jeonggyu Choi * @since 2.3 */ class JedisClusterStreamCommands implements RedisStreamCommands { @@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op pendingParams = pendingParams.consumer(consumerName); } + if (options.hasIdle()) { + pendingParams = pendingParams.idle(options.getIdleMillis()); + } + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index a68eef451a..2466832c29 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -55,6 +55,7 @@ * * @author dengliming * @author Mark Paluch + * @author Jeonggyu Choi * @since 2.3 */ class StreamConverters { @@ -306,6 +307,9 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption if (options.hasConsumer()) { xPendingParams.consumer(options.getConsumerName()); } + if (options.hasIdle()) { + xPendingParams.idle(options.getIdleMillis()); + } return xPendingParams; } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 1f45c373cd..e80c567009 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; @@ -56,6 +57,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ class LettuceReactiveStreamCommands implements ReactiveStreamCommands { @@ -235,9 +237,17 @@ public Flux> xPending( io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount()) : io.lettuce.core.Limit.unlimited(); - Flux publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), - io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) - : cmd.xpending(command.getKey(), groupName, range, limit); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit); + if (command.hasConsumer()) { + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(groupName, + ByteUtils.getByteBuffer(command.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (command.hasIdle()) { + xPendingArgs.idle(command.getIdle()); + } + + Flux publisher = cmd.xpending(command.getKey(), xPendingArgs); return publisher.collectList().map(it -> { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index ad5c2281c2..d8aa054dc9 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; @@ -50,6 +51,7 @@ * @author Dejan Jankov * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -217,15 +219,17 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from(options.getCount()) : io.lettuce.core.Limit.unlimited(); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit); if (options.hasConsumer()) { - - return connection.invoke() - .from(RedisStreamAsyncCommands::xpending, key, - io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName())), range, limit) - .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(group, + LettuceConverters.toBytes(options.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (options.hasIdle()) { + xPendingArgs.idle(options.getIdle()); } - return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group, range, limit) + return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs) .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 3cb27d1dcd..17eb414b60 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.core; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -56,6 +57,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ class DefaultStreamOperations extends AbstractOperations implements StreamOperations { @@ -222,6 +224,13 @@ public PendingMessages pending(K key, String group, Range range, long count) return execute(connection -> connection.xPending(rawKey, group, range, count)); } + @Override + public PendingMessages pending(K key, String group, Range range, long count, Duration idle) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, group, range, count, idle)); + } + @Override public PendingMessages pending(K key, Consumer consumer, Range range, long count) { @@ -229,6 +238,13 @@ public PendingMessages pending(K key, Consumer consumer, Range range, long co return execute(connection -> connection.xPending(rawKey, consumer, range, count)); } + @Override + public PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration idle) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, idle)); + } + @Override public PendingMessagesSummary pending(K key, String group) { diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 59a0647f29..f05acc99ef 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -55,6 +55,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ public interface StreamOperations extends HashMapperProvider { @@ -376,6 +377,22 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, String group, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param group the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, String group, Range range, long count, Duration idle); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -390,6 +407,21 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, Consumer consumer, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration idle); + /** * Get the length of a stream. * diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 573f105247..a0e0001d65 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -139,6 +139,7 @@ * @author Shyngys Sapraliyev * @author Roman Osadchuk * @author Tihomir Mateev + * @author Jeonggyu Choi */ public abstract class AbstractConnectionIntegrationTests { @@ -4102,16 +4103,97 @@ void xPendingShouldLoadEmptyOverviewCorrectly() { assertThat(info.getPendingMessagesPerConsumer()).isEmpty(); } + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadPendingMessagesForConsumer() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"))); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isOne(); + // assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + // assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + // assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + // assertThat(pending.get(0).getIdAsString()).isNotNull(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2"))); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isZero(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, "my-group", "my-consumer")); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isOne(); + // assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + // assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + // assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + // assertThat(pending.get(0).getIdAsString()).isNotNull(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2")); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isZero(); + // } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessages() { + public void xPendingShouldLoadPendingMessagesForConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + actual.add( + connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4124,16 +4206,36 @@ public void xPendingShouldLoadPendingMessages() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1207 + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldWorkWithBoundedRange() { + public void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", + org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdle() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, Duration.ZERO)); List results = getResults(); assertThat(results).hasSize(4); @@ -4146,17 +4248,37 @@ public void xPendingShouldWorkWithBoundedRange() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1084 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForConsumer() { + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdle() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add( - connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L)); + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForConsumerWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4169,16 +4291,16 @@ public void xPendingShouldLoadPendingMessagesForConsumer() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1084 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { + void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer-2"), org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); @@ -4188,9 +4310,96 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdleWithConsumer() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithConsumer() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // DATAREDIS-1207 + @EnabledOnCommand("XADD") + public void xPendingShouldWorkWithBoundedRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - void xPendingShouldLoadEmptyPendingMessages() { + public void xPendingShouldLoadPendingMessagesForGroupNameWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // DATAREDIS-1084 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForGroupNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); @@ -4204,6 +4413,49 @@ void xPendingShouldLoadEmptyPendingMessages() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") public void xClaim() throws InterruptedException { diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java index dedaf0c84d..b218115f69 100644 --- a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -28,6 +28,7 @@ * Unit tests for {@link ReactiveStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class ReactiveStreamCommandsUnitTests { @@ -53,4 +54,14 @@ void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> command.range(range, -1L)); } + + @Test // GH-2046 + void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() { + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + + assertThatIllegalArgumentException().isThrownBy(() -> command.idle(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index 74579d57a6..67b141a5cb 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -26,6 +26,7 @@ * Unit tests for {@link RedisStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class RedisStreamCommandsUnitTests { @@ -46,4 +47,11 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> XPendingOptions.range(range, -1L)); } + + @Test // GH-2046 + void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { + XPendingOptions xPendingOptions = XPendingOptions.unbounded(); + + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.idle(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java new file mode 100644 index 0000000000..a756f31c23 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java @@ -0,0 +1,30 @@ +package org.springframework.data.redis.connection.jedis; + +import static org.assertj.core.api.Assertions.*; + +import redis.clients.jedis.params.XPendingParams; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; + +/** + * @author Jeonggyu Choi + */ +class StreamConvertersUnitTest { + + @Test // GH-2046 + void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException { + XPendingOptions options = XPendingOptions.unbounded(5L).idle(Duration.of(1, ChronoUnit.HOURS)); + + XPendingParams xPendingParams = StreamConverters.toXPendingParams(options); + + Field idle = XPendingParams.class.getDeclaredField("idle"); + idle.setAccessible(true); + Long idleValue = (Long) idle.get(xPendingParams); + assertThat(idleValue).isEqualTo(Duration.of(1, ChronoUnit.HOURS).toMillis()); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index b181ef6a60..41f94d2177 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -21,6 +21,7 @@ import reactor.test.StepVerifier; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collections; import org.assertj.core.data.Offset; @@ -44,6 +45,7 @@ * @author Christoph Strobl * @author Tugdual Grall * @author Dengliming + * @author Jeonggyu Choi */ @EnabledOnCommand("XADD") public class LettuceReactiveStreamCommandsIntegrationTests extends LettuceReactiveCommandsTestSupport { @@ -340,6 +342,148 @@ void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { }).verifyComplete(); } + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, exceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, notExceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerNameAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, exceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupNameAndConsumerNameAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, notExceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessageesForConsumerAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, exceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle) + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-1084 void xPendingShouldLoadEmptyPendingMessages() { From 1de2bccab0448b955a04a63795650ec16654154c Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Wed, 12 Mar 2025 15:43:36 +0900 Subject: [PATCH 2/6] Add `delaySubscription` to depend on previous Reactive command --- ...eactiveStreamCommandsIntegrationTests.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index 41f94d2177..26d592e680 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -358,7 +358,7 @@ void xPendingShouldLoadPendingMessagesForGroupAndIdle() { Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, exceededIdle) - .as(StepVerifier::create).assertNext(it -> { + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { assertThat(it.size()).isOne(); assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); @@ -380,9 +380,11 @@ void xPendingShouldLoadEmptyPendingMessagesForGroupAndIdleWhenDurationNotExceede StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) .then().as(StepVerifier::create).verifyComplete(); - Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + Duration notExceededIdle = Duration.ofMinutes(10); connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + .as(StepVerifier::create).assertNext(it -> { assertThat(it.isEmpty()).isTrue(); }).verifyComplete(); @@ -401,10 +403,12 @@ void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerNameAndIdle() { StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) .then().as(StepVerifier::create).verifyComplete(); - Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + Duration exceededIdle = Duration.ofMillis(1); connection.streamCommands() .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + .as(StepVerifier::create).assertNext(it -> { assertThat(it.size()).isOne(); assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); @@ -427,10 +431,12 @@ void xPendingShouldLoadEmptyPendingMessagesForGroupNameAndConsumerNameAndIdleWhe StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) .then().as(StepVerifier::create).verifyComplete(); - Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + Duration notExceededIdle = Duration.ofMinutes(10); connection.streamCommands() .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + .as(StepVerifier::create).assertNext(it -> { assertThat(it.isEmpty()).isTrue(); }).verifyComplete(); @@ -449,10 +455,12 @@ void xPendingShouldLoadPendingMessageesForConsumerAndIdle() { StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) .then().as(StepVerifier::create).verifyComplete(); - Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + Duration exceededIdle = Duration.ofMillis(1); connection.streamCommands() .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + .as(StepVerifier::create).assertNext(it -> { assertThat(it.size()).isOne(); assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); @@ -475,10 +483,11 @@ void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExce StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) .then().as(StepVerifier::create).verifyComplete(); - Duration notExceededIdle = Duration.of(10, ChronoUnit.MINUTES); + Duration notExceededIdle = Duration.ofMinutes(10); connection.streamCommands() .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) .as(StepVerifier::create).assertNext(it -> { assertThat(it.isEmpty()).isTrue(); }).verifyComplete(); From 333eb5b21a12fb56421270be63289b48523f03b1 Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Wed, 12 Mar 2025 17:02:25 +0900 Subject: [PATCH 3/6] Rename `idle` parameter to `minIdleTime` in pending message methods for clarity --- .../DefaultStringRedisConnection.java | 8 ++-- .../connection/ReactiveStreamCommands.java | 42 ++++++++-------- .../redis/connection/RedisStreamCommands.java | 48 +++++++++---------- .../connection/StringRedisConnection.java | 14 +++--- .../LettuceReactiveStreamCommands.java | 2 +- .../lettuce/LettuceStreamCommands.java | 2 +- .../redis/core/DefaultStreamOperations.java | 8 ++-- .../data/redis/core/StreamOperations.java | 8 ++-- .../ReactiveStreamCommandsUnitTests.java | 2 +- .../RedisStreamCommandsUnitTests.java | 2 +- .../jedis/StreamConvertersUnitTest.java | 2 +- 11 files changed, 69 insertions(+), 69 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index cf687e647a..6bfddb2a4d 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -2971,8 +2971,8 @@ public PendingMessages xPending(String key, String groupName, String consumer, @Override public PendingMessages xPending(String key, String groupName, String consumerName, - org.springframework.data.domain.Range range, Long count, Duration idle) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, idle), + org.springframework.data.domain.Range range, Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime), Converters.identityConverter()); } @@ -2984,8 +2984,8 @@ public PendingMessages xPending(String key, String groupName, org.springframewor @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, - Long count, Duration idle) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idle), + Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime), Converters.identityConverter()); } diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index e59370e933..6d01ab41f5 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -756,14 +756,14 @@ default Mono xPending(ByteBuffer key, String groupName, RangeRedis Documentation: xpending * @since 3.5 */ - default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, Duration idle) { - return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next() + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, Duration minIdleTime) { + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next() .map(CommandResponse::getOutput); } @@ -791,13 +791,13 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 3.5 */ - default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, Duration idle) { - return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); } /** @@ -829,16 +829,16 @@ default Mono xPending(ByteBuffer key, String groupName, String * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} * when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 3.5 */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, - Long count, Duration idle) { + Long count, Duration minIdleTime) { return xPending( - Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).idle(idle))) + Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).minIdleTime(minIdleTime))) .next().map(CommandResponse::getOutput); } @@ -866,10 +866,10 @@ class PendingRecordsCommand extends KeyCommand { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; - private final @Nullable Duration idle; + private final @Nullable Duration minIdleTime; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count, @Nullable Duration idle) { + @Nullable Long count, @Nullable Duration minIdleTime) { super(key); @@ -877,7 +877,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String this.consumerName = consumerName; this.range = range; this.count = count; - this.idle = idle; + this.minIdleTime = minIdleTime; } /** @@ -913,20 +913,20 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, minIdleTime); } /** - * Append given idle time. + * Append given minimum idle time. * - * @param idle must not be {@literal null}. + * @param minIdleTime must not be {@literal null}. * @return new instance of {@link PendingRecordsCommand}. */ - public PendingRecordsCommand idle(Duration idle) { + public PendingRecordsCommand minIdleTime(Duration minIdleTime) { - Assert.notNull(idle, "Idle must not be null"); + Assert.notNull(minIdleTime, "Idle must not be null"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, minIdleTime); } public String getGroupName() { @@ -960,8 +960,8 @@ public Long getCount() { * @return can be {@literal null}. */ @Nullable - public Duration getIdle() { - return idle; + public Duration getMinIdleTime() { + return minIdleTime; } /** @@ -982,7 +982,7 @@ public boolean isLimited() { * @return {@literal true} if idle is set. */ public boolean hasIdle() { - return idle != null; + return minIdleTime != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index a0a7750e58..1ab090f32b 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -715,15 +715,15 @@ default PendingMessages xPending(byte[] key, String groupName, Range range, L * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / * transaction. * @see Redis Documentation: xpending * @since 3.5 */ @Nullable - default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Duration idle) { - return xPending(key, groupName, XPendingOptions.range(range, count).idle(idle)); + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Duration minIdleTime) { + return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(minIdleTime)); } /** @@ -751,14 +751,14 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range range, * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 3.5 */ @Nullable - default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Duration idle) { - return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); } /** @@ -789,7 +789,7 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} * when used in pipeline / transaction. * @see Redis Documentation: xpending @@ -797,8 +797,8 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa */ @Nullable default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, - Duration idle) { - return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).idle(idle)); + Duration minIdleTime) { + return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); } /** @@ -828,15 +828,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; - private final @Nullable Duration idle; + private final @Nullable Duration minIdleTime; private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, - @Nullable Duration idle) { + @Nullable Duration minIdleTime) { this.range = range; this.count = count; this.consumerName = consumerName; - this.idle = idle; + this.minIdleTime = minIdleTime; } /** @@ -883,20 +883,20 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count, idle); + return new XPendingOptions(consumerName, range, count, minIdleTime); } /** - * Append given idle time. + * Append given minimum idle time. * - * @param idle must not be {@literal null}. - * @return new instance of {@link} XPendingOptions}. + * @param minIdleTime must not be {@literal null}. + * @return new instance of {@link XPendingOptions}. */ - public XPendingOptions idle(Duration idle) { + public XPendingOptions minIdleTime(Duration minIdleTime) { - Assert.notNull(idle, "Idle must not be null"); + Assert.notNull(minIdleTime, "Idle must not be null"); - return new XPendingOptions(consumerName, range, count, idle); + return new XPendingOptions(consumerName, range, count, minIdleTime); } /** @@ -926,8 +926,8 @@ public String getConsumerName() { * @return can be {@literal null}. */ @Nullable - public Duration getIdle() { - return idle; + public Duration getMinIdleTime() { + return minIdleTime; } /** @@ -935,11 +935,11 @@ public Duration getIdle() { */ @Nullable public Long getIdleMillis() { - if (idle == null) { + if (minIdleTime == null) { return null; } - return idle.toMillis(); + return minIdleTime.toMillis(); } /** @@ -960,7 +960,7 @@ public boolean isLimited() { * @return {@literal true} if idle time is set. */ public boolean hasIdle() { - return idle != null; + return minIdleTime != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index 44234c9867..9bd7121afa 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -3192,7 +3192,7 @@ PendingMessages xPending(String key, String groupName, String consumerName, * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} * when used in pipeline / transaction. * @see Redis Documentation: xpending @@ -3200,7 +3200,7 @@ PendingMessages xPending(String key, String groupName, String consumerName, */ @Nullable PendingMessages xPending(String key, String groupName, String consumerName, - org.springframework.data.domain.Range range, Long count, Duration idle); + org.springframework.data.domain.Range range, Long count, Duration minIdleTime); /** * Obtain detailed information about pending {@link PendingMessage messages} for a given @@ -3229,15 +3229,15 @@ default PendingMessages xPending(String key, Consumer consumer, org.springframew * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 3.5 */ @Nullable default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, - Long count, Duration idle) { - return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle); + Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); } /** @@ -3266,7 +3266,7 @@ PendingMessages xPending(String key, String groupName, org.springframework.data. * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / * transaction. * @see Redis Documentation: xpending @@ -3274,7 +3274,7 @@ PendingMessages xPending(String key, String groupName, org.springframework.data. */ @Nullable PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, - Long count, Duration idle); + Long count, Duration minIdleTime); /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index e80c567009..d98d1fd1c8 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -244,7 +244,7 @@ public Flux> xPending( xPendingArgs.consumer(consumer); } if (command.hasIdle()) { - xPendingArgs.idle(command.getIdle()); + xPendingArgs.idle(command.getMinIdleTime()); } Flux publisher = cmd.xpending(command.getKey(), xPendingArgs); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index d8aa054dc9..4a9b42eb90 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -226,7 +226,7 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op xPendingArgs.consumer(consumer); } if (options.hasIdle()) { - xPendingArgs.idle(options.getIdle()); + xPendingArgs.idle(options.getMinIdleTime()); } return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs) diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 17eb414b60..8716a10c82 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -225,10 +225,10 @@ public PendingMessages pending(K key, String group, Range range, long count) } @Override - public PendingMessages pending(K key, String group, Range range, long count, Duration idle) { + public PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, group, range, count, idle)); + return execute(connection -> connection.xPending(rawKey, group, range, count, minIdleTime)); } @Override @@ -239,10 +239,10 @@ public PendingMessages pending(K key, Consumer consumer, Range range, long co } @Override - public PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration idle) { + public PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, consumer, range, count, idle)); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, minIdleTime)); } @Override diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index f05acc99ef..5caacc7d71 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -385,13 +385,13 @@ default PendingMessages pending(K key, Consumer consumer) { * @param group the name of the {@literal consumer group}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / * transaction. * @see Redis Documentation: xpending * @since 3.5 */ - PendingMessages pending(K key, String group, Range range, long count, Duration idle); + PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime); /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and @@ -415,12 +415,12 @@ default PendingMessages pending(K key, Consumer consumer) { * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. - * @param idle the minimum idle time to filter pending messages. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 3.5 */ - PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration idle); + PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime); /** * Get the length of a stream. diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java index b218115f69..b5445247ef 100644 --- a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -62,6 +62,6 @@ void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() { PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); - assertThatIllegalArgumentException().isThrownBy(() -> command.idle(null)); + assertThatIllegalArgumentException().isThrownBy(() -> command.minIdleTime(null)); } } diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index 67b141a5cb..dbda89b861 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -52,6 +52,6 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { XPendingOptions xPendingOptions = XPendingOptions.unbounded(); - assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.idle(null)); + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.minIdleTime(null)); } } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java index a756f31c23..5d75615fee 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java @@ -18,7 +18,7 @@ class StreamConvertersUnitTest { @Test // GH-2046 void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException { - XPendingOptions options = XPendingOptions.unbounded(5L).idle(Duration.of(1, ChronoUnit.HOURS)); + XPendingOptions options = XPendingOptions.unbounded(5L).minIdleTime(Duration.of(1, ChronoUnit.HOURS)); XPendingParams xPendingParams = StreamConverters.toXPendingParams(options); From 24550d91d402c647da240471c4d9035aa9e73324 Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Wed, 12 Mar 2025 17:24:45 +0900 Subject: [PATCH 4/6] Refactor `PendingRecordsCommand` to use `XPendingOptions` for better encapsulation --- .../connection/ReactiveStreamCommands.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 6d01ab41f5..cd05ce6e16 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -863,21 +863,20 @@ default Mono xPending(ByteBuffer key, String groupName, String class PendingRecordsCommand extends KeyCommand { private final String groupName; - private final @Nullable String consumerName; - private final Range range; - private final @Nullable Long count; - private final @Nullable Duration minIdleTime; + private final XPendingOptions options; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, @Nullable Long count, @Nullable Duration minIdleTime) { + this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); + } + + private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) { + super(key); this.groupName = groupName; - this.consumerName = consumerName; - this.range = range; - this.count = count; - this.minIdleTime = minIdleTime; + this.options = options; } /** @@ -903,7 +902,7 @@ public PendingRecordsCommand range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null); + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count)); } /** @@ -913,7 +912,7 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, minIdleTime); + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName)); } /** @@ -926,7 +925,7 @@ public PendingRecordsCommand minIdleTime(Duration minIdleTime) { Assert.notNull(minIdleTime, "Idle must not be null"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, minIdleTime); + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime)); } public String getGroupName() { @@ -938,14 +937,14 @@ public String getGroupName() { */ @Nullable public String getConsumerName() { - return consumerName; + return options.getConsumerName(); } /** * @return never {@literal null}. */ public Range getRange() { - return range; + return options.getRange(); } /** @@ -953,7 +952,7 @@ public Range getRange() { */ @Nullable public Long getCount() { - return count; + return options.getCount(); } /** @@ -961,28 +960,28 @@ public Long getCount() { */ @Nullable public Duration getMinIdleTime() { - return minIdleTime; + return options.getMinIdleTime(); } /** * @return {@literal true} if a consumer name is present. */ public boolean hasConsumer() { - return StringUtils.hasText(consumerName); + return StringUtils.hasText(options.getConsumerName()); } /** * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null; + return options.getCount() != null; } /** * @return {@literal true} if idle is set. */ public boolean hasIdle() { - return minIdleTime != null; + return options.getMinIdleTime() != null; } } From 6c400ba70ba39a268f52c804c657d5d88db7018e Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Wed, 12 Mar 2025 17:26:00 +0900 Subject: [PATCH 5/6] Add null check for consumer name in XPendingOptions.consumer method --- .../data/redis/connection/RedisStreamCommands.java | 3 +++ .../redis/connection/RedisStreamCommandsUnitTests.java | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 1ab090f32b..079f427140 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -883,6 +883,9 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { + + Assert.notNull(consumerName, "Consumer name must not be null"); + return new XPendingOptions(consumerName, range, count, minIdleTime); } diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index dbda89b861..88dd168e0c 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -54,4 +54,11 @@ void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.minIdleTime(null)); } + + @Test // GH-2046 + void xPendingOptionsConsumerShouldThrowExceptionWhenConsumerNameIsNull(){ + XPendingOptions xPendingOptions = XPendingOptions.unbounded(); + + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.consumer(null)); + } } From acb4f0ab015633cf9b5aaa83bb835e109a9c3a5b Mon Sep 17 00:00:00 2001 From: Jeonggyu Choi <97666463+whatasame@users.noreply.github.com> Date: Wed, 12 Mar 2025 17:57:57 +0900 Subject: [PATCH 6/6] Formatting --- .../redis/connection/ReactiveStreamCommands.java | 14 ++++++++------ .../data/redis/connection/RedisStreamCommands.java | 3 ++- .../connection/RedisStreamCommandsUnitTests.java | 2 +- ...tuceReactiveStreamCommandsIntegrationTests.java | 3 +-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index cd05ce6e16..520a0b4317 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -762,8 +762,10 @@ default Mono xPending(ByteBuffer key, String groupName, RangeRedis Documentation: xpending * @since 3.5 */ - default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, Duration minIdleTime) { - return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next() + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, + Duration minIdleTime) { + return xPending( + Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next() .map(CommandResponse::getOutput); } @@ -796,7 +798,8 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< * @see Redis Documentation: xpending * @since 3.5 */ - default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, Duration minIdleTime) { + default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, + Duration minIdleTime) { return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); } @@ -837,9 +840,8 @@ default Mono xPending(ByteBuffer key, String groupName, String */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, Long count, Duration minIdleTime) { - return xPending( - Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).minIdleTime(minIdleTime))) - .next().map(CommandResponse::getOutput); + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count) + .minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput); } /** diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 079f427140..9d2477dc96 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -798,7 +798,8 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa @Nullable default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, Duration minIdleTime) { - return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); + return xPending(key, groupName, + XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); } /** diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index 88dd168e0c..6d332b9aae 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -56,7 +56,7 @@ void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { } @Test // GH-2046 - void xPendingOptionsConsumerShouldThrowExceptionWhenConsumerNameIsNull(){ + void xPendingOptionsConsumerShouldThrowExceptionWhenConsumerNameIsNull() { XPendingOptions xPendingOptions = XPendingOptions.unbounded(); assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.consumer(null)); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index 26d592e680..e8107855e1 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -487,8 +487,7 @@ void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExce connection.streamCommands() .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle) - .delaySubscription(Duration.ofMillis(100)) - .as(StepVerifier::create).assertNext(it -> { + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { assertThat(it.isEmpty()).isTrue(); }).verifyComplete(); }