Skip to content

Commit 4a98253

Browse files
authored
Merge branch 'main' into kv-purge-per-message-ttl
2 parents c1df255 + 12625ef commit 4a98253

File tree

10 files changed

+336
-46
lines changed

10 files changed

+336
-46
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2025 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.examples.jetstream.simple;
15+
16+
import io.nats.client.*;
17+
import io.nats.client.api.OrderedConsumerConfiguration;
18+
import io.nats.client.api.StorageType;
19+
import io.nats.client.impl.ErrorListenerConsoleImpl;
20+
import io.nats.examples.jetstream.ResilientPublisher;
21+
22+
import java.io.IOException;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;
28+
29+
/**
30+
* This example will demonstrate simplified ordered consumer with a handler
31+
* To run, start a server and then start the example.
32+
* To test resiliency, kill the server, wait a couple seconds, then restart it.
33+
*/
34+
public class OrderedMessageConsumerExample {
35+
private static final String STREAM = "ordered-stream";
36+
private static final String SUBJECT = "ordered-subject";
37+
private static final String CONSUMER_PREFIX = "prefix";
38+
private static final String MESSAGE_PREFIX = "ordered";
39+
private static final int STOP_COUNT = 1_000_000;
40+
private static final int REPORT_EVERY = 500;
41+
42+
private static final String SERVER = "nats://localhost:4222";
43+
44+
public static void main(String[] args) {
45+
Options options = Options.builder()
46+
.server(SERVER)
47+
.errorListener(new ErrorListenerConsoleImpl())
48+
.build();
49+
try (Connection nc = Nats.connect(options)) {
50+
JetStreamManagement jsm = nc.jetStreamManagement();
51+
createOrReplaceStream(jsm, STREAM, StorageType.File, SUBJECT); // file is important, memory won't survive a restart
52+
53+
System.out.println("Starting publish...");
54+
ResilientPublisher publisher = new ResilientPublisher(nc, jsm, STREAM, SUBJECT).basicDataPrefix(MESSAGE_PREFIX).jitter(10);
55+
Thread pubThread = new Thread(publisher);
56+
pubThread.start();
57+
58+
// get stream context, create consumer and get the consumer context
59+
StreamContext streamContext;
60+
OrderedConsumerContext orderedConsumerContext;
61+
try {
62+
OrderedConsumerConfiguration ocConfig = new OrderedConsumerConfiguration()
63+
.consumerNamePrefix(CONSUMER_PREFIX)
64+
.filterSubjects(SUBJECT);
65+
streamContext = nc.getStreamContext(STREAM);
66+
orderedConsumerContext = streamContext.createOrderedConsumer(ocConfig);
67+
}
68+
catch (JetStreamApiException | IOException e) {
69+
// JetStreamApiException:
70+
// the stream or consumer did not exist
71+
// IOException:
72+
// likely a connection problem
73+
return;
74+
}
75+
76+
CountDownLatch latch = new CountDownLatch(1);
77+
AtomicInteger atomicCount = new AtomicInteger();
78+
AtomicLong nextExpectedSequence = new AtomicLong(0);
79+
long start = System.nanoTime();
80+
MessageHandler handler = msg -> {
81+
if (msg.metaData().streamSequence() != nextExpectedSequence.incrementAndGet()) {
82+
System.out.println("MESSAGE RECEIVED OUT OF ORDER!");
83+
System.exit(-1);
84+
}
85+
msg.ack();
86+
int count = atomicCount.incrementAndGet();
87+
if (count % REPORT_EVERY == 0) {
88+
report("Handler", start, count);
89+
}
90+
if (count == STOP_COUNT) {
91+
latch.countDown();
92+
}
93+
};
94+
95+
// create the consumer then use it
96+
// the expires option affects 2 things
97+
// 1. A pull request expiry
98+
// 2. The heartbeat checking
99+
// The default expiry is 30 seconds which means the idle heartbeat is 15 seconds.
100+
// It takes 3 times the heartbeat to trip the alarm, so the subscription does
101+
// not restart for 45 seconds since the disconnect.
102+
// If your messages come in slowly this is fine, but if your messages are coming in fast
103+
// set the expiresIn much lower. Minimum expires is 1 second (1000ms)
104+
ConsumeOptions consumeOptions = ConsumeOptions.builder()
105+
.expiresIn(ConsumeOptions.MIN_EXPIRES_MILLS)
106+
.build();
107+
try (MessageConsumer consumer = orderedConsumerContext.consume(consumeOptions, handler)) {
108+
latch.await();
109+
110+
// The consumer has at least 1 pull request active. When stop is called,
111+
// no more pull requests will be made, but messages already requested
112+
// will still come across the wire to the client.
113+
System.out.println("Stop the consumer...");
114+
consumer.stop();
115+
116+
// wait until the consumer is finished
117+
while (!consumer.isFinished()) {
118+
//noinspection BusyWait
119+
Thread.sleep(10);
120+
}
121+
}
122+
catch (JetStreamApiException | IOException e) {
123+
// JetStreamApiException:
124+
// 1. the stream or consumer did not exist
125+
// 2. api calls under the covers theoretically this could fail, but practically it won't.
126+
// IOException:
127+
// likely a connection problem
128+
System.err.println("Exception should be handled properly, just exiting here.");
129+
System.exit(-1);
130+
}
131+
catch (Exception e) {
132+
// this is from the FetchConsumer being AutoCloseable, but should never be called
133+
// as work inside the close is already guarded by try/catch
134+
System.err.println("Exception should be handled properly, just exiting here.");
135+
System.exit(-1);
136+
}
137+
138+
report("Final", start, atomicCount.get());
139+
140+
publisher.stop(); // otherwise it will complain when the connection goes away
141+
pubThread.join();
142+
}
143+
catch (IOException | InterruptedException ioe) {
144+
// IOException:
145+
// problem making the connection
146+
// InterruptedException:
147+
// thread interruption in the body of the example
148+
}
149+
}
150+
151+
private static void report(String label, long start, int count) {
152+
long ms = (System.nanoTime() - start) / 1_000_000;
153+
System.out.println(label + ": Received " + count + " messages in " + ms + "ms.");
154+
}
155+
}

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
4747
protected final AtomicBoolean hb;
4848
protected final AtomicLong idleHeartbeatSettingMillis;
4949
protected final AtomicLong alarmPeriodSettingNanos;
50-
protected final AtomicReference<ScheduledTask> heartbeatTask;
50+
protected final AtomicReference<ScheduledTask> heartbeatTaskRef;
5151

