Skip to content

Commit ab2a40b

Browse files
authored
Merge pull request #1335 from nats-io/replace-timer
Replace Timer with scheduled tasks
2 parents 3bf51e7 + b1c71b8 commit ab2a40b

File tree

12 files changed

+319
-150
lines changed

12 files changed

+319
-150
lines changed

CHANGELOG.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@
2020
### General
2121
* Update repository info and cleanup readme. #1318 @scottf
2222

23+
```
24+
┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐
25+
│ │ count │ time │ msgs/sec │ bytes/sec │
26+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
27+
│ PubAsync │ 50,000,000 msgs │ 28:02.729 │ 29,713.638 msgs/sec │ 7.08 mb/sec │
28+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
29+
│ SubFetch │ 49,999,991 msgs │ 36:03.206 │ 23,113.837 msgs/sec │ 5.51 mb/sec │
30+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
31+
│ SubIterate │ 50,000,000 msgs │ 20:02.740 │ 41,571.745 msgs/sec │ 9.91 mb/sec │
32+
└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘
33+
```
34+
2335
## 2.21.1
2436
### KV
2537
* KV TTL (stream max_age) versus stream duplicate_window #1301 @scottf
@@ -30,6 +42,18 @@
3042
### Misc
3143
* Remove STAN references #1300 @scottf
3244

45+
```
46+
┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐
47+
│ │ count │ time │ msgs/sec │ bytes/sec │
48+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
49+
│ PubAsync │ 50,000,000 msgs │ 28:01.156 │ 29,741.440 msgs/sec │ 7.09 mb/sec │
50+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
51+
│ SubFetch │ 50,000,000 msgs │ 38:01.867 │ 21,911.882 msgs/sec │ 5.22 mb/sec │
52+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
53+
│ SubIterate │ 50,000,000 msgs │ 20:27.331 │ 40,738.806 msgs/sec │ 9.71 mb/sec │
54+
└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘
55+
```
56+
3357
## 2.21.0
3458
### Core
3559
* Handle Server 2.10.26 returns No Responders instead of timeouts. #1292 @scottf
@@ -44,6 +68,18 @@ Main 2 11 merge safe #1294 is actually a compilation of PRs related to 2.11 feat
4468
* Add Message TTL Stream Configuration #1280
4569
* Per Message TTL Support for 2.11 #1295
4670

71+
```
72+
┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐
73+
│ │ count │ time │ msgs/sec │ bytes/sec │
74+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
75+
│ PubAsync │ 50,000,000 msgs │ 29:10.069 │ 28,570.302 msgs/sec │ 6.81 mb/sec │
76+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
77+
│ SubFetch │ 49,999,993 msgs │ 36:15.122 │ 22,987.213 msgs/sec │ 5.48 mb/sec │
78+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
79+
│ SubIterate │ 50,000,000 msgs │ 21:02.180 │ 39,614.001 msgs/sec │ 9.44 mb/sec │
80+
└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘
81+
```
82+
4783
## 2.20.6
4884
### Core
4985
* Reader Listener #1265 @scottf

