Skip to content

Commit 8e20807

Browse files
authored
[Bug] Poison pill comparison was failing (#1162)
1 parent ac9ed27 commit 8e20807

File tree

5 files changed

+185
-111
lines changed

5 files changed

+185
-111
lines changed

src/main/java/io/nats/client/impl/MessageQueue.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.nats.client.impl;
1515

16+
import io.nats.client.Message;
17+
1618
import java.time.Duration;
1719
import java.util.ArrayList;
1820
import java.util.concurrent.LinkedBlockingQueue;
@@ -30,6 +32,7 @@ class MessageQueue {
3032
protected static final int STOPPED = 0;
3133
protected static final int RUNNING = 1;
3234
protected static final int DRAINING = 2;
35+
protected static final String POISON = "_poison";
3336

3437
protected final AtomicLong length;
3538
protected final AtomicLong sizeInBytes;
@@ -44,9 +47,10 @@ class MessageQueue {
4447
protected final Duration requestCleanupInterval;
4548

4649
// Poison pill is a graphic, but common term for an item that breaks loops or stop something.
47-
// In this class the poisonPill is used to break out of timed waits on the blocking queue.
50+
// In this class the poison pill is used to break out of timed waits on the blocking queue.
4851
// A simple == is used to check if any message in the queue is this message.
49-
protected final NatsMessage poisonPill;
52+
// /\ /\ /\ /\ which is why it is now a static. It's just a marker anyway.
53+
protected static final NatsMessage POISON_PILL = new NatsMessage(POISON, null, EMPTY_BODY);
5054

5155
MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
5256
this(singleReaderMode, -1, false, requestCleanupInterval, null);
@@ -80,9 +84,6 @@ class MessageQueue {
8084
this.offerLockMillis = requestCleanupInterval.toMillis();
8185
this.offerTimeoutMillis = Math.max(1, requestCleanupInterval.toMillis() * 95 / 100);
8286

83-
// The poisonPill is used to stop poll and accumulate when the queue is stopped
84-
this.poisonPill = new NatsMessage("_poison", null, EMPTY_BODY);
85-
8687
editLock = new ReentrantLock();
8788

8889
this.singleReaderMode = singleReaderMode;
@@ -195,7 +196,7 @@ boolean push(NatsMessage msg, boolean internal) {
195196
*/
196197
void poisonTheQueue() {
197198
try {
198-
this.queue.add(this.poisonPill);
199+
this.queue.add(POISON_PILL);
199200
} catch (IllegalStateException ie) { // queue was full, so we don't really need poison pill
200201
// ok to ignore this
201202
}
@@ -222,11 +223,11 @@ NatsMessage poll(Duration timeout) throws InterruptedException {
222223
}
223224
}
224225

225-
if (msg == poisonPill) {
226-
return null;
227-
}
226+
return msg == null || isPoison(msg) ? null : msg;
227+
}
228228

229-
return msg;
229+
private boolean isPoison(Message msg) {
230+
return msg == POISON_PILL;
230231
}
231232

232233
NatsMessage pop(Duration timeout) throws InterruptedException {
@@ -286,7 +287,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
286287

287288
while (cursor != null) {
288289
NatsMessage next = this.queue.peek();
289-
if (next != null && next != this.poisonPill) {
290+
if (next != null && !isPoison(next)) {
290291
long s = next.getSizeInBytes();
291292

292293
if (maxSize<0 || (size + s) < maxSize) { // keep going

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ else if (cur.equals(first)) {
213213

214214
// let server pool resolve hostnames, then loop through resolved
215215
List<NatsUri> resolvedList = resolveHost(cur);
216-
while (!resolvedList.isEmpty()) {
216+
for (NatsUri resolved : resolvedList) {
217217
if (isClosed()) {
218218
keepGoing = false;
219219
break;
@@ -224,7 +224,6 @@ else if (cur.equals(first)) {
224224
updateStatus(Status.CONNECTING);
225225

226226
timeTraceLogger.trace("trying to connect to %s", cur);
227-
NatsUri resolved = resolvedList.remove(0);
228227
tryToConnect(cur, resolved, System.nanoTime());
229228

230229
if (isConnected()) {
@@ -321,8 +320,6 @@ void forceReconnectImpl() throws IOException, InterruptedException {
321320
try {
322321
// calling connect just starts like a new connection versus reconnect
323322
// but we have to manually resubscribe like reconnect once it is connected
324-
// also, lets assume we never want to try the currently connected server
325-
serverPool.connectFailed(currentServer); // we don't want to connect to the same server
326323
reconnectImpl();
327324
writer.setReconnectMode(false);
328325
}
@@ -374,40 +371,36 @@ else if (first.equals(cur)) {
374371
// let server list provider resolve hostnames
375372
// then loop through resolved
376373
List<NatsUri> resolvedList = resolveHost(cur);
377-
while (keepGoing && !resolvedList.isEmpty()) {
374+
for (NatsUri resolved : resolvedList) {
378375
if (isClosed()) {
379376
keepGoing = false;
377+
break;
380378
}
381-
else {
382-
connectError.set(""); // reset on each loop
383-
if (isDisconnectingOrClosed() || this.isClosing()) {
384-
keepGoing = false;
385-
}
386-
else {
387-
updateStatus(Status.RECONNECTING);
379+
connectError.set(""); // reset on each loop
380+
if (isDisconnectingOrClosed() || this.isClosing()) {
381+
keepGoing = false;
382+
break;
383+
}
384+
updateStatus(Status.RECONNECTING);
388385

389-
timeTraceLogger.trace("reconnecting to server %s", cur);
390-
NatsUri resolved = resolvedList.remove(0);
391-
tryToConnect(cur, resolved, System.nanoTime());
386+
timeTraceLogger.trace("reconnecting to server %s", cur);
387+
tryToConnect(cur, resolved, System.nanoTime());
392388

393-
if (isConnected()) {
394-
serverPool.connectSucceeded(cur);
395-
statistics.incrementReconnects();
396-
keepGoing = false;
397-
}
398-
else {
399-
serverPool.connectFailed(cur);
400-
String err = connectError.get();
401-
if (this.isAuthenticationError(err)) {
402-
if (err.equals(this.serverAuthErrors.get(resolved))) {
403-
keepGoing = false; // double auth error
404-
}
405-
else {
406-
serverAuthErrors.put(resolved, err);
407-
}
408-
}
409-
}
389+
if (isConnected()) {
390+
serverPool.connectSucceeded(cur);
391+
statistics.incrementReconnects();
392+
keepGoing = false;
393+
break;
394+
}
395+
396+
serverPool.connectFailed(cur);
397+
String err = connectError.get();
398+
if (this.isAuthenticationError(err)) {
399+
if (err.equals(this.serverAuthErrors.get(resolved))) {
400+
keepGoing = false; // double auth error
401+
break;
410402
}
403+
serverAuthErrors.put(resolved, err);
411404
}
412405
}
413406
}

src/test/java/io/nats/client/ConnectTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.nats.client.ConnectionListener.Events;
1717
import io.nats.client.NatsServerProtocolMock.ExitAt;
18+
import io.nats.client.api.ServerInfo;
1819
import io.nats.client.impl.ListenerForTesting;
1920
import io.nats.client.impl.SimulateSocketDataPortException;
2021
import org.junit.jupiter.api.Test;
@@ -559,4 +560,47 @@ public void exceptionOccurred(Connection conn, Exception exp) {
559560
}
560561
}
561562
}
562-
}
563+
564+
@Test
565+
public void testRunInJsCluster() throws Exception {
566+
ListenerForTesting[] listeners = new ListenerForTesting[3];
567+
listeners[0] = new ListenerForTesting();
568+
listeners[1] = new ListenerForTesting();
569+
listeners[2] = new ListenerForTesting();
570+
571+
ThreeServerTestOptionsAppender appender = (ix, builder) ->
572+
builder.connectionListener(listeners[ix]).errorListener(listeners[ix]);
573+
574+
runInJsCluster(ConnectTests::validateRunInJsCluster);
575+
576+
listeners[0] = new ListenerForTesting();
577+
listeners[1] = new ListenerForTesting();
578+
listeners[2] = new ListenerForTesting();
579+
580+
runInJsCluster(true, appender, ConnectTests::validateRunInJsCluster);
581+
}
582+
583+
private static void validateRunInJsCluster(Connection nc1, Connection nc2, Connection nc3) throws InterruptedException {
584+
Thread.sleep(200);
585+
ServerInfo si1 = nc1.getServerInfo();
586+
ServerInfo si2 = nc2.getServerInfo();
587+
ServerInfo si3 = nc3.getServerInfo();
588+
assertEquals(si1.getCluster(), si2.getCluster());
589+
assertEquals(si1.getCluster(), si3.getCluster());
590+
String port1 = "" + si1.getPort();
591+
String port2 = "" + si2.getPort();
592+
String port3 = "" + si3.getPort();
593+
String urls1 = String.join(",", si1.getConnectURLs());
594+
String urls2 = String.join(",", si2.getConnectURLs());
595+
String urls3 = String.join(",", si3.getConnectURLs());
596+
assertTrue(urls1.contains(port1));
597+
assertTrue(urls1.contains(port2));
598+
assertTrue(urls1.contains(port3));
599+
assertTrue(urls2.contains(port1));
600+
assertTrue(urls2.contains(port2));
601+
assertTrue(urls2.contains(port3));
602+
assertTrue(urls3.contains(port1));
603+
assertTrue(urls3.contains(port2));
604+
assertTrue(urls3.contains(port3));
605+
}
606+
}

src/test/java/io/nats/client/impl/ReconnectTests.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.nats.client.*;
1717
import io.nats.client.ConnectionListener.Events;
18+
import io.nats.client.api.ServerInfo;
1819
import nats.io.NatsRunnerUtils;
1920
import org.junit.jupiter.api.Test;
2021

@@ -716,30 +717,46 @@ private static Thread getReconnectOnConnectTestThread(AtomicReference<Connection
716717
@Test
717718
public void testForceReconnect() throws Exception {
718719
ListenerForTesting listener = new ListenerForTesting();
719-
Options.Builder builder = Options.builder()
720-
.connectionListener(listener)
721-
.errorListener(listener);
720+
ThreeServerTestOptionsAppender appender = makeOptionsAppender(listener);
722721

723-
runInJsServer(nc1 -> runInServer(nc2 -> {
724-
int port1 = nc1.getServerInfo().getPort();
725-
int port2 = nc2.getServerInfo().getPort();
722+
runInJsCluster(appender, (nc0, nc1, nc2) -> {
723+
_testForceReconnect(nc0, listener);
724+
});
725+
}
726726

727-
String[] servers = new String[]{
728-
NatsRunnerUtils.getNatsLocalhostUri(port1),
729-
NatsRunnerUtils.getNatsLocalhostUri(port2)
730-
};
731-
//noinspection resource
732-
Connection nc = standardConnection(builder.servers(servers).build());
733-
int connectedPort = nc.getServerInfo().getPort();
734-
nc.forceReconnect();
735-
Thread.sleep(3000);
736-
assertNotEquals(connectedPort, nc.getServerInfo().getPort());
737-
}));
727+
@Test
728+
public void testForceReconnectWithAccount() throws Exception {
729+
ListenerForTesting listener = new ListenerForTesting();
730+
ThreeServerTestOptionsAppender appender = makeOptionsAppender(listener);
731+
runInJsCluster(true, appender, (nc0, nc1, nc2) -> {
732+
_testForceReconnect(nc0, listener);
733+
});
734+
735+
}
738736

737+
private static ThreeServerTestOptionsAppender makeOptionsAppender(ListenerForTesting listener) {
738+
ThreeServerTestOptionsAppender appender = (ix, builder) -> {
739+
if (ix == 0) {
740+
builder.connectionListener(listener).ignoreDiscoveredServers().noRandomize();
741+
}
742+
};
743+
return appender;
744+
}
745+
746+
private static void _testForceReconnect(Connection nc0, ListenerForTesting listener) throws IOException, InterruptedException {
747+
ServerInfo si = nc0.getServerInfo();
748+
String connectedServer = si.getServerId();
749+
750+
nc0.forceReconnect();
751+
standardConnectionWait(nc0);
752+
753+
si = nc0.getServerInfo();
754+
assertNotEquals(connectedServer, si.getServerId());
739755
assertTrue(listener.getConnectionEvents().contains(Events.DISCONNECTED));
740756
assertTrue(listener.getConnectionEvents().contains(Events.RECONNECTED));
741757
}
742758

759+
743760
@Test
744761
public void testSocketDataPortTimeout() throws Exception {
745762
ListenerForTesting listener = new ListenerForTesting();

0 commit comments

Comments
 (0)