Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idle attribute for xpending command #2101

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4548,8 +4548,9 @@ public PendingMessagesSummary xPending(String key, String groupName) {
*/
@Override
public PendingMessages xPending(String key, String groupName, String consumer,
org.springframework.data.domain.Range<String> range, Long count) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count),
org.springframework.data.domain.Range<String> range, Long count, Long idleMilliSeconds) {

return convertAndReturn(delegate.xPending(serialize(key), groupName, consumer, range, count,idleMilliSeconds),
Converters.identityConverter());
}

Expand All @@ -4560,7 +4561,18 @@ public PendingMessages xPending(String key, String groupName, String consumer,
@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, null), Converters.identityConverter());
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.StringRedisConnection#xPending(java.lang.String, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long)
*/
@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count, Long idleMilliSeconds) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, idleMilliSeconds),
Converters.identityConverter());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName)
Assert.notNull(key, "Key must not be null!");
Assert.notNull(groupName, "GroupName must not be null!");

return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
.map(CommandResponse::getOutput);
return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null)))
.next().map(CommandResponse::getOutput);
}

/**
Expand Down Expand Up @@ -646,8 +646,8 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
*/
@Nullable
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
.map(CommandResponse::getOutput);
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null, null)))
.next().map(CommandResponse::getOutput);
}

/**
Expand All @@ -663,7 +663,26 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
* @since 2.3
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, null))).next()
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
* {@literal consumer group}.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}.
* @return {@link Mono} emitting pending messages for the given {@literal consumer group}. transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count,
Long idleMilliSeconds) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count, idleMilliSeconds))).next()
.map(CommandResponse::getOutput);
}

Expand Down Expand Up @@ -699,10 +718,31 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, null))).next()
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group}.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}.
* @return {@link Mono} emitting pending messages for the given {@literal consumer} in given
* {@literal consumer group}.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count, Long idleMilliSeconds) {
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count, idleMilliSeconds)))
.next().map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
Expand All @@ -726,16 +766,18 @@ class PendingRecordsCommand extends KeyCommand {
private final @Nullable String consumerName;
private final Range<?> range;
private final @Nullable Long count;
private final @Nullable Long idleMilliSeconds;

private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
@Nullable Long count) {
@Nullable Long count, @Nullable Long idleMilliSeconds) {

super(key);

this.groupName = groupName;
this.consumerName = consumerName;
this.range = range;
this.count = count;
this.idleMilliSeconds = idleMilliSeconds;
}

/**
Expand All @@ -746,16 +788,17 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
* @return new instance of {@link PendingRecordsCommand}.
*/
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
}

/**
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
* Create new {@link PendingRecordsCommand} with given {@link Range},limit and messages which are idle for certain
* time.
*
* @return new instance of {@link XPendingOptions}.
*/
public PendingRecordsCommand range(Range<String> range, Long count) {
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
public PendingRecordsCommand range(Range<String> range, Long count, Long idleMilliSeconds) {
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds);
}

/**
Expand All @@ -765,7 +808,7 @@ public PendingRecordsCommand range(Range<String> range, Long count) {
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand consumer(String consumerName) {
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count, idleMilliSeconds);
}

public String getGroupName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,15 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}.
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
* transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
*/
@Nullable
default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count) {
return xPending(key, groupName, XPendingOptions.range(range, count));
default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count, Long idleMilliSeconds) {
return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds));
}

/**
Expand All @@ -658,13 +659,14 @@ default PendingMessages xPending(byte[] key, String groupName, Range<?> range, L
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the pending messages which were idle for certain duration . Must not be {@literal null}.
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
*/
@Nullable
default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count) {
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count, Long idleMilliSeconds) {
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, idleMilliSeconds);
}

/**
Expand All @@ -682,8 +684,9 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range,
* @since 2.3
*/
@Nullable
default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count) {
return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName));
default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count,
Long idleMilliSeconds) {
return xPending(key, groupName, XPendingOptions.range(range, count, idleMilliSeconds).consumer(consumerName));
}

