Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IDLE argument to XPENDING command #3116

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -82,6 +82,7 @@
* @author ihaohong
* @author Dennis Neufeld
* @author Shyngys Sapraliyev
* @author Jeonggyu Choi
*/
@SuppressWarnings({ "ConstantConditions", "deprecation" })
public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
@@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, String consumerName,
org.springframework.data.domain.Range<String> range, Long count, Duration minIdleTime) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime),
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
Long count, Duration minIdleTime) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime),
Converters.identityConverter());
}

@Override
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
* @author Jeonggyu Choi
* @since 2.2
*/
public interface ReactiveStreamCommands {
@@ -747,6 +748,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
* {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
* transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count,
Duration minIdleTime) {
return xPending(
Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next()
.map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group}.
@@ -763,6 +785,24 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count,
Duration minIdleTime) {
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group}.
@@ -783,6 +823,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
.next().map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
*
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
* @param range the range of messages ids to search within. Must not be {@literal null}.
* @param count limit the number of results. Must not be {@literal null}.
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
* when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
* @since 3.5
*/
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
Long count, Duration minIdleTime) {
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)
.minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput);
}

/**
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
* options}.
@@ -798,24 +859,26 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
* Value Object holding parameters for obtaining pending messages.
*
* @author Christoph Strobl
* @author Jeonggyu Choi
* @since 2.3
*/
class PendingRecordsCommand extends KeyCommand {

private final String groupName;
private final @Nullable String consumerName;
private final Range<?> range;
private final @Nullable Long count;
private final XPendingOptions options;

private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
@Nullable Long count) {
@Nullable Long count, @Nullable Duration minIdleTime) {

this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
}

private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) {

super(key);

this.groupName = groupName;
this.consumerName = consumerName;
this.range = range;
this.count = count;
this.options = options;
}

/**
@@ -826,7 +889,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
* @return new instance of {@link PendingRecordsCommand}.
*/
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
}

/**
@@ -841,7 +904,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
Assert.notNull(range, "Range must not be null");
Assert.isTrue(count > -1, "Count must not be negative");

return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count));
}

/**
@@ -851,7 +914,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand consumer(String consumerName) {
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName));
}

/**
* Append given minimum idle time.
*
* @param minIdleTime must not be {@literal null}.
* @return new instance of {@link PendingRecordsCommand}.
*/
public PendingRecordsCommand minIdleTime(Duration minIdleTime) {

Assert.notNull(minIdleTime, "Idle must not be null");

return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime));
}

public String getGroupName() {
@@ -863,36 +939,51 @@ public String getGroupName() {
*/
@Nullable
public String getConsumerName() {
return consumerName;
return options.getConsumerName();
}

/**
* @return never {@literal null}.
*/
public Range<?> getRange() {
return range;
return options.getRange();
}

/**
* @return can be {@literal null}.
*/
@Nullable
public Long getCount() {
return count;
return options.getCount();
}

/**
* @return can be {@literal null}.
*/
@Nullable
public Duration getMinIdleTime() {
return options.getMinIdleTime();
}

/**
* @return {@literal true} if a consumer name is present.
*/
public boolean hasConsumer() {
return StringUtils.hasText(consumerName);
return StringUtils.hasText(options.getConsumerName());
}

/**
* @return {@literal true} count is set.
*/
public boolean isLimited() {
return count != null;
return options.getCount() != null;
}

/**
* @return {@literal true} if idle is set.
*/
public boolean hasIdle() {
return options.getMinIdleTime() != null;
}
}

Loading