Skip to content

Commit 728e69b

Browse files
committed
Ordered Consumers automatically are prefixed if a consumer configuration name is supplied.
1 parent fa46d2d commit 728e69b

13 files changed

+178
-115
lines changed

src/main/java/io/nats/client/BaseConsumeOptions.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020

2121
import static io.nats.client.support.ApiConstants.*;
2222
import static io.nats.client.support.JsonUtils.*;
23+
import static io.nats.client.support.JsonValueUtils.*;
2324
import static io.nats.client.support.JsonValueUtils.readBoolean;
2425
import static io.nats.client.support.JsonValueUtils.readInteger;
2526
import static io.nats.client.support.JsonValueUtils.readLong;
26-
import static io.nats.client.support.JsonValueUtils.*;
2727

2828
/**
2929
* Base Consume Options are provided to customize the way the consume and
@@ -208,7 +208,7 @@ public B thresholdPercent(int thresholdPercent) {
208208

209209
/**
210210
* Raise status warning turns on sending status messages to the error listener.
211-
* The default of to not raise status warning
211+
* The default is to not raise status warnings
212212
* @return the builder
213213
*/
214214
public B raiseStatusWarnings() {
@@ -218,7 +218,8 @@ public B raiseStatusWarnings() {
218218

219219
/**
220220
* Turn on or off raise status warning turns. When on, status messages are sent to the error listener.
221-
* The default of to not raise status warning
221+
* The default is to not raise status warnings
222+
* @param raiseStatusWarnings flag indicating whether to raise status messages
222223
* @return the builder
223224
*/
224225
public B raiseStatusWarnings(boolean raiseStatusWarnings) {

src/main/java/io/nats/client/SubscribeOptions.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public abstract class SubscribeOptions {
3434
protected final boolean ordered;
3535
protected final long messageAlarmTime;
3636
protected final ConsumerConfiguration consumerConfig;
37-
protected final long pendingMessageLimit; // Only applicable for non dispatched (sync) push consumers.
38-
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
37+
protected final long pendingMessageLimit; // Only applicable for non-dispatched (sync) push consumers.
38+
protected final long pendingByteLimit; // Only applicable for non-dispatched (sync) push consumers.
3939
protected final String name;
4040

4141
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
@@ -210,15 +210,15 @@ public ConsumerConfiguration getConsumerConfiguration() {
210210
}
211211

212212
/**
213-
* Gets the pending message limit. Only applicable for non dispatched (sync) push consumers.
213+
* Gets the pending message limit. Only applicable for non-dispatched (sync) push consumers.
214214
* @return the message limit
215215
*/
216216
public long getPendingMessageLimit() {
217217
return pendingMessageLimit;
218218
}
219219

220220
/**
221-
* Gets the pending byte limit. Only applicable for non dispatched (sync) push consumers.
221+
* Gets the pending byte limit. Only applicable for non-dispatched (sync) push consumers.
222222
* @return the byte limit
223223
*/
224224
public long getPendingByteLimit() {
@@ -256,7 +256,7 @@ protected static abstract class Builder<B, SO> {
256256
protected abstract B getThis();
257257

258258
/**
259-
* Specify the stream to attach to. If not supplied the stream will be looked up by subject.
259+
* Specify the stream to attach to. If not supplied, the stream will look up by the stream by subject.
260260
* Null or empty clears the field.
261261
* @param stream the name of the stream
262262
* @return the builder
@@ -269,8 +269,8 @@ public B stream(String stream) {
269269
/**
270270
* Specify binding to an existing consumer via name.
271271
* The client validates regular (non-fast)
272-
* binds to ensure that provided consumer configuration
273-
* is consistent with the server version and that
272+
* binds to ensure that the provided consumer configuration
273+
* is consistent with the server version and that the
274274
* consumer type (push versus pull) matches the subscription type.
275275
* @return the builder
276276
* @param bind whether to bind or not

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
3737
private final ReentrantLock stateLock;
3838
private final NatsStreamContext streamCtx;
3939
private final boolean ordered;
40-
private final ConsumerConfiguration orderedConsumerConfigTemplate;
41-
private final String orderedConsumerNamePrefix;
40+
private final ConsumerConfiguration initialOrderedConsumerConfig;
4241
private final PullSubscribeOptions unorderedBindPso;
4342

4443
private final AtomicReference<ConsumerInfo> cachedConsumerInfo;
@@ -57,16 +56,15 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
5756
lastConsumer = new AtomicReference<>();
5857
if (unorderedConsumerInfo != null) {
5958
ordered = false;
60-
orderedConsumerNamePrefix = null;
61-
orderedConsumerConfigTemplate = null;
59+
initialOrderedConsumerConfig = null;
6260
cachedConsumerInfo.set(unorderedConsumerInfo);
6361
consumerName.set(unorderedConsumerInfo.getName());
6462
unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, unorderedConsumerInfo.getName());
6563
}
6664
else {
6765
ordered = true;
68-
orderedConsumerNamePrefix = occ.getConsumerNamePrefix();
69-
orderedConsumerConfigTemplate = ConsumerConfiguration.builder()
66+
initialOrderedConsumerConfig = ConsumerConfiguration.builder()
67+
.name(occ.getConsumerNamePrefix())
7068
.filterSubjects(occ.getFilterSubjects())
7169
.deliverPolicy(occ.getDeliverPolicy())
7270
.startSequence(occ.getStartSequence())
@@ -94,30 +92,32 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di
9492
if (lastCon != null) {
9593
highestSeq.set(Math.max(highestSeq.get(), lastCon.pmm.lastStreamSeq));
9694
}
97-
consumerName.set(orderedConsumerNamePrefix == null ? null : orderedConsumerNamePrefix + NUID.nextGlobalSequence());
98-
ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered(
99-
orderedConsumerConfigTemplate, highestSeq.get(), null, consumerName.get(), optionalInactiveThreshold);
95+
ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered(initialOrderedConsumerConfig, highestSeq.get(), null, optionalInactiveThreshold).build();
10096
pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build();
10197
}
10298
else {
10399
pso = unorderedBindPso;
104100
}
105101

102+
NatsJetStreamPullSubscription sub;
106103
if (messageHandler == null) {
107-
return (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
104+
sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
108105
null, null, pso, null, null, null, false, optionalPmm);
109106
}
110-
111-
Dispatcher d = userDispatcher;
112-
if (d == null) {
113-
d = defaultDispatcher.get();
107+
else {
108+
Dispatcher d = userDispatcher;
114109
if (d == null) {
115-
d = streamCtx.js.conn.createDispatcher();
116-
defaultDispatcher.set(d);
110+
d = defaultDispatcher.get();
111+
if (d == null) {
112+
d = streamCtx.js.conn.createDispatcher();
113+
defaultDispatcher.set(d);
114+
}
117115
}
116+
sub = (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
117+
null, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
118118
}
119-
return (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
120-
null, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
119+
consumerName.set(sub.getConsumerName());
120+
return sub;
121121
}
122122

123123
private void checkState() throws IOException {

src/main/java/io/nats/client/impl/NatsJetStream.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static io.nats.client.PushSubscribeOptions.DEFAULT_PUSH_OPTS;
2626
import static io.nats.client.impl.MessageManager.ManageResult;
2727
import static io.nats.client.support.NatsJetStreamClientError.*;
28+
import static io.nats.client.support.NatsJetStreamUtil.generateConsumerName;
2829
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
2930
import static io.nats.client.support.Validator.*;
3031

@@ -255,6 +256,7 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
255256
SubscribeOptions so;
256257
String stream;
257258
ConsumerConfiguration userCC;
259+
boolean ordered;
258260
String settledDeliverGroup = null; // push might set this
259261

260262
if (isPullMode) {
@@ -290,7 +292,7 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
290292
}
291293
}
292294

293-
// 1B. Flow Control / heartbeat not always valid
295+
// 1B. Flow Control / heartbeat is not always valid
294296
if (userCC.getIdleHeartbeat() != null && userCC.getIdleHeartbeat().toMillis() > 0) {
295297
if (isPullMode) {
296298
throw JsSubFcHbNotValidPull.instance();
@@ -300,7 +302,7 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
300302
}
301303
}
302304

303-
// 2. figure out user provided subjects and prepare the settledFilterSubjects
305+
// 2. figure out user-provided subjects and prepare the settledFilterSubjects
304306
userSubscribeSubject = emptyAsNull(userSubscribeSubject);
305307
List<String> settledFilterSubjects = new ArrayList<>();
306308
if (userCC.getFilterSubjects() == null) { // empty filterSubjects gives null
@@ -312,13 +314,13 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
312314
else {
313315
// userCC.filterSubjects not empty, validate them
314316
settledFilterSubjects.addAll(userCC.getFilterSubjects());
315-
// If userSubscribeSubject is provided it must be one of the filter subjects.
317+
// If userSubscribeSubject is provided, it must be one of the filter subjects.
316318
if (userSubscribeSubject != null && !settledFilterSubjects.contains(userSubscribeSubject)) {
317319
throw JsSubSubjectDoesNotMatchFilter.instance();
318320
}
319321
}
320322

321-
// 3. Did they tell me what stream? No? look it up.
323+
// 3. Did they tell me what stream? No? Look it up.
322324
final String settledStream;
323325
if (stream == null) {
324326
if (settledFilterSubjects.isEmpty()) {
@@ -338,18 +340,20 @@ JetStreamSubscription createSubscription(String userSubscribeSubject,
338340
if (consumerName == null) {
339341
consumerName = userCC.getName();
340342
}
343+
341344
String inboxDeliver = userCC.getDeliverSubject();
342345

343-
// 4. Does this consumer already exist? FastBind bypasses the lookup;
344-
// the dev better know what they are doing...
345-
if (!so.isFastBind() && consumerName != null) {
346+
// 4. Does this consumer already exist?
347+
// * FastBind bypasses the lookup; the dev better know what they are doing...
348+
// * ordered also bypasses the lookup b/c we know it's always a new name
349+
if (!so.isFastBind() && !so.isOrdered() && consumerName != null) {
346350
ConsumerInfo serverInfo = lookupConsumerInfo(settledStream, consumerName);
347351

348352
if (serverInfo != null) { // the consumer for that durable already exists
349353
serverCC = serverInfo.getConsumerConfiguration();
350354

351355
// check to see if the user sent a different version than the server has
352-
// because modifications are not allowed during create subscription
356+
// because modifications are not allowed during "create" subscription
353357
ConsumerConfigurationComparer userCCC = new ConsumerConfigurationComparer(userCC);
354358
List<String> changes = userCCC.getChanges(serverCC);
355359
if (!changes.isEmpty()) {
@@ -367,15 +371,15 @@ else if (nullOrEmpty(serverCC.getDeliverSubject())) {
367371
}
368372

369373
if (serverCC.getDeliverGroup() == null) {
370-
// lookedUp was null, means existing consumer is not a queue consumer
374+
// lookedUp was null, means the existing consumer is not a queue consumer
371375
if (settledDeliverGroup == null) {
372-
// ok fine, no queue requested and the existing consumer is also not a queue consumer
376+
// ok fine, no queue was requested, and the existing consumer is also not a queue consumer
373377
// we must check if the consumer is in use though
374378
if (serverInfo.isPushBound()) {
375379
throw JsSubConsumerAlreadyBound.instance();
376380
}
377381
}
378-
else { // else they requested a queue but this durable was not configured as queue
382+
else { // else they requested a queue, but this durable was not configured as a queue.
379383
throw JsSubExistingConsumerNotQueue.instance();
380384
}
381385
}
@@ -399,7 +403,7 @@ else if (!listsAreEquivalent(settledFilterSubjects, serverCC.getFilterSubjects()
399403
throw JsSubSubjectDoesNotMatchFilter.instance();
400404
}
401405

402-
inboxDeliver = serverCC.getDeliverSubject(); // use the deliver subject as the inbox. It may be null, that's ok, we'll fix that later
406+
inboxDeliver = serverCC.getDeliverSubject(); // Use the deliverSubject as the inbox. It may be null, that's ok, we'll fix that later
403407
}
404408
else if (so.isBind()) {
405409
throw JsSubConsumerNotFoundRequiredInBind.instance();
@@ -418,7 +422,7 @@ else if (inboxDeliver == null) {
418422
settledInboxDeliver = inboxDeliver;
419423
}
420424

421-
// 6. If consumer does not exist, create and settle on the config. Name will have to wait
425+
// 6. If the consumer does not exist, create and settle on the config. Name will have to wait
422426
// If the consumer exists, I know what the settled info is
423427
final ConsumerConfiguration settledCC;
424428
final String settledConsumerName;
@@ -429,20 +433,32 @@ else if (inboxDeliver == null) {
429433
else {
430434
ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder(userCC);
431435

432-
// Pull mode doesn't maintain a deliver subject. It's actually an error if we send it.
436+
// Pull mode doesn't maintain a deliverSubject. It's actually an error if we send it.
433437
if (!isPullMode) {
434438
ccBuilder.deliverSubject(settledInboxDeliver);
435439
}
436440

437-
// userCC.filterSubjects might have originally been empty
441+
// userCC.filterSubjects might have originally been empty,
438442
// but there might have been a userSubscribeSubject,
439443
// so this makes sure it's resolved either way
440444
ccBuilder.filterSubjects(settledFilterSubjects);
441445

442446
ccBuilder.deliverGroup(settledDeliverGroup);
443447

448+
if (so.isOrdered()) {
449+
// we have to handle the fact that ordered consumers must always have a unique name
450+
// if the user supplied a name, well call generateConsumerName with the original name as a prefix
451+
if (consumerName != null) {
452+
consumerName = generateConsumerName(userCC.getName());
453+
ccBuilder.name(consumerName);
454+
}
455+
settledConsumerName = consumerName;
456+
}
457+
else {
458+
settledConsumerName = null; // the server will give us a name if the user's was null
459+
}
460+
444461
settledCC = ccBuilder.build();
445-
settledConsumerName = null; // the server will give us a name
446462
}
447463

448464
// 7. create the subscription. lambda needs final or effectively final vars
@@ -481,9 +497,13 @@ else if (inboxDeliver == null) {
481497
}
482498

483499
// 8. The consumer might need to be created, do it here
484-
if (settledConsumerName == null) {
500+
if (settledConsumerName == null || so.isOrdered()) {
501+
// the _create method sets the consumer name for us
485502
_createConsumerUnsubscribeOnException(settledStream, settledCC, sub);
486503
}
504+
else {
505+
sub.setConsumerName(settledConsumerName);
506+
}
487507

488508
return sub;
489509
}

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.nats.client.JetStreamApiException;
1717
import io.nats.client.JetStreamOptions;
1818
import io.nats.client.Message;
19-
import io.nats.client.NUID;
2019
import io.nats.client.api.*;
2120
import io.nats.client.support.NatsJetStreamConstants;
2221

@@ -28,6 +27,7 @@
2827
import static io.nats.client.support.NatsConstants.GREATER_THAN;
2928
import static io.nats.client.support.NatsJetStreamClientError.JsConsumerCreate290NotAvailable;
3029
import static io.nats.client.support.NatsJetStreamClientError.JsMultipleFilterSubjects210NotAvailable;
30+
import static io.nats.client.support.NatsJetStreamUtil.generateConsumerName;
3131
import static io.nats.client.support.NatsRequestCompletableFuture.CancelAction;
3232

3333
class NatsJetStreamImpl implements NatsJetStreamConstants {
@@ -112,7 +112,7 @@ ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config, Co
112112
// if both consumerName and durable are null, generate a name
113113
consumerName = durable == null ? generateConsumerName() : durable;
114114
}
115-
String fs = config.getFilterSubject(); // we've already determined not multiple so this gives us 1 or null
115+
String fs = config.getFilterSubject(); // we've already determined there are not more than 1 filter subjects, so this gives us one or null
116116
if (fs == null || fs.equals(GREATER_THAN)) {
117117
subj = String.format(JSAPI_CONSUMER_CREATE_V290, streamName, consumerName);
118118
}
@@ -185,43 +185,40 @@ List<String> _getStreamNames(String subjectFilter) throws IOException, JetStream
185185
// ----------------------------------------------------------------------------------------------------
186186
// General Utils
187187
// ----------------------------------------------------------------------------------------------------
188-
String generateConsumerName() {
189-
return NUID.nextGlobal();
190-
}
191-
192-
ConsumerConfiguration consumerConfigurationForOrdered(
193-
ConsumerConfiguration originalCc,
188+
ConsumerConfiguration.Builder consumerConfigurationForOrdered(
189+
ConsumerConfiguration initial,
194190
long lastStreamSeq,
195191
String newDeliverSubject,
196-
String consumerName,
197192
Long inactiveThreshold)
198193
{
199-
ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject);
194+
ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(initial);
195+
196+
// push will always give one, pull will always give null
197+
if (newDeliverSubject != null) {
198+
builder.deliverSubject(newDeliverSubject);
199+
}
200200

201+
// if the last stream seq is > 0, this means this config is for an ordered restart at a sequence
201202
if (lastStreamSeq > 0) {
202203
builder
203204
.deliverPolicy(DeliverPolicy.ByStartSequence)
204205
.startSequence(Math.max(1, lastStreamSeq + 1))
205206
.startTime(null); // clear start time in case it was originally set
206207
}
207208

208-
if (consumerName != null && consumerCreate290Available) {
209-
builder.name(consumerName);
210-
}
211-
212209
if (inactiveThreshold != null) {
213210
builder.inactiveThreshold(inactiveThreshold);
214211
}
215212

216-
return builder.build();
213+
return builder;
217214
}
218215

219216
ConsumerInfo lookupConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
220217
try {
221218
return _getConsumerInfo(streamName, consumerName);
222219
}
223220
catch (JetStreamApiException e) {
224-
// the right side of this condition... ( starting here \/ ) is for backward compatibility with server versions that did not provide api error codes
221+
// The right side of this condition (after the ||) is for backward compatibility with server versions that did not provide api error codes
225222
if (e.getApiErrorCode() == JS_CONSUMER_NOT_FOUND_ERR || (e.getErrorCode() == 404 && e.getErrorDescription().contains("consumer"))) {
226223
return null;
227224
}

0 commit comments

Comments
 (0)