/**
Expand Down Expand Up @@ -712,12 +715,15 @@ class XPendingOptions {
private final @Nullable String consumerName;
private final Range<?> range;
private final @Nullable Long count;
private final @Nullable Long idleMilliseconds;

private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count) {
private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count,
@Nullable Long idleMilliseconds) {

this.range = range;
this.count = count;
this.consumerName = consumerName;
this.idleMilliseconds = idleMilliseconds;
}

/**
Expand All @@ -726,7 +732,7 @@ private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions unbounded() {
return new XPendingOptions(null, Range.unbounded(), null);
return new XPendingOptions(null, Range.unbounded(), null, null);
}

/**
Expand All @@ -736,16 +742,17 @@ public static XPendingOptions unbounded() {
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions unbounded(Long count) {
return new XPendingOptions(null, Range.unbounded(), count);
return new XPendingOptions(null, Range.unbounded(), count, null);
}

/**
* Create new {@link XPendingOptions} with given {@link Range} and limit.
* StreamOperations Create new {@link XPendingOptions} with given {@link Range}, limit and messages which are idle
* for certain time.
*
* @return new instance of {@link XPendingOptions}.
*/
public static XPendingOptions range(Range<?> range, Long count) {
return new XPendingOptions(null, range, count);
public static XPendingOptions range(Range<?> range, Long count, Long idleMilliseconds) {
return new XPendingOptions(null, range, count, idleMilliseconds);
}

/**
Expand All @@ -755,7 +762,7 @@ public static XPendingOptions range(Range<?> range, Long count) {
* @return new instance of {@link XPendingOptions}.
*/
public XPendingOptions consumer(String consumerName) {
return new XPendingOptions(consumerName, range, count);
return new XPendingOptions(consumerName, range, count, idleMilliseconds);
}

/**
Expand Down Expand Up @@ -794,6 +801,14 @@ public boolean hasConsumer() {
public boolean isLimited() {
return count != null && count > -1;
}

/**
* @return can be {@literal null}.
*/
@Nullable
public Long getIdleMilliseconds() {
return idleMilliseconds;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2825,6 +2825,25 @@ default Long xDel(String key, String... entryIds) {
PendingMessages xPending(String key, String groupName, String consumerName,
org.springframework.data.domain.Range<String> range, Long count);

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given
* {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
* transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@since should point to version 2.6 in all newly introduced methods.

*/
@Nullable
PendingMessages xPending(String key, String groupName, String consumerName,
org.springframework.data.domain.Range<String> range, Long count, Long idleMilliSeconds);

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given
* {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
Expand All @@ -2842,6 +2861,24 @@ PendingMessages xPending(String key, String groupName, String consumerName,
PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count);

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given
* {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param idleMilliSeconds the time limit until the pending messages are idle. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
* transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 2.3
*/
@Nullable
PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count, Long idleMilliSeconds);

/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,18 @@ public XInfoGroups groups(K key) {
public PendingMessages pending(K key, String group, Range<?> range, long count) {

byte[] rawKey = rawKey(key);
return execute(connection -> connection.xPending(rawKey, group, range, count), true);
return execute(connection -> connection.xPending(rawKey, group, range, count, null), true);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, java.lang.String, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long)
*/
@Override
public PendingMessages pending(K key, String group, Range<?> range, long count,
long idleMilliSeconds) {
byte[] rawKey = rawKey(key);
return execute(connection -> connection.xPending(rawKey, group, range, count, idleMilliSeconds), true);
}

/*
Expand All @@ -236,7 +247,18 @@ public PendingMessages pending(K key, String group, Range<?> range, long count)
public PendingMessages pending(K key, Consumer consumer, Range<?> range, long count) {

byte[] rawKey = rawKey(key);
return execute(connection -> connection.xPending(rawKey, consumer, range, count), true);
return execute(connection -> connection.xPending(rawKey, consumer, range, count, null), true);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.core.StreamOperations#pending(java.lang.Object, org.springframework.data.redis.connection.stream.Consumer, org.springframework.data.domain.Range, java.lang.Long, java.lang.Long)
*/
@Override
public PendingMessages pending(K key, Consumer consumer, Range<?> range, long count,
long idleMilliSeconds) {
byte[] rawKey = rawKey(key);
return execute(connection -> connection.xPending(rawKey, consumer, range, count, idleMilliSeconds), true);
}

/*
Expand Down
Loading