Skip to content

Commit 6b657b5

Browse files
committed
Add IDLE argument to XPENDING command.
Original pull request spring-projects#3116 Closes spring-projects#2046 Signed-off-by: Jeonggyu Choi <[email protected]>
1 parent 06f3591 commit 6b657b5

15 files changed

+862
-31
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

+15
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
* @author ihaohong
8383
* @author Dennis Neufeld
8484
* @author Shyngys Sapraliyev
85+
* @author Jeonggyu Choi
8586
*/
8687
@SuppressWarnings({ "ConstantConditions", "deprecation" })
8788
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
@@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
29682969
Converters.identityConverter());
29692970
}
29702971

2972+
@Override
2973+
public PendingMessages xPending(String key, String groupName, String consumerName,
2974+
org.springframework.data.domain.Range<String> range, Long count, Duration idle) {
2975+
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, idle),
2976+
Converters.identityConverter());
2977+
}
2978+
29712979
@Override
29722980
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
29732981
Long count) {
29742982
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
29752983
}
29762984

2985+
@Override
2986+
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
2987+
Long count, Duration idle) {
2988+
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idle),
2989+
Converters.identityConverter());
2990+
}
2991+
29772992
@Override
29782993
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
29792994
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+94-4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Dengliming
6161
* @author Mark John Moreno
6262
* @author jinkshower
63+
* @author Jeonggyu Choi
6364
* @since 2.2
6465
*/
6566
public interface ReactiveStreamCommands {
@@ -747,6 +748,25 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
747748
.map(CommandResponse::getOutput);
748749
}
749750

751+
/**
752+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
753+
* {@literal consumer group} and over a given {@link Duration} of idle time.
754+
*
755+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
756+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
757+
* @param range the range of messages ids to search within. Must not be {@literal null}.
758+
* @param count limit the number of results. Must not be {@literal null}.
759+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
760+
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
761+
* transaction.
762+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
763+
* @since 3.5
764+
*/
765+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count, Duration idle) {
766+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).idle(idle))).next()
767+
.map(CommandResponse::getOutput);
768+
}
769+
750770
/**
751771
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
752772
* {@link Consumer} within a {@literal consumer group}.
@@ -763,6 +783,23 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
763783
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
764784
}
765785

786+
/**
787+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
788+
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
789+
*
790+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
791+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
792+
* @param range the range of messages ids to search within. Must not be {@literal null}.
793+
* @param count limit the number of results. Must not be {@literal null}.
794+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
795+
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
796+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
797+
* @since 3.5
798+
*/
799+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count, Duration idle) {
800+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idle);
801+
}
802+
766803
/**
767804
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
768805
* {@literal consumer} within a {@literal consumer group}.
@@ -783,6 +820,28 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
783820
.next().map(CommandResponse::getOutput);
784821
}
785822

823+
/**
824+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
825+
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
826+
*
827+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
828+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
829+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
830+
* @param range the range of messages ids to search within. Must not be {@literal null}.
831+
* @param count limit the number of results. Must not be {@literal null}.
832+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
833+
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
834+
* when used in pipeline / transaction.
835+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
836+
* @since 3.5
837+
*/
838+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
839+
Long count, Duration idle) {
840+
return xPending(
841+
Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count).idle(idle)))
842+
.next().map(CommandResponse::getOutput);
843+
}
844+
786845
/**
787846
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
788847
* options}.
@@ -798,6 +857,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
798857
* Value Object holding parameters for obtaining pending messages.
799858
*
800859
* @author Christoph Strobl
860+
* @author Jeonggyu Choi
801861
* @since 2.3
802862
*/
803863
class PendingRecordsCommand extends KeyCommand {
@@ -806,16 +866,18 @@ class PendingRecordsCommand extends KeyCommand {
806866
private final @Nullable String consumerName;
807867
private final Range<?> range;
808868
private final @Nullable Long count;
869+
private final @Nullable Duration idle;
809870

810871
private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
811-
@Nullable Long count) {
872+
@Nullable Long count, @Nullable Duration idle) {
812873

813874
super(key);
814875

815876
this.groupName = groupName;
816877
this.consumerName = consumerName;
817878
this.range = range;
818879
this.count = count;
880+
this.idle = idle;
819881
}
820882

821883
/**
@@ -826,7 +888,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
826888
* @return new instance of {@link PendingRecordsCommand}.
827889
*/
828890
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
829-
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
891+
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
830892
}
831893

832894
/**
@@ -841,7 +903,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
841903
Assert.notNull(range, "Range must not be null");
842904
Assert.isTrue(count > -1, "Count must not be negative");
843905

844-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
906+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, null);
845907
}
846908

847909
/**
@@ -851,7 +913,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
851913
* @return new instance of {@link PendingRecordsCommand}.
852914
*/
853915
public PendingRecordsCommand consumer(String consumerName) {
854-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
916+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
917+
}
918+
919+
/**
920+
* Append given idle time.
921+
*
922+
* @param idle must not be {@literal null}.
923+
* @return new instance of {@link PendingRecordsCommand}.
924+
*/
925+
public PendingRecordsCommand idle(Duration idle) {
926+
927+
Assert.notNull(idle, "Idle must not be null");
928+
929+
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idle);
855930
}
856931

857932
public String getGroupName() {
@@ -881,6 +956,14 @@ public Long getCount() {
881956
return count;
882957
}
883958

959+
/**
960+
* @return can be {@literal null}.
961+
*/
962+
@Nullable
963+
public Duration getIdle() {
964+
return idle;
965+
}
966+
884967
/**
885968
* @return {@literal true} if a consumer name is present.
886969
*/
@@ -894,6 +977,13 @@ public boolean hasConsumer() {
894977
public boolean isLimited() {
895978
return count != null;
896979
}
980+
981+
/**
982+
* @return {@literal true} if idle is set.
983+
*/
984+
public boolean hasIdle() {
985+
return idle != null;
986+
}
897987
}
898988

899989
/**

0 commit comments

Comments
 (0)