Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IDLE argument to XPENDING command #3116

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

whatasame
Copy link

@whatasame whatasame commented Mar 8, 2025

Description

Hello, I'm happy to be here. This is my first time contributing to the Spring Framework.

This pull request resolves #2046 and implements the actual execution.

Below is what I have planned in my mind and my progress so far.

To do

Property test

  • XPendingOptions (at RedisStreamCommandsUnitTests)
  • PendingRecordsCommand (at ReactiveStreamCommandsUnitTests)

Imperative pending command test

// RedisStreamCommands.java
PendingMessages xPending(byte[] key, String groupName, XPendingOptions options);
  • JedisClusterStreamCommands (at AbstractConnectionIntegrationTests)
  • JedisStreamCommands (at AbstractConnectionIntegrationTests)
  • LettuceStreamCommands (at AbstractConnectionIntegrationTests)

Reactive pending command test

// ReactiveStreamCommands.java
Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(Publisher<PendingRecordsCommand> commands);
  • LettuceReactiveStreamCommands (at LettuceReactiveStreamCommandsIntegrationTests)

Miscellaneous

  • Match the parameter order to be the same as in the Redis CLI.
  • Squash to just one commit
  • Double check contribution guideline like commit message format, DCO, etc.

Contribution guides

  • You have read the Spring Data contribution guidelines.
  • You use the code formatters provided here and have them applied to your changes. Don’t submit any formatting related changes.
  • You submit test cases (unit or integration tests) that back your changes.
  • You added yourself as author in the headers of the classes you touched. Amend the date range in the Apache license header if needed. For new types, add the license header (copy from another file and set the current year only).

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 8, 2025
Copy link
Author

@whatasame whatasame left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let me know if you have any suggestions. I really appreciate your guidance and support. 🙇‍♂️ Thank you!

@whatasame whatasame marked this pull request as ready for review March 8, 2025 10:05
@whatasame whatasame marked this pull request as draft March 9, 2025 06:19
whatasame added a commit to whatasame/spring-data-redis that referenced this pull request Mar 9, 2025
Copy link
Author

@whatasame whatasame left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests related to the new features. Please check whether I wrote them correctly and appropriately.

Here are some questions I had while contributing:

Static Wildcard Imports

According to the Spring Data Code Formatting Settings, it states that wildcards should be used when importing more than one name. However, there are more than two static imports, as seen in AbstractConnectionIntegrationTests.

I applied the code formatting settings, and my IntelliJ attempted to modify them accordingly, but I kept the original formatting for consistency.

Match the parameter order to be the same as in the Redis CLI.

In my implementation, the IDLE argument is positioned last. However, in the Redis CLI, it appears before the RANGE:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

Should we match the argument order to be consistent with the Redis CLI, or is it acceptable to keep it as is?

Comment on lines +3138 to +3166
// /**
// * Obtained detailed information about all pending messages for a given {@link Consumer}.
// *
// * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
// * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
// * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
// * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
// * @since 3.5
// */
// @Nullable
// default PendingMessages xPending(String key, Consumer consumer) {
// return xPending(key, consumer.getGroup(), consumer.getName());
// }

// /**
// * Obtained detailed information about all pending messages for a given {@literal consumer}.
// *
// * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
// * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
// * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
// * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
// * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
// * @since 3.5
// */
// @Nullable
// default PendingMessages xPending(String key, String groupName, String consumerName) {
// return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
// }

Copy link
Author

@whatasame whatasame Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature is the same as RedisStreamCommands#xPending(). However, it throws a NPE during execution on Jedis because there is no COUNT argument, but a range is being used.

I am not certain whether the same issue occurs when using RedisStreamCommands#xPending() because I couldn't find any related tests. And we have to why it does not happened on Lettuce.

This issue can be observed in my commented-out test in AbstractConnectionIntegrationTests.

image

I added this method because StringRedisConnection looks similar to RedisStreamCommands.
Is there a specific reason why StringRedisConnection has not included this method so far?
Should I refrain from adding it?

Comment on lines +4106 to +4185
// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

Copy link
Author

@whatasame whatasame Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It throws NPE on Jedis because there is no COUNT argument.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gave those a try seemed to work after adding Range and limit.

Copy link
Author

@whatasame whatasame Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but isn't it intended to allow the command to work independently without Range and limit?

In Redis CLI, XPENDING requires both key and group as mandatory arguments. If the optional consumer argument is provided, both range and count must also be specified.

However, in the current implementation of xPending(groupName, consumerName), only range is set to unbounded, while count is not provided. As a result, this command cannot be used on its own.

I believe the API using the following code:

public static XPendingOptions unbounded() {
	return new XPendingOptions(null, Range.unbounded(), null, null);
}

should be changed as follows, because while range can have a default value of unbounded, count always requires a specific value in order to execute correctly:

// RedisStreamCommands.java
default PendingMessages xPending(byte[] key, String groupName, String consumerName) {
	return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
}
127.0.0.1:6379> xpending mystream mygroup - + myconsumer
(error) ERR value is not an integer or out of range
// AS-IS
default PendingMessages xPending(String key, String groupName, String consumerName)

// TO-BE
default PendingMessages xPending(String key, String groupName, String consumerName, Long count)

@whatasame whatasame marked this pull request as ready for review March 9, 2025 10:31
Copy link
Member

