diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index 831a46ece2..6bfddb2a4d 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -82,6 +82,7 @@ * @author ihaohong * @author Dennis Neufeld * @author Shyngys Sapraliyev + * @author Jeonggyu Choi */ @SuppressWarnings({ "ConstantConditions", "deprecation" }) public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection { @@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer, Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count) { return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter()); } + @Override + public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime), + Converters.identityConverter()); + } + @Override public PendingMessages xPending(String key, String groupName, XPendingOptions options) { return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter()); diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 2860dc691c..520a0b4317 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -60,6 +60,7 @@ * @author Dengliming * @author Mark John Moreno * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ public interface ReactiveStreamCommands { @@ -747,6 +748,27 @@ default Mono xPending(ByteBuffer key, String groupName, RangeRedis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, + Duration minIdleTime) { + return xPending( + Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next() + .map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -763,6 +785,24 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, Consumer consumer, Range range, Long count, + Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -783,6 +823,27 @@ default Mono xPending(ByteBuffer key, String groupName, String .next().map(CommandResponse::getOutput); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, + Long count, Duration minIdleTime) { + return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count) + .minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -798,24 +859,26 @@ default Mono xPending(ByteBuffer key, String groupName, String * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ class PendingRecordsCommand extends KeyCommand { private final String groupName; - private final @Nullable String consumerName; - private final Range range; - private final @Nullable Long count; + private final XPendingOptions options; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count) { + @Nullable Long count, @Nullable Duration minIdleTime) { + + this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); + } + + private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) { super(key); this.groupName = groupName; - this.consumerName = consumerName; - this.range = range; - this.count = count; + this.options = options; } /** @@ -826,7 +889,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String * @return new instance of {@link PendingRecordsCommand}. */ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { - return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null); + return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null); } /** @@ -841,7 +904,7 @@ public PendingRecordsCommand range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count)); } /** @@ -851,7 +914,20 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName)); + } + + /** + * Append given minimum idle time. + * + * @param minIdleTime must not be {@literal null}. + * @return new instance of {@link PendingRecordsCommand}. + */ + public PendingRecordsCommand minIdleTime(Duration minIdleTime) { + + Assert.notNull(minIdleTime, "Idle must not be null"); + + return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime)); } public String getGroupName() { @@ -863,14 +939,14 @@ public String getGroupName() { */ @Nullable public String getConsumerName() { - return consumerName; + return options.getConsumerName(); } /** * @return never {@literal null}. */ public Range getRange() { - return range; + return options.getRange(); } /** @@ -878,21 +954,36 @@ public Range getRange() { */ @Nullable public Long getCount() { - return count; + return options.getCount(); + } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getMinIdleTime() { + return options.getMinIdleTime(); } /** * @return {@literal true} if a consumer name is present. */ public boolean hasConsumer() { - return StringUtils.hasText(consumerName); + return StringUtils.hasText(options.getConsumerName()); } /** * @return {@literal true} count is set. */ public boolean isLimited() { - return count != null; + return options.getCount() != null; + } + + /** + * @return {@literal true} if idle is set. + */ + public boolean hasIdle() { + return options.getMinIdleTime() != null; } } diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 8385d70d34..9d2477dc96 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -41,6 +41,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @see Redis Documentation - Streams * @since 2.2 */ @@ -706,6 +707,25 @@ default PendingMessages xPending(byte[] key, String groupName, Range range, L return xPending(key, groupName, XPendingOptions.range(range, count)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Duration minIdleTime) { + return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(minIdleTime)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -723,6 +743,24 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range range, return xPending(key, consumer.getGroup(), consumer.getName(), range, count); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@literal consumer} within a {@literal consumer group}. @@ -742,6 +780,28 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName)); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, + Duration minIdleTime) { + return xPending(key, groupName, + XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime)); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -761,6 +821,7 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa * Value Object holding parameters for obtaining pending messages. * * @author Christoph Strobl + * @author Jeonggyu Choi * @since 2.3 */ class XPendingOptions { @@ -768,12 +829,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Duration minIdleTime; - private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count) { + private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, + @Nullable Duration minIdleTime) { this.range = range; this.count = count; this.consumerName = consumerName; + this.minIdleTime = minIdleTime; } /** @@ -782,7 +846,7 @@ private XPendingOptions(@Nullable String consumerName, Range range, @Nullable * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded() { - return new XPendingOptions(null, Range.unbounded(), null); + return new XPendingOptions(null, Range.unbounded(), null, null); } /** @@ -795,7 +859,7 @@ public static XPendingOptions unbounded(Long count) { Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, Range.unbounded(), count); + return new XPendingOptions(null, Range.unbounded(), count, null); } /** @@ -810,7 +874,7 @@ public static XPendingOptions range(Range range, Long count) { Assert.notNull(range, "Range must not be null"); Assert.isTrue(count > -1, "Count must not be negative"); - return new XPendingOptions(null, range, count); + return new XPendingOptions(null, range, count, null); } /** @@ -820,7 +884,23 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count); + + Assert.notNull(consumerName, "Consumer name must not be null"); + + return new XPendingOptions(consumerName, range, count, minIdleTime); + } + + /** + * Append given minimum idle time. + * + * @param minIdleTime must not be {@literal null}. + * @return new instance of {@link XPendingOptions}. + */ + public XPendingOptions minIdleTime(Duration minIdleTime) { + + Assert.notNull(minIdleTime, "Idle must not be null"); + + return new XPendingOptions(consumerName, range, count, minIdleTime); } /** @@ -846,6 +926,26 @@ public String getConsumerName() { return consumerName; } + /** + * @return can be {@literal null}. + */ + @Nullable + public Duration getMinIdleTime() { + return minIdleTime; + } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Long getIdleMillis() { + if (minIdleTime == null) { + return null; + } + + return minIdleTime.toMillis(); + } + /** * @return {@literal true} if a consumer name is present. */ @@ -859,6 +959,13 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null; } + + /** + * @return {@literal true} if idle time is set. + */ + public boolean hasIdle() { + return minIdleTime != null; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index 1069e430c8..9bd7121afa 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -71,6 +71,7 @@ * @author Andrey Shlykov * @author ihaohong * @author Shyngys Sapraliyev + * @author Jeonggyu Choi * @see RedisCallback * @see RedisSerializer * @see StringRedisTemplate @@ -3134,6 +3135,35 @@ default Long xDel(String key, String... entryIds) { @Nullable PendingMessagesSummary xPending(String key, String groupName); + // /** + // * Obtained detailed information about all pending messages for a given {@link Consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, Consumer consumer) { + // return xPending(key, consumer.getGroup(), consumer.getName()); + // } + + // /** + // * Obtained detailed information about all pending messages for a given {@literal consumer}. + // * + // * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + // * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + // * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}. + // * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + // * @see Redis Documentation: xpending + // * @since 3.5 + // */ + // @Nullable + // default PendingMessages xPending(String key, String groupName, String consumerName) { + // return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName)); + // } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3152,6 +3182,64 @@ default Long xDel(String key, String... entryIds) { PendingMessages xPending(String key, String groupName, String consumerName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@literal consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null} + * when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Duration minIdleTime); + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count); + } + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group} and over a + * given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -3169,6 +3257,25 @@ PendingMessages xPending(String key, String groupName, String consumerName, PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group} and over a given {@link Duration} + * of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + @Nullable + PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Duration minIdleTime); + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 9d26a6cd8d..c99ceb3c54 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -48,6 +48,7 @@ /** * @author Dengliming + * @author Jeonggyu Choi * @since 2.3 */ class JedisClusterStreamCommands implements RedisStreamCommands { @@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op pendingParams = pendingParams.consumer(consumerName); } + if (options.hasIdle()) { + pendingParams = pendingParams.idle(options.getIdleMillis()); + } + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index a68eef451a..2466832c29 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -55,6 +55,7 @@ * * @author dengliming * @author Mark Paluch + * @author Jeonggyu Choi * @since 2.3 */ class StreamConverters { @@ -306,6 +307,9 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption if (options.hasConsumer()) { xPendingParams.consumer(options.getConsumerName()); } + if (options.hasIdle()) { + xPendingParams.idle(options.getIdleMillis()); + } return xPendingParams; } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 1f45c373cd..d98d1fd1c8 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; @@ -56,6 +57,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ class LettuceReactiveStreamCommands implements ReactiveStreamCommands { @@ -235,9 +237,17 @@ public Flux> xPending( io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount()) : io.lettuce.core.Limit.unlimited(); - Flux publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), - io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) - : cmd.xpending(command.getKey(), groupName, range, limit); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit); + if (command.hasConsumer()) { + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(groupName, + ByteUtils.getByteBuffer(command.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (command.hasIdle()) { + xPendingArgs.idle(command.getMinIdleTime()); + } + + Flux publisher = cmd.xpending(command.getKey(), xPendingArgs); return publisher.collectList().map(it -> { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index ad5c2281c2..4a9b42eb90 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -18,6 +18,7 @@ import io.lettuce.core.XAddArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; +import io.lettuce.core.XPendingArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; @@ -50,6 +51,7 @@ * @author Dejan Jankov * @author Dengliming * @author Mark John Moreno + * @author Jeonggyu Choi * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -217,15 +219,17 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from(options.getCount()) : io.lettuce.core.Limit.unlimited(); + XPendingArgs xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit); if (options.hasConsumer()) { - - return connection.invoke() - .from(RedisStreamAsyncCommands::xpending, key, - io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName())), range, limit) - .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); + io.lettuce.core.Consumer consumer = io.lettuce.core.Consumer.from(group, + LettuceConverters.toBytes(options.getConsumerName())); + xPendingArgs.consumer(consumer); + } + if (options.hasIdle()) { + xPendingArgs.idle(options.getMinIdleTime()); } - return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group, range, limit) + return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs) .get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it)); } diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 3cb27d1dcd..8716a10c82 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.core; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -56,6 +57,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ class DefaultStreamOperations extends AbstractOperations implements StreamOperations { @@ -222,6 +224,13 @@ public PendingMessages pending(K key, String group, Range range, long count) return execute(connection -> connection.xPending(rawKey, group, range, count)); } + @Override + public PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, group, range, count, minIdleTime)); + } + @Override public PendingMessages pending(K key, Consumer consumer, Range range, long count) { @@ -229,6 +238,13 @@ public PendingMessages pending(K key, Consumer consumer, Range range, long co return execute(connection -> connection.xPending(rawKey, consumer, range, count)); } + @Override + public PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime) { + + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, minIdleTime)); + } + @Override public PendingMessagesSummary pending(K key, String group) { diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 59a0647f29..5caacc7d71 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -55,6 +55,7 @@ * @author Marcin Zielinski * @author John Blum * @author jinkshower + * @author Jeonggyu Choi * @since 2.2 */ public interface StreamOperations extends HashMapperProvider { @@ -376,6 +377,22 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, String group, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param group the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, String group, Range range, long count, Duration minIdleTime); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -390,6 +407,21 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, Consumer consumer, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 3.5 + */ + PendingMessages pending(K key, Consumer consumer, Range range, long count, Duration minIdleTime); + /** * Get the length of a stream. * diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 573f105247..a0e0001d65 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -139,6 +139,7 @@ * @author Shyngys Sapraliyev * @author Roman Osadchuk * @author Tihomir Mateev + * @author Jeonggyu Choi */ public abstract class AbstractConnectionIntegrationTests { @@ -4102,16 +4103,97 @@ void xPendingShouldLoadEmptyOverviewCorrectly() { assertThat(info.getPendingMessagesPerConsumer()).isEmpty(); } + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadPendingMessagesForConsumer() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"))); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isOne(); + // assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + // assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + // assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + // assertThat(pending.get(0).getIdAsString()).isNotNull(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2"))); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isZero(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, "my-group", "my-consumer")); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isOne(); + // assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + // assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + // assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + // assertThat(pending.get(0).getIdAsString()).isNotNull(); + // } + + // @Test // GH-2046 + // @EnabledOnCommand("XADD") + // void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() { + // + // actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + // actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + // actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + // StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + // + // actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2")); + // + // List results = getResults(); + // assertThat(results).hasSize(4); + // PendingMessages pending = (PendingMessages) results.get(3); + // + // assertThat(pending.size()).isZero(); + // } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessages() { + public void xPendingShouldLoadPendingMessagesForConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + actual.add( + connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4124,16 +4206,36 @@ public void xPendingShouldLoadPendingMessages() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1207 + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - public void xPendingShouldWorkWithBoundedRange() { + public void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", + org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdle() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, Duration.ZERO)); List results = getResults(); assertThat(results).hasSize(4); @@ -4146,17 +4248,37 @@ public void xPendingShouldWorkWithBoundedRange() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1084 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForConsumer() { + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdle() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add( - connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L)); + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), + 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForConsumerWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -4169,16 +4291,16 @@ public void xPendingShouldLoadPendingMessagesForConsumer() { assertThat(pending.get(0).getIdAsString()).isNotNull(); } - @Test // DATAREDIS-1084 + @Test // GH-2046 @EnabledOnCommand("XADD") - public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { + void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer-2"), org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); @@ -4188,9 +4310,96 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdleWithConsumer() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithConsumer() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"), + org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + + @Test // DATAREDIS-1207 + @EnabledOnCommand("XADD") + public void xPendingShouldWorkWithBoundedRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") - void xPendingShouldLoadEmptyPendingMessages() { + public void xPendingShouldLoadPendingMessagesForGroupNameWithRange() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // DATAREDIS-1084 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForGroupNameWithRange() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); @@ -4204,6 +4413,49 @@ void xPendingShouldLoadEmptyPendingMessages() { assertThat(pending.size()).isZero(); } + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadPendingMessagesForIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isOne(); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + assertThat(pending.get(0).getIdAsString()).isNotNull(); + } + + @Test // GH-2046 + @EnabledOnCommand("XADD") + void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithGroupName() { + + actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); + actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); + actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); + + Duration nonOverIdle = Duration.ofSeconds(10); + actual.add( + connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle)); + + List results = getResults(); + assertThat(results).hasSize(4); + PendingMessages pending = (PendingMessages) results.get(3); + + assertThat(pending.size()).isZero(); + } + @Test // DATAREDIS-1084 @EnabledOnCommand("XADD") public void xClaim() throws InterruptedException { diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java index dedaf0c84d..b5445247ef 100644 --- a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java @@ -28,6 +28,7 @@ * Unit tests for {@link ReactiveStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class ReactiveStreamCommandsUnitTests { @@ -53,4 +54,14 @@ void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> command.range(range, -1L)); } + + @Test // GH-2046 + void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() { + ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes()); + String groupName = "my-group"; + + PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName); + + assertThatIllegalArgumentException().isThrownBy(() -> command.minIdleTime(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java index 74579d57a6..6d332b9aae 100644 --- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java @@ -26,6 +26,7 @@ * Unit tests for {@link RedisStreamCommands}. * * @author jinkshower + * @author Jeonggyu Choi */ class RedisStreamCommandsUnitTests { @@ -46,4 +47,18 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() { assertThatIllegalArgumentException().isThrownBy(() -> XPendingOptions.range(range, -1L)); } + + @Test // GH-2046 + void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() { + XPendingOptions xPendingOptions = XPendingOptions.unbounded(); + + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.minIdleTime(null)); + } + + @Test // GH-2046 + void xPendingOptionsConsumerShouldThrowExceptionWhenConsumerNameIsNull() { + XPendingOptions xPendingOptions = XPendingOptions.unbounded(); + + assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.consumer(null)); + } } diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java new file mode 100644 index 0000000000..5d75615fee --- /dev/null +++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java @@ -0,0 +1,30 @@ +package org.springframework.data.redis.connection.jedis; + +import static org.assertj.core.api.Assertions.*; + +import redis.clients.jedis.params.XPendingParams; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; + +/** + * @author Jeonggyu Choi + */ +class StreamConvertersUnitTest { + + @Test // GH-2046 + void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException { + XPendingOptions options = XPendingOptions.unbounded(5L).minIdleTime(Duration.of(1, ChronoUnit.HOURS)); + + XPendingParams xPendingParams = StreamConverters.toXPendingParams(options); + + Field idle = XPendingParams.class.getDeclaredField("idle"); + idle.setAccessible(true); + Long idleValue = (Long) idle.get(xPendingParams); + assertThat(idleValue).isEqualTo(Duration.of(1, ChronoUnit.HOURS).toMillis()); + } +} diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index b181ef6a60..e8107855e1 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -21,6 +21,7 @@ import reactor.test.StepVerifier; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Collections; import org.assertj.core.data.Offset; @@ -44,6 +45,7 @@ * @author Christoph Strobl * @author Tugdual Grall * @author Dengliming + * @author Jeonggyu Choi */ @EnabledOnCommand("XADD") public class LettuceReactiveStreamCommandsIntegrationTests extends LettuceReactiveCommandsTestSupport { @@ -340,6 +342,156 @@ void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { }).verifyComplete(); } + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerNameAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.ofMillis(1); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForGroupNameAndConsumerNameAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadPendingMessageesForConsumerAndIdle() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration exceededIdle = Duration.ofMillis(1); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, exceededIdle) + .delaySubscription(Duration.ofMillis(100)) + + .as(StepVerifier::create).assertNext(it -> { + assertThat(it.size()).isOne(); + assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(it.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(it.get(0).getTotalDeliveryCount()).isOne(); + assertThat(it.get(0).getIdAsString()).isNotNull(); + }).verifyComplete(); + } + + @ParameterizedRedisTest // GH-2046 + void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExceeded() { + + String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1); + nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group"); + + nativeCommands.xadd(KEY_1, KEY_2, VALUE_2); + + connection.streamCommands() + .xReadGroup(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) + .then().as(StepVerifier::create).verifyComplete(); + + Duration notExceededIdle = Duration.ofMinutes(10); + + connection.streamCommands() + .xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle) + .delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> { + assertThat(it.isEmpty()).isTrue(); + }).verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-1084 void xPendingShouldLoadEmptyPendingMessages() {