5252
protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
5353
stateChangeLock = new ReentrantLock();
@@ -62,7 +62,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
6262
idleHeartbeatSettingMillis = new AtomicLong();
6363
alarmPeriodSettingNanos = new AtomicLong();
6464
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
65-
heartbeatTask = new AtomicReference<>();
65+
heartbeatTaskRef = new AtomicReference<>();
6666
}
6767

6868
protected boolean isSyncMode() { return syncMode; }
@@ -139,20 +139,25 @@ protected void updateLastMessageReceived() {
139139
protected void initOrResetHeartbeatTimer() {
140140
stateChangeLock.lock();
141141
try {
142-
ScheduledTask hbTask = heartbeatTask.get();
142+
ScheduledTask hbTask = heartbeatTaskRef.get();
143143
if (hbTask != null) {
144+
// we always want a fresh schedule because it will have the initial delay
144145
hbTask.shutdown();
145146
}
146147

147-
// replacement or new comes here
148-
heartbeatTask.set(new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
149-
() -> {
150-
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
151-
if (sinceLast > alarmPeriodSettingNanos.get()) {
152-
shutdownHeartbeatTimer(); // a new one will get started when needed.
153-
handleHeartbeatError();
154-
}
155-
}));
148+
// set the ref with a new ScheduledTask
149+
// reminder that ScheduledTask schedules itself, which is why we pass the executor
150+
heartbeatTaskRef.set(
151+
new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
152+
() -> {
153+
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
154+
if (sinceLast > alarmPeriodSettingNanos.get()) {
155+
handleHeartbeatError();
156+
}
157+
})
158+
);
159+
160+
// since we just scheduled, reset this otherwise it may alarm too soon
156161
updateLastMessageReceived();
157162
}
158163
finally {
@@ -163,10 +168,10 @@ protected void initOrResetHeartbeatTimer() {
163168
protected void shutdownHeartbeatTimer() {
164169
stateChangeLock.lock();
165170
try {
166-
ScheduledTask hbTask = heartbeatTask.get();
171+
ScheduledTask hbTask = heartbeatTaskRef.get();
167172
if (hbTask != null) {
168173
hbTask.shutdown();
169-
heartbeatTask.set(null);
174+
heartbeatTaskRef.set(null);
170175
}
171176
}
172177
finally {

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void checkState() throws IOException {
129129
}
130130
}
131131
if (lastCon.finished.get() && !lastCon.stopped.get()) {
132-
lastCon.lenientClose(); // finished, might as well make sure the sub is closed.
132+
lastCon.shutdownSub(); // finished, might as well make sure the sub is closed.
133133
}
134134
}
135135
}

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void pendingUpdated() {}
6767

6868
@Override
6969
public void heartbeatError() {
70-
finishAndClose();
70+
fullClose();
7171
}
7272

7373
@Override
@@ -85,7 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
8585
if (m == null) {
8686
// if there are no messages in the internal cache AND there are no more pending,
8787
// they all have been read and we can go ahead and finish
88-
finishAndClose();
88+
fullClose();
8989
}
9090
return m;
9191
}
@@ -103,15 +103,15 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
103103
Message m = sub._nextUnmanagedNoWait(pullSubject);
104104
if (m == null) {
105105
// no message and no time left, go ahead and finish
106-
finishAndClose();
106+
fullClose();
107107
}
108108
return m;
109109
}
110110

