Skip to content

Commit 64ebc2a

Browse files
authored
Merge pull request #174 from nats-io/v2.1.0
V2.1.0
2 parents 2aac130 + 9e69e67 commit 64ebc2a

File tree

18 files changed

+1442
-152
lines changed

18 files changed

+1442
-152
lines changed

.travis.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ language: java
33
sudo: required
44
jdk:
55
- oraclejdk8
6+
- oraclejdk9
7+
- oraclejdk10
8+
- openjdk8
9+
- openjdk9
10+
- openjdk10
611
before_script:
712
- wget "https://github.com/nats-io/gnatsd/releases/download/$gnatsd_version/gnatsd-$gnatsd_version-linux-amd64.zip"
813
-O tmp.zip
@@ -24,7 +29,7 @@ cache:
2429
- "$HOME/.gradle/wrapper/"
2530
after_success:
2631
- "./gradlew test jacocoTestReport coveralls"
27-
- test ${TRAVIS_BRANCH} != 'master' && "./gradlew uploadArchives" # Disable master for now, it fails due to ip address issues
32+
- test ${TRAVIS_BRANCH} != 'master' && test ${TRAVIS_JDK_VERSION} == 'oraclejdk8' && "./gradlew uploadArchives" # Disable master for now, it fails due to ip address issues
2833
#Disable for now, upload archives fails because of IP address changes - "test ${TRAVIS_PULL_REQUEST} != 'true' && test ${TRAVIS_BRANCH} = 'master' && ./gradlew closeAndReleaseRepository"
2934
env:
3035
global:

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11

22
# Change Log
33

4-
## Version 2.0.0
4+
## Version 2.1.0
5+
6+
* [ADDED] Support for consumer or connection drain. (New API lead to version bump.)
7+
* [FIXED] Fixed an issue with null pointer when ping/pong and reconnect interacted poorly.
8+
9+
## Version 2.0.2
510

