Skip to content

Commit dd44302

Browse files
authored
Merge pull request #1314 from nats-io/ordered-consumer-nameable
Ability to supply a prefix for watches. Fixed ordered consumer naming bug.
2 parents a632667 + b023c60 commit dd44302

22 files changed

+499
-273
lines changed

src/main/java/io/nats/client/BaseConsumeOptions.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020

2121
import static io.nats.client.support.ApiConstants.*;
2222
import static io.nats.client.support.JsonUtils.*;
23+
import static io.nats.client.support.JsonValueUtils.*;
2324
import static io.nats.client.support.JsonValueUtils.readBoolean;
2425
import static io.nats.client.support.JsonValueUtils.readInteger;
2526
import static io.nats.client.support.JsonValueUtils.readLong;
26-
import static io.nats.client.support.JsonValueUtils.*;
2727

2828
/**
2929
* Base Consume Options are provided to customize the way the consume and
@@ -208,7 +208,7 @@ public B thresholdPercent(int thresholdPercent) {
208208

209209
/**
210210
* Raise status warning turns on sending status messages to the error listener.
211-
* The default of to not raise status warning
211+
* The default is to not raise status warnings
212212
* @return the builder
213213
*/
214214
public B raiseStatusWarnings() {
@@ -218,7 +218,8 @@ public B raiseStatusWarnings() {
218218

219219
/**
220220
* Turn on or off raise status warning turns. When on, status messages are sent to the error listener.
221-
* The default of to not raise status warning
221+
* The default is to not raise status warnings
222+
* @param raiseStatusWarnings flag indicating whether to raise status messages
222223
* @return the builder
223224
*/
224225
public B raiseStatusWarnings(boolean raiseStatusWarnings) {

src/main/java/io/nats/client/BaseConsumerContext.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public interface BaseConsumerContext {
3131
* Read the next message with max wait set to {@value BaseConsumeOptions#DEFAULT_EXPIRES_IN_MILLIS} ms
3232
* @return the next message or null if the max wait expires
3333
* @throws IOException covers various communication issues with the NATS
34-
* server such as timeout or interruption
35-
* @throws InterruptedException if one is thrown, in order to propagate it up
34+
* server, such as timeout or interruption
35+
* @throws InterruptedException if one is thrown, to propagate it up
3636
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
3737
* such as the consumer was deleted on the server in the middle of use.
3838
* @throws JetStreamApiException the request had an error related to the data
@@ -44,8 +44,8 @@ public interface BaseConsumerContext {
4444
* @param maxWait duration of max wait. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
4545
* @return the next message or null if the max wait expires
4646
* @throws IOException covers various communication issues with the NATS
47-
* server such as timeout or interruption
48-
* @throws InterruptedException if one is thrown, in order to propagate it up
47+
* server, such as timeout or interruption
48+
* @throws InterruptedException if one is thrown, to propagate it up
4949
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
5050
* such as the consumer was deleted on the server in the middle of use.
5151
* @throws JetStreamApiException the request had an error related to the data
@@ -57,8 +57,8 @@ public interface BaseConsumerContext {
5757
* @param maxWaitMillis the max wait value in milliseconds. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
5858
* @return the next message or null if the max wait expires
5959
* @throws IOException covers various communication issues with the NATS
60-
* server such as timeout or interruption
61-
* @throws InterruptedException if one is thrown, in order to propagate it up
60+
* server, such as timeout or interruption
61+
* @throws InterruptedException if one is thrown, to propagate it up
6262
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
6363
* such as the consumer was deleted on the server in the middle of use.
6464
* @throws JetStreamApiException the request had an error related to the data
@@ -67,10 +67,10 @@ public interface BaseConsumerContext {
6767

6868
/**
6969
* Start a one use Fetch Consumer using all defaults other than the number of messages. See {@link FetchConsumer}
70-
* @param maxMessages the maximum number of message to consume
70+
* @param maxMessages the maximum number of messages to consume
7171
* @return the FetchConsumer instance
7272
* @throws IOException covers various communication issues with the NATS
73-
* server such as timeout or interruption
73+
* server, such as timeout or interruption
7474
* @throws JetStreamApiException the request had an error related to the data
7575
*/
7676
FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException;
@@ -80,38 +80,38 @@ public interface BaseConsumerContext {
8080
* @param maxBytes the maximum number of bytes to consume
8181
* @return the FetchConsumer instance
8282
* @throws IOException covers various communication issues with the NATS
83-
* server such as timeout or interruption
83+
* server, such as timeout or interruption
8484
* @throws JetStreamApiException the request had an error related to the data
8585
*/
8686
FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException;
8787

8888
/**
89-
* Start a one use Fetch Consumer with complete custom consume options. See {@link FetchConsumer}
89+
* Start a one-use Fetch Consumer with complete custom consume options. See {@link FetchConsumer}
9090
* @param fetchConsumeOptions the custom fetch consume options. See {@link FetchConsumeOptions}
9191
* @return the FetchConsumer instance
9292
* @throws IOException covers various communication issues with the NATS
93-
* server such as timeout or interruption
93+
* server, such as timeout or interruption
9494
* @throws JetStreamApiException the request had an error related to the data
9595
*/
9696
FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException;
9797

9898
/**
9999
* Start a long-running IterableConsumer with default ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
100-
* IterableConsumer require the developer call nextMessage.
100+
* IterableConsumer require the developer calls nextMessage.
101101
* @return the IterableConsumer instance
102102
* @throws IOException covers various communication issues with the NATS
103-
* server such as timeout or interruption
103+
* server, such as timeout or interruption
104104
* @throws JetStreamApiException the request had an error related to the data
105105
*/
106106
IterableConsumer iterate() throws IOException, JetStreamApiException;
107107

108108
/**
109109
* Start a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
110-
* IterableConsumer requires the developer call nextMessage.
110+
* IterableConsumer requires the developer calls nextMessage.
111111
* @param consumeOptions the custom consume options
112112
* @return the IterableConsumer instance
113113
* @throws IOException covers various communication issues with the NATS
114-
* server such as timeout or interruption
114+
* server, such as timeout or interruption
115115
* @throws JetStreamApiException the request had an error related to the data
116116
*/
117117
IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
@@ -121,7 +121,7 @@ public interface BaseConsumerContext {
121121
* @param handler the MessageHandler used for receiving messages.
122122
* @return the MessageConsumer instance
123123
* @throws IOException covers various communication issues with the NATS
124-
* server such as timeout or interruption
124+
* server, such as timeout or interruption
125125
* @throws JetStreamApiException the request had an error related to the data
126126
*/
127127
MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException;
@@ -130,10 +130,10 @@ public interface BaseConsumerContext {
130130
* Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
131131
*
132132
* @param dispatcher The dispatcher to handle this subscription
133-
* @param handler the MessageHandler used for receiving messages.
133+
* @param handler the MessageHandler used for receiving messages.
134134
* @return the MessageConsumer instance
135-
* @throws IOException covers various communication issues with the NATS
136-
* server such as timeout or interruption
135+
* @throws IOException covers various communication issues with the NATS
136+
* server, such as timeout or interruption
137137
* @throws JetStreamApiException the request had an error related to the data
138138
*/
139139
MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException;
@@ -142,10 +142,10 @@ public interface BaseConsumerContext {
142142
* Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
143143
*
144144
* @param consumeOptions the custom consume options
145-
* @param handler the MessageHandler used for receiving messages.
145+
* @param handler the MessageHandler used for receiving messages.
146146
* @return the MessageConsumer instance
147-
* @throws IOException covers various communication issues with the NATS
148-
* server such as timeout or interruption
147+
* @throws IOException covers various communication issues with the NATS
148+
* server, such as timeout or interruption
149149
* @throws JetStreamApiException the request had an error related to the data
150150
*/
151151
MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException;
@@ -154,11 +154,11 @@ public interface BaseConsumerContext {
154154
* Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
155155
*
156156
* @param consumeOptions the custom consume options
157-
* @param dispatcher The dispatcher to handle this subscription
158-
* @param handler the MessageHandler used for receiving messages.
157+
* @param dispatcher the dispatcher to handle this subscription
158+
* @param handler the MessageHandler used for receiving messages.
159159
* @return the MessageConsumer instance
160-
* @throws IOException covers various communication issues with the NATS
161-
* server such as timeout or interruption
160+
* @throws IOException covers various communication issues with the NATS
161+
* server, such as timeout or interruption
162162
* @throws JetStreamApiException the request had an error related to the data
163163
*/
164164
MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException;

src/main/java/io/nats/client/KeyValueManagement.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface KeyValueManagement {
2828
* @param config the key value configuration
2929
* @return bucket info
3030
* @throws IOException covers various communication issues with the NATS
31-
* server such as timeout or interruption
31+
* server, such as timeout or interruption
3232
* @throws JetStreamApiException the request had an error related to the data
3333
* @throws IllegalArgumentException the server is not JetStream enabled
3434
*/
@@ -39,7 +39,7 @@ public interface KeyValueManagement {
3939
* @param config the key value configuration
4040
* @return bucket info
4141
* @throws IOException covers various communication issues with the NATS
42-
* server such as timeout or interruption
42+
* server, such as timeout or interruption
4343
* @throws JetStreamApiException the request had an error related to the data
4444
* @throws IllegalArgumentException the server is not JetStream enabled
4545
*/
@@ -49,7 +49,7 @@ public interface KeyValueManagement {
4949
* Get the list of bucket names.
5050
* @return list of bucket names
5151
* @throws IOException covers various communication issues with the NATS
52-
* server such as timeout or interruption
52+
* server, such as timeout or interruption
5353
* @throws JetStreamApiException the request had an error related to the data
5454
*/
5555
List<String> getBucketNames() throws IOException, JetStreamApiException;
@@ -59,7 +59,7 @@ public interface KeyValueManagement {
5959
* @deprecated Use {@link #getStatus(String)} instead.
6060
* @param bucketName the bucket name to use
6161
* @throws IOException covers various communication issues with the NATS
62-
* server such as timeout or interruption
62+
* server, such as timeout or interruption
6363
* @throws JetStreamApiException the request had an error related to the data
6464
* @return the bucket status object
6565
*/
@@ -70,7 +70,7 @@ public interface KeyValueManagement {
7070
* Gets the status for an existing bucket.
7171
* @param bucketName the bucket name to use
7272
* @throws IOException covers various communication issues with the NATS
73-
* server such as timeout or interruption
73+
* server, such as timeout or interruption
7474
* @throws JetStreamApiException the request had an error related to the data
7575
* @return the bucket status object
7676
*/
@@ -80,7 +80,7 @@ public interface KeyValueManagement {
8080
* Get the statuses for all buckets
8181
* @return list of statuses
8282
* @throws IOException covers various communication issues with the NATS
83-
* server such as timeout or interruption
83+
* server, such as timeout or interruption
8484
* @throws JetStreamApiException the request had an error related to the data
8585
*/
8686
List<KeyValueStatus> getStatuses() throws IOException, JetStreamApiException;
@@ -89,7 +89,7 @@ public interface KeyValueManagement {
8989
* Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
9090
* @param bucketName the stream name to use.
9191
* @throws IOException covers various communication issues with the NATS
92-
* server such as timeout or interruption
92+
* server, such as timeout or interruption
9393
* @throws JetStreamApiException the request had an error related to the data
9494
*/
9595
void delete(String bucketName) throws IOException, JetStreamApiException;

src/main/java/io/nats/client/MessageConsumer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
public interface MessageConsumer extends AutoCloseable {
2525
/**
2626
* Gets the consumer name associated with the subscription.
27-
* Some simplified consumer types do not support this, so it might be null.
27+
* For simplified consumers, this value can be null unless
28+
* the consumer info was manually read via {@link #getConsumerInfo()}.
2829
* @return the consumer name
2930
*/
3031
String getConsumerName();
@@ -33,21 +34,22 @@ public interface MessageConsumer extends AutoCloseable {
3334
* Gets information about the consumer behind this subscription.
3435
* @return consumer information
3536
* @throws IOException covers various communication issues with the NATS
36-
* server such as timeout or interruption
37+
* server, such as timeout or interruption
3738
* @throws JetStreamApiException the request had an error related to the data
3839
*/
3940
ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException;
4041

4142
/**
4243
* Gets information about the consumer behind this subscription.
43-
* This returns the last read version of Consumer Info, which could technically be out of date.
44+
* This returns the last read version of Consumer Info,
45+
* which could be null or out of date.
4446
* @return consumer information
4547
*/
4648
ConsumerInfo getCachedConsumerInfo();
4749

4850
/**
49-
* Use {@link close()} to unsubscribe. Stop will not unsubcribe or clean up resources.
50-
* The consumer will finish all pull request already in progress, but will not start any new ones.
51+
* Use {@link #close()} to unsubscribe. Stop will not unsubcribe or clean up resources.
52+
* The consumer will finish all pull requests already in progress, but will not start any new ones.
5153
*/
5254
void stop();
5355

src/main/java/io/nats/client/SubscribeOptions.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public abstract class SubscribeOptions {
3434
protected final boolean ordered;
3535
protected final long messageAlarmTime;
3636
protected final ConsumerConfiguration consumerConfig;
37-
protected final long pendingMessageLimit; // Only applicable for non dispatched (sync) push consumers.
38-
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
37+
protected final long pendingMessageLimit; // Only applicable for non-dispatched (sync) push consumers.
38+
protected final long pendingByteLimit; // Only applicable for non-dispatched (sync) push consumers.
3939
protected final String name;
4040

4141
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
@@ -210,15 +210,15 @@ public ConsumerConfiguration getConsumerConfiguration() {
210210
}
211211

212212
/**
213-
* Gets the pending message limit. Only applicable for non dispatched (sync) push consumers.
213+
* Gets the pending message limit. Only applicable for non-dispatched (sync) push consumers.
214214
* @return the message limit
215215
*/
216216
public long getPendingMessageLimit() {
217217
return pendingMessageLimit;
218218
}
219219

220220
/**
221-
* Gets the pending byte limit. Only applicable for non dispatched (sync) push consumers.
221+
* Gets the pending byte limit. Only applicable for non-dispatched (sync) push consumers.
222222
* @return the byte limit
223223
*/
224224
public long getPendingByteLimit() {
@@ -256,7 +256,7 @@ protected static abstract class Builder<B, SO> {
256256
protected abstract B getThis();
257257

258258
/**
259-
* Specify the stream to attach to. If not supplied the stream will be looked up by subject.
259+
* Specify the stream to attach to. If not supplied, the stream will look up by the stream by subject.
260260
* Null or empty clears the field.
261261
* @param stream the name of the stream
262262
* @return the builder
@@ -269,8 +269,8 @@ public B stream(String stream) {
269269
/**
270270
* Specify binding to an existing consumer via name.
271271
* The client validates regular (non-fast)
272-
* binds to ensure that provided consumer configuration
273-
* is consistent with the server version and that
272+
* binds to ensure that the provided consumer configuration
273+
* is consistent with the server version and that the
274274
* consumer type (push versus pull) matches the subscription type.
275275
* @return the builder
276276
* @param bind whether to bind or not

0 commit comments

Comments
 (0)