Skip to content

Commit c1df255

Browse files
authored
Merge branch 'main' into kv-purge-per-message-ttl
2 parents 0a5906e + 5d1da2c commit c1df255

File tree

12 files changed

+287
-130
lines changed

12 files changed

+287
-130
lines changed

src/main/java/io/nats/client/impl/DispatcherFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
import io.nats.client.MessageHandler;
1717

1818
/**
19-
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
20-
* WARNING: THIS CLASS IS PUBLIC BUT ITS API IS NOT GUARANTEED
21-
* TO BE BACKWARD COMPATIBLE AS IT IS PURELY AN INTERNAL CLASS
22-
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
19+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
20+
* WARNING: THIS CLASS IS PUBLIC BUT ITS API IS NOT GUARANTEED TO *
21+
* BE BACKWARD COMPATIBLE AS IT IS INTENDED AS AN INTERNAL CLASS *
22+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
2323
*/
2424
public class DispatcherFactory {
2525
NatsDispatcher createDispatcher(NatsConnection conn, MessageHandler handler) {

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 35 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222

2323
import java.time.Duration;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicLong;
27+
import java.util.concurrent.atomic.AtomicReference;
2628
import java.util.concurrent.locks.ReentrantLock;
2729

2830
abstract class MessageManager {
@@ -42,11 +44,10 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
4244
protected AtomicLong lastMsgReceivedNanoTime;
4345

4446
// heartbeat stuff
45-
protected boolean hb;
46-
protected long idleHeartbeatSetting;
47-
protected long alarmPeriodSettingNanos;
48-
protected ScheduledTask heartbeatTask;
49-
protected final AtomicLong currentAlarmPeriodNanos;
47+
protected final AtomicBoolean hb;
48+
protected final AtomicLong idleHeartbeatSettingMillis;
49+
protected final AtomicLong alarmPeriodSettingNanos;
50+
protected final AtomicReference<ScheduledTask> heartbeatTask;
5051

5152
protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
5253
stateChangeLock = new ReentrantLock();
@@ -57,20 +58,20 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
5758
lastStreamSeq = 0;
5859
lastConsumerSeq = 0;
5960

60-
hb = false;
61-
idleHeartbeatSetting = 0;
62-
alarmPeriodSettingNanos = 0;
61+
hb = new AtomicBoolean(false);
62+
idleHeartbeatSettingMillis = new AtomicLong();
63+
alarmPeriodSettingNanos = new AtomicLong();
6364
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
64-
currentAlarmPeriodNanos = new AtomicLong();
65+
heartbeatTask = new AtomicReference<>();
6566
}
6667

6768
protected boolean isSyncMode() { return syncMode; }
6869
protected long getLastStreamSequence() { return lastStreamSeq; }
6970
protected long getLastConsumerSequence() { return lastConsumerSeq; }
7071
protected long getLastMsgReceivedNanoTime() { return lastMsgReceivedNanoTime.get(); }
71-
protected boolean isHb() { return hb; }
72-
protected long getIdleHeartbeatSetting() { return idleHeartbeatSetting; }
73-
protected long getAlarmPeriodSettingNanos() { return alarmPeriodSettingNanos; }
72+
protected boolean isHb() { return hb.get(); }
73+
protected long getIdleHeartbeatSetting() { return idleHeartbeatSettingMillis.get(); }
74+
protected long getAlarmPeriodSettingNanos() { return alarmPeriodSettingNanos.get(); }
7475

7576
protected void startup(NatsJetStreamSubscription sub) {
7677
this.sub = sub;
@@ -108,21 +109,22 @@ protected void handleHeartbeatError() {
108109
protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
109110
stateChangeLock.lock();
110111
try {
111-
idleHeartbeatSetting = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
112-
if (idleHeartbeatSetting <= 0) {
113-
alarmPeriodSettingNanos = 0;
114-
hb = false;
112+
long idleSettingMillis = configIdleHeartbeat == null ? 0 : configIdleHeartbeat.toMillis();
113+
idleHeartbeatSettingMillis.set(idleSettingMillis);
114+
if (idleSettingMillis <= 0) {
115+
alarmPeriodSettingNanos.set(0);
116+
hb.set(false);
115117
}
116118
else {
117-
long alarmPeriodSetting;
118-
if (configMessageAlarmTime < idleHeartbeatSetting) {
119-
alarmPeriodSetting = idleHeartbeatSetting * THRESHOLD;
119+
long alarmPeriodSettingMillis;
120+
if (configMessageAlarmTime < idleSettingMillis) {
121+
alarmPeriodSettingMillis = idleSettingMillis * THRESHOLD;
120122
}
121123
else {
122-
alarmPeriodSetting = configMessageAlarmTime;
124+
alarmPeriodSettingMillis = configMessageAlarmTime;
123125
}
124-
alarmPeriodSettingNanos = alarmPeriodSetting * NatsConstants.NANOS_PER_MILLI;
125-
hb = true;
126+
alarmPeriodSettingNanos.set(alarmPeriodSettingMillis * NatsConstants.NANOS_PER_MILLI);
127+
hb.set(true);
126128
}
127129
}
128130
finally {
@@ -137,26 +139,20 @@ protected void updateLastMessageReceived() {
137139
protected void initOrResetHeartbeatTimer() {
138140
stateChangeLock.lock();
139141
try {
140-
if (heartbeatTask != null) {
141-
// Same settings, just reuse the existing timer
142-
if (currentAlarmPeriodNanos.get() == alarmPeriodSettingNanos) {
143-
updateLastMessageReceived();
144-
return;
145-
}
146-
147-
// Replace timer since settings have changed
148-
shutdownHeartbeatTimer();
142+
ScheduledTask hbTask = heartbeatTask.get();
143+
if (hbTask != null) {
144+
hbTask.shutdown();
149145
}
150146

151147
// replacement or new comes here
152-
this.currentAlarmPeriodNanos.set(alarmPeriodSettingNanos);
153-
heartbeatTask = new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos, TimeUnit.NANOSECONDS,
148+
heartbeatTask.set(new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
154149
() -> {
155150
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
156-
if (sinceLast > currentAlarmPeriodNanos.get()) {
151+
if (sinceLast > alarmPeriodSettingNanos.get()) {
152+
shutdownHeartbeatTimer(); // a new one will get started when needed.
157153
handleHeartbeatError();
158154
}
159-
});
155+
}));
160156
updateLastMessageReceived();
161157
}
162158
finally {
@@ -167,11 +163,11 @@ protected void initOrResetHeartbeatTimer() {
167163
protected void shutdownHeartbeatTimer() {
168164
stateChangeLock.lock();
169165
try {
170-
if (heartbeatTask != null) {
171-
heartbeatTask.shutdown();
172-
heartbeatTask = null;
166+
ScheduledTask hbTask = heartbeatTask.get();
167+
if (hbTask != null) {
168+
hbTask.shutdown();
169+
heartbeatTask.set(null);
173170
}
174-
currentAlarmPeriodNanos.set(0);
175171
}
176172
finally {
177173
stateChangeLock.unlock();

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
601601

602602
if (pingMillis > 0) {
603603
pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
604-
if (isConnected()) {
604+
if (isConnected() && !isClosing()) {
605605
try {
606606
softPing(); // The timer always uses the standard queue
607607
}

src/main/java/io/nats/client/impl/PullMessageManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected void startPullRequest(String pullSubject, PullRequestOptions pro, bool
5252
pendingBytes += pro.getMaxBytes();
5353
trackingBytes = (pendingBytes > 0);
5454
configureIdleHeartbeat(pro.getIdleHeartbeat(), -1);
55-
if (hb) {
55+
if (hb.get()) {
5656
initOrResetHeartbeatTimer();
5757
}
5858
else {

src/main/java/io/nats/client/impl/PushMessageManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected PushMessageManager(NatsConnection conn,
5353
}
5454
else {
5555
configureIdleHeartbeat(initialCc.getIdleHeartbeat(), so.getMessageAlarmTime());
56-
fc = hb && initialCc.isFlowControl(); // can't have fc w/o heartbeat
56+
fc = hb.get() && initialCc.isFlowControl(); // can't have fc w/o heartbeat
5757
}
5858
}
5959

@@ -65,14 +65,14 @@ protected PushMessageManager(NatsConnection conn,
6565
protected void startup(NatsJetStreamSubscription sub) {
6666
super.startup(sub);
6767
sub.setBeforeQueueProcessor(this::beforeQueueProcessorImpl);
68-
if (hb) {
68+
if (hb.get()) {
6969
initOrResetHeartbeatTimer();
7070
}
7171
}
7272

7373
@Override
7474
protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
75-
if (hb) {
75+
if (hb.get()) {
7676
updateLastMessageReceived(); // only need to track when heartbeats are expected
7777
Status status = msg.getStatus();
7878
if (status != null) {

src/main/java/io/nats/client/support/ScheduledTask.java

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,113 @@
1717
import java.util.concurrent.ScheduledFuture;
1818
import java.util.concurrent.TimeUnit;
1919
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicLong;
2021
import java.util.concurrent.atomic.AtomicReference;
2122

23+
2224
/**
23-
* This is a utility class for a task being scheduled
25+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
26+
* WARNING: THIS CLASS IS PUBLIC BUT ITS API IS NOT GUARANTEED TO *
27+
* BE BACKWARD COMPATIBLE AS IT IS INTENDED AS AN INTERNAL CLASS *
28+
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *
2429
*/
2530
public class ScheduledTask implements Runnable {
31+
private static final AtomicLong ID_GENERATOR = new AtomicLong();
32+
33+
private final String id;
2634
private final Runnable runnable;
2735
protected final AtomicReference<ScheduledFuture<?>> scheduledFutureRef;
2836

29-
protected final AtomicBoolean keepGoing;
37+
protected final AtomicBoolean notShutdown;
38+
protected final AtomicBoolean executing;
3039

3140
public ScheduledTask(ScheduledExecutorService ses, long initialAndPeriodMillis, Runnable runnable) {
32-
this(ses, initialAndPeriodMillis, initialAndPeriodMillis, TimeUnit.MILLISECONDS, runnable);
41+
this(null, ses, initialAndPeriodMillis, initialAndPeriodMillis, TimeUnit.MILLISECONDS, runnable);
42+
}
43+
44+
public ScheduledTask(String id, ScheduledExecutorService ses, long initialAndPeriodMillis, Runnable runnable) {
45+
this(id, ses, initialAndPeriodMillis, initialAndPeriodMillis, TimeUnit.MILLISECONDS, runnable);
3346
}
3447

3548
public ScheduledTask(ScheduledExecutorService ses, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
36-
this(ses, initialAndPeriod, initialAndPeriod, unit, runnable);
49+
this(null, ses, initialAndPeriod, initialAndPeriod, unit, runnable);
50+
}
51+
52+
public ScheduledTask(String id, ScheduledExecutorService ses, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
53+
this(id, ses, initialAndPeriod, initialAndPeriod, unit, runnable);
3754
}
3855

39-
public ScheduledTask(ScheduledExecutorService ses, long initialDelay, long period, TimeUnit unit, Runnable runnable) {
56+
public ScheduledTask(ScheduledExecutorService ses, long initialDelay, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
57+
this(null, ses, initialDelay, initialAndPeriod, unit, runnable);
58+
}
59+
60+
public ScheduledTask(String id, ScheduledExecutorService ses, long initialDelay, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
61+
this.id = id == null || id.isEmpty() ? "st-" + ID_GENERATOR.getAndIncrement() : id;
4062
this.runnable = runnable;
41-
keepGoing = new AtomicBoolean(true);
63+
notShutdown = new AtomicBoolean(true);
64+
executing = new AtomicBoolean(false);
4265
scheduledFutureRef = new AtomicReference<>(
43-
ses.scheduleAtFixedRate(this, initialDelay, period, unit));
66+
ses.scheduleAtFixedRate(this, initialDelay, initialAndPeriod, unit));
4467
}
4568

4669
@Override
4770
public void run() {
48-
if (keepGoing.get() && !Thread.interrupted()) {
49-
runnable.run();
71+
try {
72+
if (notShutdown.get()) {
73+
executing.set(true);
74+
runnable.run();
75+
}
76+
}
77+
finally {
78+
executing.set(false);
5079
}
5180
}
5281

82+
public boolean isShutdown() {
83+
return !notShutdown.get();
84+
}
85+
86+
public boolean isExecuting() {
87+
return executing.get();
88+
}
89+
90+
public boolean isDone() {
91+
ScheduledFuture<?> f = scheduledFutureRef.get();
92+
return f == null || f.isDone();
93+
}
94+
95+
public String getId() {
96+
return id;
97+
}
98+
5399
public void shutdown() {
54100
try {
55-
keepGoing.getAndSet(false);
101+
notShutdown.set(false);
56102
ScheduledFuture<?> f = scheduledFutureRef.get();
57-
scheduledFutureRef.set(null); // just releasing resources
58103
if (f != null) {
59-
f.cancel(true);
104+
scheduledFutureRef.set(null); // just releasing resources.
105+
if (!f.isDone()) {
106+
f.cancel(false);
107+
}
60108
}
61109
}
62110
catch (Exception ignore) {
63111
// don't want this to be passed along
64112
}
65113
}
114+
115+
@Override
116+
public String toString() {
117+
StringBuilder sb = new StringBuilder(id);
118+
if (notShutdown.get()) {
119+
sb.append(" [live");
120+
}
121+
else {
122+
sb.append(" [shutdown");
123+
}
124+
sb.append(isDone() ? "/done" : "/not done");
125+
sb.append(executing.get() ? "/executing" : "/not executing");
126+
sb.append("]");
127+
return sb.toString();
128+
}
66129
}

0 commit comments

Comments
 (0)