build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ test {
8585
showStandardStreams = true
8686
}
8787
if (isLocal) {
88-
failFast = true
88+
retry {
89+
failOnPassedAfterRetry = false
90+
maxFailures = 2
91+
maxRetries = 2
92+
}
8993
}
9094
else {
9195
retry {

src/main/java/io/nats/client/NatsSystemClock.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,29 @@
1313

1414
package io.nats.client;
1515

16-
public class NatsSystemClock {
16+
public final class NatsSystemClock {
1717
private static NatsSystemClockProvider PROVIDER = new NatsSystemClockProvider() {};
1818

19+
/**
20+
* Set the provider. Null will reset to system default
21+
* @param provider the provider
22+
*/
1923
public static void setProvider(final NatsSystemClockProvider provider) {
2024
PROVIDER = provider == null ? new NatsSystemClockProvider() {} : provider;
2125
}
2226

27+
/**
28+
* Get the current milliseconds from the provider
29+
* @return the milliseconds
30+
*/
2331
public static long currentTimeMillis() {
2432
return PROVIDER.currentTimeMillis();
2533
}
2634

35+
/**
36+
* Get the current nano time from the provider
37+
* @return the nano time
38+
*/
2739
public static long nanoTime() {
2840
return PROVIDER.nanoTime();
2941
}

src/main/java/io/nats/client/Options.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,11 @@ public class Options {
504504
* {@link Builder#executor(ExecutorService) executor}.
505505
*/
506506
public static final String PROP_EXECUTOR_SERVICE_CLASS = "executor.service.class";
507+
/**
508+
* Property used to set class name for the Executor Service (executor) class
509+
* {@link Builder#executor(ExecutorService) executor}.
510+
*/
511+
public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class";
507512
/**
508513
* Property used to set class name for the Connect Thread Factory
509514
* {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}.
@@ -660,14 +665,15 @@ public class Options {
660665
private final ErrorListener errorListener;
661666
private final TimeTraceLogger timeTraceLogger;
662667
private final ConnectionListener connectionListener;
663-
private ReadListener readListener;
668+
private final ReadListener readListener;
664669
private final StatisticsCollector statisticsCollector;
665670
private final String dataPortType;
666671

667672
private final boolean trackAdvancedStats;
668673
private final boolean traceConnection;
669674

670675
private final ExecutorService executor;
676+
private final ScheduledExecutorService scheduledExecutor;
671677
private final ThreadFactory connectThreadFactory;
672678
private final ThreadFactory callbackThreadFactory;
673679
private final ServerPool serverPool;
@@ -808,6 +814,7 @@ public static class Builder {
808814
private StatisticsCollector statisticsCollector = null;
809815
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
810816
private ExecutorService executor;
817+
private ScheduledExecutorService scheduledExecutor;
811818
private ThreadFactory connectThreadFactory;
812819
private ThreadFactory callbackThreadFactory;
813820
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
@@ -939,6 +946,7 @@ public Builder properties(Properties props) {
939946
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
940947
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
941948
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
949+
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
942950
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
943951
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
944952
return this;
@@ -1630,6 +1638,19 @@ public Builder executor(ExecutorService executor) {
16301638
return this;
16311639
}
16321640

1641+
/**
1642+
* Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like
1643+
* heartbeat timers
1644+
* The default is a ScheduledThreadPoolExecutor that does not
1645+
* execute delayed tasks after shutdown and removes tasks on cancel;
1646+
* @param scheduledExecutor The ScheduledExecutorService to use for timer tasks
1647+
* @return the Builder for chaining
1648+
*/
1649+
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
1650+
this.scheduledExecutor = scheduledExecutor;
1651+
return this;
1652+
}
1653+
16331654
/**
16341655
* Sets custom thread factory for the executor service
16351656
*
@@ -1910,6 +1931,17 @@ else if (useDefaultTls) {
19101931
new DefaultThreadFactory(threadPrefix));
19111932
}
19121933

1934+
if (this.scheduledExecutor == null) {
1935+
String threadPrefix = nullOrEmpty(this.connectionName) ? DEFAULT_THREAD_NAME_PREFIX : this.connectionName;
1936+
// the core pool size of 3 is chosen considering where we know the scheduler is used.
1937+
// 1. Ping timer, 2. cleanup timer, 3. SocketDataPortWithWriteTimeout
1938+
// Pull message managers also use a scheduler, but we don't even know if this will be consuming
1939+
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(3, new DefaultThreadFactory(threadPrefix));
1940+
stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
1941+
stpe.setRemoveOnCancelPolicy(true);
1942+
this.scheduledExecutor = stpe;
1943+
}
1944+
19131945
if (socketReadTimeoutMillis > 0) {
19141946
long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
19151947
if (socketReadTimeoutMillis < srtMin) {
@@ -2014,6 +2046,7 @@ public Builder(Options o) {
20142046
this.dataPortType = o.dataPortType;
20152047
this.trackAdvancedStats = o.trackAdvancedStats;
20162048
this.executor = o.executor;
2049+
this.scheduledExecutor = o.scheduledExecutor;
20172050
this.callbackThreadFactory = o.callbackThreadFactory;
20182051
this.connectThreadFactory = o.connectThreadFactory;
20192052
this.httpRequestInterceptors = o.httpRequestInterceptors;
@@ -2082,6 +2115,7 @@ private Options(Builder b) {
20822115
this.dataPortType = b.dataPortType;
20832116
this.trackAdvancedStats = b.trackAdvancedStats;
20842117
this.executor = b.executor;
2118+
this.scheduledExecutor = b.scheduledExecutor;
20852119
this.callbackThreadFactory = b.callbackThreadFactory;
20862120
this.connectThreadFactory = b.connectThreadFactory;
20872121
this.httpRequestInterceptors = b.httpRequestInterceptors;
@@ -2107,6 +2141,13 @@ public ExecutorService getExecutor() {
21072141
return this.executor;
21082142
}
21092143

2144+
/**
2145+
* @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc
2146+
*/
2147+
public ScheduledExecutorService getScheduledExecutor() {
2148+
return scheduledExecutor;
2149+
}
2150+
21102151
/**
21112152
* @return the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
21122153
*/

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 20 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
import io.nats.client.PullRequestOptions;
1919
import io.nats.client.SubscribeOptions;
2020
import io.nats.client.support.NatsConstants;
21+
import io.nats.client.support.ScheduledTask;
2122

2223
import java.time.Duration;
23-
import java.util.Timer;
24-
import java.util.TimerTask;
25-
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicLong;
2726
import java.util.concurrent.locks.ReentrantLock;
2827

@@ -46,8 +45,8 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
4645
protected boolean hb;
4746
protected long idleHeartbeatSetting;
4847
protected long alarmPeriodSettingNanos;
49-
protected MmTimerTask heartbeatTimerTask;
50-
protected Timer heartbeatTimer;
48+
protected ScheduledTask heartbeatTask;
49+
protected final AtomicLong currentAlarmPeriodNanos;
5150

5251
protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
5352
stateChangeLock = new ReentrantLock();
@@ -62,6 +61,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
6261
idleHeartbeatSetting = 0;
6362
alarmPeriodSettingNanos = 0;
6463
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
64+
currentAlarmPeriodNanos = new AtomicLong();
6565
}
6666

6767
protected boolean isSyncMode() { return syncMode; }
@@ -134,62 +134,29 @@ protected void updateLastMessageReceived() {
134134
lastMsgReceivedNanoTime.set(NatsSystemClock.nanoTime());
135135
}
136136

137-
class MmTimerTask extends TimerTask {
138-
long alarmPeriodNanos;
139-
final AtomicBoolean alive;
140-
141-
public MmTimerTask(long alarmPeriodNanos) {
142-
this.alarmPeriodNanos = alarmPeriodNanos;
143-
alive = new AtomicBoolean(true);
144-
}
145-
146-
public void reuse() {
147-
alive.getAndSet(true);
148-
}
149-
150-
public void shutdown() {
151-
alive.getAndSet(false);
152-
}
153-
154-
@Override
155-
public void run() {
156-
if (alive.get() && !Thread.interrupted()) {
157-
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
158-
if (alive.get() && sinceLast > alarmPeriodNanos) {
159-
handleHeartbeatError();
160-
}
161-
}
162-
}
163-
164-
@Override
165-
public String toString() {
166-
long sinceLastMillis = (NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get()) / NatsConstants.NANOS_PER_MILLI;
167-
return "MmTimerTask{" +
168-
", alarmPeriod=" + (alarmPeriodNanos / NatsConstants.NANOS_PER_MILLI) +
169-
"ms, alive=" + alive.get() +
170-
", sinceLast=" + sinceLastMillis + "ms}";
171-
}
172-
}
173-
174137
protected void initOrResetHeartbeatTimer() {
175138
stateChangeLock.lock();
176139
try {
177-
if (heartbeatTimer != null) {
140+
if (heartbeatTask != null) {
178141
// Same settings, just reuse the existing timer
179-
if (heartbeatTimerTask.alarmPeriodNanos == alarmPeriodSettingNanos) {
180-
heartbeatTimerTask.reuse();
142+
if (currentAlarmPeriodNanos.get() == alarmPeriodSettingNanos) {
181143
updateLastMessageReceived();
182144
return;
183145
}
184146

185147
// Replace timer since settings have changed
186148
shutdownHeartbeatTimer();
187149
}
150+
188151
// replacement or new comes here
189-
heartbeatTimer = new Timer();
190-
heartbeatTimerTask = new MmTimerTask(alarmPeriodSettingNanos);
191-
long alarmPeriodSettingMillis = alarmPeriodSettingNanos / NatsConstants.NANOS_PER_MILLI;
192-
heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSettingMillis, alarmPeriodSettingMillis);
152+
this.currentAlarmPeriodNanos.set(alarmPeriodSettingNanos);
153+
heartbeatTask = new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos, TimeUnit.NANOSECONDS,
154+
() -> {
155+
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
156+
if (sinceLast > currentAlarmPeriodNanos.get()) {
157+
handleHeartbeatError();
158+
}
159+
});
193160
updateLastMessageReceived();
194161
}
195162
finally {
@@ -200,12 +167,11 @@ protected void initOrResetHeartbeatTimer() {
200167
protected void shutdownHeartbeatTimer() {
201168
stateChangeLock.lock();
202169
try {
203-
if (heartbeatTimer != null) {
204-
heartbeatTimerTask.shutdown();
205-
heartbeatTimerTask = null;
206-
heartbeatTimer.cancel();
207-
heartbeatTimer = null;
170+
if (heartbeatTask != null) {
171+
heartbeatTask.shutdown();
172+
heartbeatTask = null;
208173
}
174+
currentAlarmPeriodNanos.set(0);
209175
}
210176
finally {
211177
stateChangeLock.unlock();

0 commit comments

Comments
 (0)