611
* [FIXED] In a cluster situation the library wasn't using each server's auth info if it was in the URI.
712

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ A [Java](http://java.com) client for the [NATS messaging system](https://nats.io
1212

1313
## A Note on Versions
1414

15-
This is version 2.0 of the java-nats library. This version is a ground up rewrite of the original library. Part of the goal of this re-write was to address the excessive use of threads, we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.
15+
This is version 2.1 of the java-nats library. This version is a ground up rewrite of the original library. Part of the goal of this re-write was to address the excessive use of threads, we created a Dispatcher construct to allow applications to control thread creation more intentionally. This version also removes all non-JDK runtime dependencies.
1616

1717
The API is [simple to use](#listening-for-incoming-messages) and highly [performant](#Benchmarking).
1818

19-
Version 2.0 uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2.0 to allow clients to limit there use of this new API.
19+
Version 2.1 uses a simplified versioning scheme. Any issues will be fixed in the incremental version number. As a major release, the major version has been updated to 2 to allow clients to limit there use of this new API. With the addition of drain() we are updating to 2.1.
2020

2121
Previous versions are still available in the repo.
2222

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ plugins {
1212

1313
// Update version here, repeated check-ins not into master will have snapshot on them
1414
def versionMajor = 2
15-
def versionMinor = 0
16-
def versionPatch = 2
15+
def versionMinor = 1
16+
def versionPatch = 0
1717
def versionModifier = ""
1818
def branch = System.getenv("TRAVIS_BRANCH");
1919

src/examples/java/io/nats/examples/benchmark/NatsBench.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicBoolean;
3737
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.concurrent.locks.LockSupport;
3839

3940
/**
4041
* A utility class for measuring NATS performance, similar to the version in go and node.
@@ -184,8 +185,14 @@ public void run() {
184185
}
185186

186187
class PubWorker extends Worker {
187-
PubWorker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure) {
188+
189+
private AtomicLong start;
190+
private long targetPubRate;
191+
192+
PubWorker(Future<Boolean> starter, Phaser finisher, int numMsgs, int size, boolean secure, long targetPubRate) {
188193
super(starter, finisher, numMsgs, size, secure);
194+
this.start = new AtomicLong();
195+
this.targetPubRate = targetPubRate;
189196
}
190197

191198
@Override
@@ -201,22 +208,66 @@ public void run() {
201208

202209
// Wait for the signal
203210
starter.get(60, TimeUnit.SECONDS);
204-
long start = System.nanoTime();
211+
start.set(System.nanoTime());
205212
for (int i = 0; i < numMsgs; i++) {
206213
nc.publish(subject, payload);
207214
sent.incrementAndGet();
208215
}
209216
nc.flush(Duration.ZERO);
210217
long end = System.nanoTime();
211218

212-
bench.addPubSample(new Sample(numMsgs, size, start, end, nc.getStatistics()));
219+
bench.addPubSample(new Sample(numMsgs, size, start.get(), end, nc.getStatistics()));
213220
nc.close();
214221
} catch (Exception e) {
215222
errorQueue.add(e);
216223
} finally {
217224
finisher.arrive();
218225
}
219226
}
227+
228+
229+
void adjustAndSleep(Connection nc) throws InterruptedException {
230+
231+
if (this.targetPubRate <= 0) {
232+
return;
233+
}
234+
235+
long count = sent.incrementAndGet();
236+
237+
if (count % 1000 != 0) { // Only sleep every 1000 message
238+
return;
239+
}
240+
241+
long now = System.nanoTime();
242+
long start = this.start.get();
243+
double rate = (1e9 * (double) count)/((double)(now - start));
244+
double delay = (1.0/((double) this.targetPubRate));
245+
double adjust = delay / 20.0; // 5%
246+
247+
if (adjust == 0) {
248+
adjust = 1e-9; // 1ns min
249+
}
250+
251+
if (rate < this.targetPubRate) {
252+
delay -= adjust;
253+
} else if (rate > this.targetPubRate) {
254+
delay += adjust;
255+
}
256+
257+
if (delay < 0) {
258+
delay = 0;
259+
}
260+
261+
delay = delay * 1000; // we are doing this every 1000 messages
262+
263+
long nanos = (long)(delay * 1e9);
264+
LockSupport.parkNanos(nanos);
265+
266+
// Flush small messages regularly
267+
if (this.size < 64 && count != 0 && count % 100_000 == 0) {
268+
try {nc.flush(Duration.ZERO);}catch(Exception e){}
269+
}
270+
}
220271
}
221272

222273
/**
@@ -290,7 +341,11 @@ public void runTest(String title, int pubCount, int subCount) throws Exception {
290341
perPubMsgs = remaining;
291342
}
292343

293-
new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure), "Pub-"+i).start();
344+
if (subCount == 0) {
345+
new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure, 0), "Pub-"+i).start();
346+
} else {
347+
new Thread(new PubWorker(starter, finisher, perPubMsgs, this.size, secure, 2_000_000), "Pub-"+i).start();
348+
}
294349

295350
remaining -= perPubMsgs;
296351
}

src/main/java/io/nats/client/Connection.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,45 @@ public enum Status {
234234
*/
235235
public void flush(Duration timeout) throws TimeoutException, InterruptedException;
236236

237+
/**
238+
* Drain tells the connection to process in flight messages before closing.
239+
*
240+
* Drain initially drains all of the consumers, stopping incoming messages.
241+
* Next, publishing is halted and a flush call is used to insure all published
242+
* messages have reached the server.
243+
* Finally the connection is closed.
244+
*
245+
* In order to drain subscribers, an unsub protocol message is sent to the server followed by a flush.
246+
* These two steps occur before drain returns. The remaining steps occur in a background thread.
247+
* This method tries to manage the timeout properly, so that if the timeout is 1 second, and the flush
248+
* takes 100ms, the remaining steps have 900ms in the background thread.
249+
*
250+
* The connection will try to let all messages be drained, but when the timeout is reached
251+
* the connection is closed and any outstanding dispatcher threads are interrupted.
252+
*
253+
* A future is used to allow this call to be treated as synchronous or asynchronous as
254+
* needed by the application. The value of the future will be true if all of the subscriptions
255+
* were drained in the timeout, and false otherwise. The future is completed after the connection
256+
* is closed, so any connection handler notifications will happen before the future completes.
257+
*
258+
* @param timeout The time to wait for the drain to succeed, pass 0 to wait
259+
* forever. Drain involves moving messages to and from the server
260+
* so a very short timeout is not recommended. If the timeout is reached before
261+
* the drain completes, the connection is simply closed, which can result in message
262+
* loss.
263+
* @return A future that can be used to check if the drain has completed
264+
* @throws InterruptedException if the thread is interrupted
265+
* @throws TimeoutException if the initial flush times out
266+
*/
267+
public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutException, InterruptedException;
268+
237269
/**
238270
* Close the connection and release all blocking calls like {@link #flush flush}
239271
* and {@link Subscription#nextMessage(Duration) nextMessage}.
240272
*
273+
* If close() is called after {@link #drain(Duration) drain} it will wait up to the connection timeout
274+
* to return, but it will not initiate a close. The drain takes precedence and will initiate the close.
275+
*
241276
* @throws InterruptedException if the thread, or one owned by the connection is interrupted during the close
242277
*/
243278
public void close() throws InterruptedException ;

src/main/java/io/nats/client/Consumer.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313

1414
package io.nats.client;
1515

16+
import java.time.Duration;
17+
import java.util.concurrent.CompletableFuture;
18+
1619
/**
1720
* A Consumer in the NATS library is an object that represents an incoming queue of
1821
* messages. There are two types of consumers {@link Dispatcher} and {@link Subscription}.
@@ -96,4 +99,20 @@ public interface Consumer {
9699
* For a subscription the answer is false after unsubscribe. For a dispatcher, false after stop.
97100
*/
98101
public boolean isActive();
102+
103+
/**
104+
* Drain tells the consumer to process in flight, or cached messages, but stop receiving new ones. The library will
105+
* flush the unsubscribe call(s) insuring that any publish calls made by this client are included. When all messages
106+
* are processed the consumer effectively becomes unsubscribed.
107+
*
108+
* A future is used to allow this call to be treated as synchronous or asynchronous as
109+
* needed by the application.
110+
*
111+
* @param timeout The time to wait for the drain to succeed, pass 0 to wait
112+
* forever. Drain involves moving messages to and from the server
113+
* so a very short timeout is not recommended.
114+
* @return A future that can be used to check if the drain has completed
115+
* @throws InterruptedException if the thread is interrupted
116+
*/
117+
public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException;
99118
}

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,26 @@
1515

1616
import java.time.Duration;
1717
import java.util.concurrent.ConcurrentLinkedQueue;
18-
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.concurrent.atomic.AtomicInteger;
1919
import java.util.concurrent.atomic.AtomicLong;
2020
import java.util.concurrent.locks.LockSupport;
2121
import java.util.function.Predicate;
2222

2323
class MessageQueue {
24+
private final static int STOPPED = 0;
25+
private final static int RUNNING = 1;
26+
private final static int DRAINING = 2;
27+
2428
private final AtomicLong length;
2529
private final AtomicLong sizeInBytes;
26-
private final AtomicBoolean running;
30+
private final AtomicInteger running;
2731
private final boolean singleThreadedReader;
2832
private final ConcurrentLinkedQueue<NatsMessage> queue;
2933
private final ConcurrentLinkedQueue<Thread> waiters;
3034

3135
MessageQueue(boolean singleReaderMode) {
3236
this.queue = new ConcurrentLinkedQueue<>();
33-
this.running = new AtomicBoolean(true);
37+
this.running = new AtomicInteger(RUNNING);
3438
this.sizeInBytes = new AtomicLong(0);
3539
this.length = new AtomicLong(0);
3640

@@ -43,19 +47,32 @@ boolean isSingleReaderMode() {
4347
}
4448

4549
boolean isRunning() {
46-
return this.running.get();
50+
return this.running.get() != STOPPED;
51+
}
52+
53+
boolean isDraining() {
54+
return this.running.get() == DRAINING;
4755
}
4856

4957
void pause() {
50-
this.running.set(false);
58+
this.running.set(STOPPED);
5159
signalAll();
5260
}
5361

5462
void resume() {
55-
this.running.set(true);
63+
this.running.set(RUNNING);
5664
signalAll();
5765
}
5866

67+
void drain() {
68+
this.running.set(DRAINING);
69+
signalAll();
70+
}
71+
72+
boolean isDrained() {
73+
return this.running.get() == DRAINING && this.length() == 0;
74+
}
75+
5976
void signalOne() {
6077
Thread t = waiters.poll();
6178
if (t != null) {
@@ -99,7 +116,12 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
99116
// Semi-spin for at most MAX_SPIN_TIME
100117
if (timeoutNanos > MAX_SPIN_TIME) {
101118
int count = 0;
102-
while (this.running.get() && (retVal = this.queue.poll()) == null && count < MAX_SPINS) {
119+
while (this.isRunning() && (retVal = this.queue.poll()) == null && count < MAX_SPINS) {
120+
121+
if (this.isDraining()) {
122+
break;
123+
}
124+
103125
count++;
104126
LockSupport.parkNanos(SPIN_WAIT);
105127
}
@@ -111,7 +133,12 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
111133

112134
long now = start;
113135

114-
while (this.running.get() && (retVal = this.queue.poll()) == null) {
136+
while (this.isRunning() && (retVal = this.queue.poll()) == null) {
137+
138+
if (this.isDraining()) {
139+
break;
140+
}
141+
115142
if (timeoutNanos > 0) { // If it is 0, keep it as zero, otherwise reduce based on time
116143
now = System.nanoTime();
117144
timeoutNanos = timeoutNanos - (now - start); //include the semi-spin time
@@ -140,7 +167,7 @@ NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
140167
}
141168

142169
NatsMessage pop(Duration timeout) throws InterruptedException {
143-
if (!this.running.get()) {
170+
if (!this.isRunning()) {
144171
return null;
145172
}
146173

@@ -176,7 +203,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
176203
throw new IllegalStateException("Accumulate is only supported in single reader mode.");
177204
}
178205

179-
if (!this.running.get()) {
206+
if (!this.isRunning()) {
180207
return null;
181208
}
182209

@@ -185,7 +212,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
185212
if (msg == null) {
186213
msg = waitForTimeout(timeout);
187214

188-
if (!this.running.get() || (msg == null)) {
215+
if (!this.isRunning() || (msg == null)) {
189216
return null;
190217
}
191218
}
@@ -247,7 +274,7 @@ long sizeInBytes() {
247274
}
248275

249276
void filter(Predicate<NatsMessage> p) {
250-
if (this.running.get()) {
277+
if (this.isRunning()) {
251278
throw new IllegalStateException("Filter is only supported when the queue is paused");
252279
}
253280

0 commit comments

Comments
 (0)