111111
Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject);
112112
if (m == null && isNoWaitNoExpires) {
113113
// no message and no wait, go ahead and finish
114-
finishAndClose();
114+
fullClose();
115115
}
116116
return m;
117117
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,40 +52,47 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
5252
@Override
5353
public void heartbeatError() {
5454
try {
55-
// just close the current sub and make another one.
56-
// this could go on endlessly - unless the user had called stop
5755
if (stopped.get()) {
58-
finishAndClose();
56+
fullClose();
5957
}
6058
else {
61-
lenientClose();
59+
shutdownSub();
6260
doSub();
6361
}
6462
}
6563
catch (JetStreamApiException | IOException e) {
66-
pmm.resetTracking();
67-
pmm.initOrResetHeartbeatTimer();
64+
setupHbAlarmToTrigger();
6865
}
6966
}
7067

7168
void doSub() throws JetStreamApiException, IOException {
7269
MessageHandler mh = userMessageHandler == null ? null : msg -> {
7370
userMessageHandler.onMessage(msg);
7471
if (stopped.get() && pmm.noMorePending()) {
75-
finishAndClose();
72+
finished.set(true);
7673
}
7774
};
78-
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
79-
repull();
80-
stopped.set(false);
81-
finished.set(false);
75+
try {
76+
stopped.set(false);
77+
finished.set(false);
78+
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
79+
repull();
80+
}
81+
catch (JetStreamApiException | IOException e) {
82+
setupHbAlarmToTrigger();
83+
}
84+
}
85+
86+
private void setupHbAlarmToTrigger() {
87+
pmm.resetTracking();
88+
pmm.initOrResetHeartbeatTimer();
8289
}
8390

8491
@Override
8592
public void pendingUpdated() {
8693
if (stopped.get()) {
8794
if (pmm.noMorePending()) {
88-
finishAndClose();
95+
fullClose();
8996
}
9097
}
9198
else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,19 @@ public void stop() {
101101

102102
@Override
103103
public void close() throws Exception {
104-
lenientClose();
104+
stopped.set(true);
105+
shutdownSub();
105106
}
106107

107-
protected void finishAndClose() {
108-
if (pmm != null) {
109-
pmm.shutdownHeartbeatTimer();
110-
}
108+
protected void fullClose() {
109+
stopped.set(true);
111110
finished.set(true);
112-
lenientClose();
111+
shutdownSub();
113112
}
114113

115-
protected void lenientClose() {
114+
protected void shutdownSub() {
116115
try {
117-
if (!stopped.get() || sub.isActive()) {
118-
stopped.set(true);
116+
if (sub.isActive()) {
119117
if (sub.getNatsDispatcher() != null) {
120118
sub.getDispatcher().unsubscribe(sub);
121119
}
@@ -127,5 +125,13 @@ protected void lenientClose() {
127125
catch (Throwable ignore) {
128126
// nothing to do
129127
}
128+
if (pmm != null) {
129+
try {
130+
pmm.shutdownHeartbeatTimer();
131+
}
132+
catch (Throwable ignore) {
133+
// nothing to do
134+
}
135+
}
130136
}
131137
}

src/main/java/io/nats/client/impl/PullOrderedMessageManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ protected PullOrderedMessageManager(NatsConnection conn,
4444

4545
@Override
4646
protected void startup(NatsJetStreamSubscription sub) {
47+
expectedExternalConsumerSeq = 1; // consumer always starts with consumer sequence 1
4748
super.startup(sub);
4849
targetSid.set(sub.getSID());
4950
}

0 commit comments

Comments
 (0)