diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index 79a59dcc6e..2ee676cb2e 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -4548,8 +4548,9 @@ public PendingMessagesSummary xPending(String key, String groupName) { */ @Override public PendingMessages xPending(String key, String groupName, String consumer, - org.springframework.data.domain.Range range, Long count) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count), + org.springframework.data.domain.Range range, Long count, Long idleMilliSeconds) { + + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count,idleMilliSeconds), Converters.identityConverter()); } @@ -4560,7 +4561,18 @@ public PendingMessages xPending(String key, String groupName, String consumer, @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter()); + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, null), Converters.identityConverter()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Long idleMilliSeconds) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idleMilliSeconds), + Converters.identityConverter()); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 5dc3e883e7..bb8034dda8 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -605,8 +605,8 @@ default Mono xPending(ByteBuffer key, String groupName) Assert.notNull(key, "Key must not be null!"); Assert.notNull(groupName, "GroupName must not be null!"); - return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next() - .map(CommandResponse::getOutput); + return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null))) + .next().map(CommandResponse::getOutput); } /** @@ -646,8 +646,8 @@ default Mono xPending(ByteBuffer key, Consumer consumer) { */ @Nullable default Mono xPending(ByteBuffer key, String groupName, String consumerName) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next() - .map(CommandResponse::getOutput); + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null, null))) + .next().map(CommandResponse::getOutput); } /** @@ -663,7 +663,26 @@ default Mono xPending(ByteBuffer key, String groupName, String * @since 2.3 */ default Mono xPending(ByteBuffer key, String groupName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next() + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, null))).next() + .map(CommandResponse::getOutput); + } + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. + * @return {@link Mono} emitting pending messages for the given {@literal consumer group}. transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, + Long idleMilliSeconds) { + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, idleMilliSeconds))).next() .map(CommandResponse::getOutput); } @@ -699,10 +718,31 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next() + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, null))).next() .map(CommandResponse::getOutput); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. + * @return {@link Mono} emitting pending messages for the given {@literal consumer} in given + * {@literal consumer group}. + * @see Redis Documentation: xpending + * @since 2.3 + */ + default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, + Long count, Long idleMilliSeconds) { + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, idleMilliSeconds))) + .next().map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -726,9 +766,10 @@ class PendingRecordsCommand extends KeyCommand { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Long idleMilliSeconds; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count) { + @Nullable Long count, @Nullable Long idleMilliSeconds) { super(key); @@ -736,6 +777,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String this.consumerName = consumerName; this.range = range; this.count = count; + this.idleMilliSeconds = idleMilliSeconds; } /** @@ -746,16 +788,17 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String * @return new instance of {@link PendingRecordsCommand}. */ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { - return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null); + return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null); } /** - * Create new {@link PendingRecordsCommand} with given {@link Range} and limit. + * Create new {@link PendingRecordsCommand} with given {@link Range},limit and messages which are idle for certain + * time. * * @return new instance of {@link XPendingOptions}. */ - public PendingRecordsCommand range(Range range, Long count) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + public PendingRecordsCommand range(Range range, Long count, Long idleMilliSeconds) { + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds); } /** @@ -765,7 +808,7 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds); } public String getGroupName() { diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 7f50913c65..f7f9c62470 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -640,14 +640,15 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / * transaction. * @see Redis Documentation: xpending * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, String groupName, Range range, Long count) { - return xPending(key, groupName, XPendingOptions.range(range, count)); + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Long idleMilliSeconds) { + return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds)); } /** @@ -658,13 +659,14 @@ default PendingMessages xPending(byte[] key, String groupName, Range range, L * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count) { - return xPending(key, consumer.getGroup(), consumer.getName(), range, count); + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Long idleMilliSeconds) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idleMilliSeconds); } /** @@ -682,8 +684,9 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range range, * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count) { - return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName)); + default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, + Long idleMilliSeconds) { + return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds).consumer(consumerName)); } /** @@ -712,12 +715,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Long idleMilliseconds; - private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count) { + private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, + @Nullable Long idleMilliseconds) { this.range = range; this.count = count; this.consumerName = consumerName; + this.idleMilliseconds = idleMilliseconds; } /** @@ -726,7 +732,7 @@ private XPendingOptions(@Nullable String consumerName, Range range, @Nullable * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded() { - return new XPendingOptions(null, Range.unbounded(), null); + return new XPendingOptions(null, Range.unbounded(), null, null); } /** @@ -736,16 +742,17 @@ public static XPendingOptions unbounded() { * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded(Long count) { - return new XPendingOptions(null, Range.unbounded(), count); + return new XPendingOptions(null, Range.unbounded(), count, null); } /** - * Create new {@link XPendingOptions} with given {@link Range} and limit. + * StreamOperations Create new {@link XPendingOptions} with given {@link Range}, limit and messages which are idle + * for certain time. * * @return new instance of {@link XPendingOptions}. */ - public static XPendingOptions range(Range range, Long count) { - return new XPendingOptions(null, range, count); + public static XPendingOptions range(Range range, Long count, Long idleMilliseconds) { + return new XPendingOptions(null, range, count, idleMilliseconds); } /** @@ -755,7 +762,7 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count); + return new XPendingOptions(consumerName, range, count, idleMilliseconds); } /** @@ -794,6 +801,14 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null && count > -1; } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Long getIdleMilliseconds() { + return idleMilliseconds; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index d4640e886a..468b23059d 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -2825,6 +2825,25 @@ default Long xDel(String key, String... entryIds) { PendingMessages xPending(String key, String groupName, String consumerName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + @Nullable + PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -2842,6 +2861,24 @@ PendingMessages xPending(String key, String groupName, String consumerName, PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + @Nullable + PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index a71cfb3383..8e6b1ecdd2 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -225,7 +225,18 @@ public XInfoGroups groups(K key) { public PendingMessages pending(K key, String group, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, group, range, count), true); + return execute(connection -> connection.xPending(rawKey, group, range, count, null), true); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages pending(K key, String group, Range range, long count, + long idleMilliSeconds) { + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, group, range, count, idleMilliSeconds), true); } /* @@ -236,7 +247,18 @@ public PendingMessages pending(K key, String group, Range range, long count) public PendingMessages pending(K key, Consumer consumer, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, consumer, range, count), true); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, null), true); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, org.springframework.data.redis.connection.stream.Consumer, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages pending(K key, Consumer consumer, Range range, long count, + long idleMilliSeconds) { + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, idleMilliSeconds), true); } /* diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 2a80131674..9f2b695087 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -281,6 +281,22 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, String group, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param group the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + PendingMessages pending(K key, String group, Range range, long count, long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -295,6 +311,21 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, Consumer consumer, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + PendingMessages pending(K key, Consumer consumer, Range range, long count, long idleMilliSeconds); + /** * Get the length of a stream. *