@christophstrobl christophstrobl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @whatasame for taking the time to create this PR. I left a few comments. Please let me know if you have time/want do the changes yourself? Thank you!

StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"),
org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a duration of ZERO will not have any effect - will it? Adding a little timeout and switching to Duration.ofMillis(1) might be worth a try. I expect we'd need overrides for the pipleine & transaction tests as this won't work there.

Copy link
Author

@whatasame whatasame Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duration.ZERO will be converted to 0 milliseconds, so this xPending call is equivalent to:

XPENDING KEY_1 my-group IDLE 0 - + 10 my-consumer

It seems like IDLE 0 has no effect, but it works similarly to Duration.ofMillis(1) considering the gap between the invocation of xReadGroup and xPending

To make this test clearer, it would be better to use a time-based test like the example below.

@Test // GH-2046
@EnabledOnCommand("XADD")
void xPendingShouldLoadPendingMessagesForIdle() throws InterruptedException {

	// Exceed idle time message
	actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
	actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
	actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
			StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

	Duration delayed = Duration.ofMillis(10);
	Thread.sleep(delayed.toMillis());

	// Non-exceed idle time message
	actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_3, VALUE_3)));
	actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
			StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

	actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(),
			10L, delayed));

	List<Object> results = getResults();
	assertThat(results).hasSize(6);
	PendingMessages pending = (PendingMessages) results.get(5);

	assertThat(pending.size()).isOne();
	assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
	assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
	assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
	assertThat(pending.get(0).getIdAsString()).isNotNull();
}

However, this test may introduce flakiness in CI environments with low performance. If the second pending message is processed after 20 milliseconds, the test could return two pending messages instead of one, causing it to fail. And it needs to override for pipeline and transaction as you said.

For the reasons mentioned above and to pass pipeline and transaction tests, I chose Duration.ZERO.

StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

actual.add(
connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as with other ZERO duration.

Comment on lines +4106 to +4185
// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gave those a try seemed to work after adding Range and limit.

@whatasame whatasame marked this pull request as draft March 12, 2025 06:18
Copy link
Author

@whatasame whatasame left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your thoughtful review. I’ve applied some suggestions and added comments for the others.

As English is not my first language, please let me know if anything is unclear or if you have further feedback.

StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"),
org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO));
Copy link
Author

@whatasame whatasame Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duration.ZERO will be converted to 0 milliseconds, so this xPending call is equivalent to:

XPENDING KEY_1 my-group IDLE 0 - + 10 my-consumer

It seems like IDLE 0 has no effect, but it works similarly to Duration.ofMillis(1) considering the gap between the invocation of xReadGroup and xPending

To make this test clearer, it would be better to use a time-based test like the example below.

@Test // GH-2046
@EnabledOnCommand("XADD")
void xPendingShouldLoadPendingMessagesForIdle() throws InterruptedException {

	// Exceed idle time message
	actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
	actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
	actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
			StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

	Duration delayed = Duration.ofMillis(10);
	Thread.sleep(delayed.toMillis());

	// Non-exceed idle time message
	actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_3, VALUE_3)));
	actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
			StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));

	actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(),
			10L, delayed));

	List<Object> results = getResults();
	assertThat(results).hasSize(6);
	PendingMessages pending = (PendingMessages) results.get(5);

	assertThat(pending.size()).isOne();
	assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
	assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
	assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
	assertThat(pending.get(0).getIdAsString()).isNotNull();
}

However, this test may introduce flakiness in CI environments with low performance. If the second pending message is processed after 20 milliseconds, the test could return two pending messages instead of one, causing it to fail. And it needs to override for pipeline and transaction as you said.

For the reasons mentioned above and to pass pipeline and transaction tests, I chose Duration.ZERO.

Comment on lines +4106 to +4185
// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2")));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isOne();
// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
// assertThat(pending.get(0).getIdAsString()).isNotNull();
// }

// @Test // GH-2046
// @EnabledOnCommand("XADD")
// void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() {
//
// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
//
// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2"));
//
// List<Object> results = getResults();
// assertThat(results).hasSize(4);
// PendingMessages pending = (PendingMessages) results.get(3);
//
// assertThat(pending.size()).isZero();
// }

Copy link
Author

@whatasame whatasame Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but isn't it intended to allow the command to work independently without Range and limit?

In Redis CLI, XPENDING requires both key and group as mandatory arguments. If the optional consumer argument is provided, both range and count must also be specified.

However, in the current implementation of xPending(groupName, consumerName), only range is set to unbounded, while count is not provided. As a result, this command cannot be used on its own.

I believe the API using the following code:

public static XPendingOptions unbounded() {
	return new XPendingOptions(null, Range.unbounded(), null, null);
}

should be changed as follows, because while range can have a default value of unbounded, count always requires a specific value in order to execute correctly:

// RedisStreamCommands.java
default PendingMessages xPending(byte[] key, String groupName, String consumerName) {
	return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
}
127.0.0.1:6379> xpending mystream mygroup - + myconsumer
(error) ERR value is not an integer or out of range
// AS-IS
default PendingMessages xPending(String key, String groupName, String consumerName)

// TO-BE
default PendingMessages xPending(String key, String groupName, String consumerName, Long count)

@whatasame whatasame marked this pull request as ready for review March 12, 2025 08:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: waiting-for-triage An issue we've not yet triaged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add IDLE argument to XPENDING command
3 participants