From 230ab7ac4ac4d53ca49d484aead10d3495ecc2d5 Mon Sep 17 00:00:00 2001 From: SivaTharun Date: Mon, 28 Jun 2021 10:30:12 +0530 Subject: [PATCH] 2046 - implementation the idle attribute support for xpending command - added the support for idle attribute support for xpending commands in reactive stream commands and redis stream commands --- .../DefaultStringRedisConnection.java | 26 ++++++- .../connection/ReactiveStreamCommands.java | 67 +++++++++++++++---- .../redis/connection/RedisStreamCommands.java | 41 ++++++++---- .../connection/StringRedisConnection.java | 37 ++++++++++ .../redis/core/DefaultStreamOperations.java | 26 ++++++- .../data/redis/core/StreamOperations.java | 31 +++++++++ 6 files changed, 199 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java index 6ce7ac173c..2415ca07b9 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java @@ -3930,7 +3930,18 @@ public PendingMessagesSummary xPending(String key, String groupName) { @Override public PendingMessages xPending(String key, String groupName, String consumer, org.springframework.data.domain.Range range, Long count) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count), Converters.identityConverter()); + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count, null), Converters.identityConverter()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long) + */ + @Override + public PendingMessages xPending(String key, String groupName, String consumer, + org.springframework.data.domain.Range range, Long count, Long idleMilliSeconds) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count, idleMilliSeconds), + Converters.identityConverter()); } /* @@ -3940,7 +3951,18 @@ public PendingMessages xPending(String key, String groupName, String consumer, @Override public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count) { - return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter()); + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, null), Converters.identityConverter()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Long idleMilliSeconds) { + return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idleMilliSeconds), + Converters.identityConverter()); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index dc46f75c1b..07322a450e 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -573,8 +573,8 @@ default Mono xPending(ByteBuffer key, String groupName) Assert.notNull(key, "Key must not be null!"); Assert.notNull(groupName, "GroupName must not be null!"); - return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next() - .map(CommandResponse::getOutput); + return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null))) + .next().map(CommandResponse::getOutput); } /** @@ -614,8 +614,8 @@ default Mono xPending(ByteBuffer key, Consumer consumer) { */ @Nullable default Mono xPending(ByteBuffer key, String groupName, String consumerName) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next() - .map(CommandResponse::getOutput); + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null, null))) + .next().map(CommandResponse::getOutput); } /** @@ -631,7 +631,26 @@ default Mono xPending(ByteBuffer key, String groupName, String * @since 2.3 */ default Mono xPending(ByteBuffer key, String groupName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next() + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, null))).next() + .map(CommandResponse::getOutput); + } + + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. + * @return {@link Mono} emitting pending messages for the given {@literal consumer group}. transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + default Mono xPending(ByteBuffer key, String groupName, Range range, Long count, + Long idleMilliSeconds) { + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, idleMilliSeconds))).next() .map(CommandResponse::getOutput); } @@ -667,10 +686,31 @@ default Mono xPending(ByteBuffer key, Consumer consumer, Range< */ default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, Long count) { - return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next() + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, null))).next() .map(CommandResponse::getOutput); } + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@literal consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. + * @return {@link Mono} emitting pending messages for the given {@literal consumer} in given + * {@literal consumer group}. + * @see Redis Documentation: xpending + * @since 2.3 + */ + default Mono xPending(ByteBuffer key, String groupName, String consumerName, Range range, + Long count, Long idleMilliSeconds) { + return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, idleMilliSeconds))) + .next().map(CommandResponse::getOutput); + } + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. @@ -694,9 +734,10 @@ class PendingRecordsCommand extends KeyCommand { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Long idleMilliSeconds; private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range range, - @Nullable Long count) { + @Nullable Long count, @Nullable Long idleMilliSeconds) { super(key); @@ -704,6 +745,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String this.consumerName = consumerName; this.range = range; this.count = count; + this.idleMilliSeconds = idleMilliSeconds; } /** @@ -714,16 +756,17 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String * @return new instance of {@link PendingRecordsCommand}. */ static PendingRecordsCommand pending(ByteBuffer key, String groupName) { - return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null); + return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null); } /** - * Create new {@link PendingRecordsCommand} with given {@link Range} and limit. + * Create new {@link PendingRecordsCommand} with given {@link Range},limit and messages which are idle for certain + * time. * * @return new instance of {@link XPendingOptions}. */ - public PendingRecordsCommand range(Range range, Long count) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + public PendingRecordsCommand range(Range range, Long count, Long idleMilliSeconds) { + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds); } /** @@ -733,7 +776,7 @@ public PendingRecordsCommand range(Range range, Long count) { * @return new instance of {@link PendingRecordsCommand}. */ public PendingRecordsCommand consumer(String consumerName) { - return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count); + return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds); } public String getGroupName() { diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index f46cc27324..663a754e48 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -605,14 +605,15 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / * transaction. * @see Redis Documentation: xpending * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, String groupName, Range range, Long count) { - return xPending(key, groupName, XPendingOptions.range(range, count)); + default PendingMessages xPending(byte[] key, String groupName, Range range, Long count, Long idleMilliSeconds) { + return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds)); } /** @@ -623,13 +624,14 @@ default PendingMessages xPending(byte[] key, String groupName, Range range, L * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. * @param range the range of messages ids to search within. Must not be {@literal null}. * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}. * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. * @see Redis Documentation: xpending * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count) { - return xPending(key, consumer.getGroup(), consumer.getName(), range, count); + default PendingMessages xPending(byte[] key, Consumer consumer, Range range, Long count, Long idleMilliSeconds) { + return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idleMilliSeconds); } /** @@ -647,8 +649,9 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range range, * @since 2.3 */ @Nullable - default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count) { - return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName)); + default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range range, Long count, + Long idleMilliSeconds) { + return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds).consumer(consumerName)); } /** @@ -677,12 +680,15 @@ class XPendingOptions { private final @Nullable String consumerName; private final Range range; private final @Nullable Long count; + private final @Nullable Long idleMilliseconds; - private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count) { + private XPendingOptions(@Nullable String consumerName, Range range, @Nullable Long count, + @Nullable Long idleMilliseconds) { this.range = range; this.count = count; this.consumerName = consumerName; + this.idleMilliseconds = idleMilliseconds; } /** @@ -691,7 +697,7 @@ private XPendingOptions(@Nullable String consumerName, Range range, @Nullable * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded() { - return new XPendingOptions(null, Range.unbounded(), null); + return new XPendingOptions(null, Range.unbounded(), null, null); } /** @@ -701,16 +707,17 @@ public static XPendingOptions unbounded() { * @return new instance of {@link XPendingOptions}. */ public static XPendingOptions unbounded(Long count) { - return new XPendingOptions(null, Range.unbounded(), count); + return new XPendingOptions(null, Range.unbounded(), count, null); } /** - * Create new {@link XPendingOptions} with given {@link Range} and limit. + * StreamOperations Create new {@link XPendingOptions} with given {@link Range}, limit and messages which are idle + * for certain time. * * @return new instance of {@link XPendingOptions}. */ - public static XPendingOptions range(Range range, Long count) { - return new XPendingOptions(null, range, count); + public static XPendingOptions range(Range range, Long count, Long idleMilliseconds) { + return new XPendingOptions(null, range, count, idleMilliseconds); } /** @@ -720,7 +727,7 @@ public static XPendingOptions range(Range range, Long count) { * @return new instance of {@link XPendingOptions}. */ public XPendingOptions consumer(String consumerName) { - return new XPendingOptions(consumerName, range, count); + return new XPendingOptions(consumerName, range, count, idleMilliseconds); } /** @@ -759,6 +766,14 @@ public boolean hasConsumer() { public boolean isLimited() { return count != null && count > -1; } + + /** + * @return can be {@literal null}. + */ + @Nullable + public Long getIdleMilliseconds() { + return idleMilliseconds; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java index 66c9f27936..bac60349ca 100644 --- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java @@ -2390,6 +2390,25 @@ default Long xDel(String key, String... entryIds) { PendingMessages xPending(String key, String groupName, String consumerName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + @Nullable + PendingMessages xPending(String key, String groupName, String consumerName, + org.springframework.data.domain.Range range, Long count, Long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. @@ -2407,6 +2426,24 @@ PendingMessages xPending(String key, String groupName, String consumerName, PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, Long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given + * {@link org.springframework.data.domain.Range} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + @Nullable + PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range range, + Long count, Long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions * options}. diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index a71cfb3383..8e6b1ecdd2 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -225,7 +225,18 @@ public XInfoGroups groups(K key) { public PendingMessages pending(K key, String group, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, group, range, count), true); + return execute(connection -> connection.xPending(rawKey, group, range, count, null), true); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages pending(K key, String group, Range range, long count, + long idleMilliSeconds) { + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, group, range, count, idleMilliSeconds), true); } /* @@ -236,7 +247,18 @@ public PendingMessages pending(K key, String group, Range range, long count) public PendingMessages pending(K key, Consumer consumer, Range range, long count) { byte[] rawKey = rawKey(key); - return execute(connection -> connection.xPending(rawKey, consumer, range, count), true); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, null), true); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, org.springframework.data.redis.connection.stream.Consumer, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long) + */ + @Override + public PendingMessages pending(K key, Consumer consumer, Range range, long count, + long idleMilliSeconds) { + byte[] rawKey = rawKey(key); + return execute(connection -> connection.xPending(rawKey, consumer, range, count, idleMilliSeconds), true); } /* diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 2a80131674..9f2b695087 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -281,6 +281,22 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, String group, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a + * {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param group the name of the {@literal consumer group}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline / + * transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + PendingMessages pending(K key, String group, Range range, long count, long idleMilliSeconds); + /** * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and * {@link Consumer} within a {@literal consumer group}. @@ -295,6 +311,21 @@ default PendingMessages pending(K key, Consumer consumer) { */ PendingMessages pending(K key, Consumer consumer, Range range, long count); + /** + * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and + * {@link Consumer} within a {@literal consumer group}. + * + * @param key the {@literal key} the stream is stored at. Must not be {@literal null}. + * @param consumer the name of the {@link Consumer}. Must not be {@literal null}. + * @param range the range of messages ids to search within. Must not be {@literal null}. + * @param count limit the number of results. + * @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}. + * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: xpending + * @since 2.3 + */ + PendingMessages pending(K key, Consumer consumer, Range range, long count, long idleMilliSeconds); + /** * Get the length of a stream. *