Skip to content

Commit 6aaab79

Browse files
authored
Augmented force reconnect (#1165)
1 parent 8e20807 commit 6aaab79

File tree

7 files changed

+300
-10
lines changed

7 files changed

+300
-10
lines changed

src/main/java/io/nats/client/Connection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,11 +547,21 @@ enum Status {
547547
/**
548548
* Forces reconnect behavior. Stops the current connection including the reading and writing,
549549
* copies already queued outgoing messages, and then begins the reconnect logic.
550+
* Does not flush. Does not force close the connection. See {@link #forceReconnect(ForceReconnectOptions)}.
550551
* @throws IOException the forceReconnect fails
551552
* @throws InterruptedException the connection is not connected
552553
*/
553554
void forceReconnect() throws IOException, InterruptedException;
554555

556+
/**
557+
* Forces reconnect behavior. Stops the current connection including the reading and writing,
558+
* copies already queued outgoing messages, and then begins the reconnect logic.
559+
* @param options options for how the forceReconnect works
560+
* @throws IOException the forceReconnect fails
561+
* @throws InterruptedException the connection is not connected
562+
*/
563+
void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException;
564+
555565
/**
556566
* Calculates the round trip time between this client and the server.
557567
* @return the RTT as a duration
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2024 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client;
15+
16+
import java.time.Duration;
17+
18+
/**
19+
* The PublishOptions class specifies the options for publishing with JetStream enabled servers.
20+
* Options are created using a {@link ForceReconnectOptions.Builder Builder}.
21+
*/
22+
public class ForceReconnectOptions {
23+
24+
public static final ForceReconnectOptions FORCE_CLOSE_INSTANCE = ForceReconnectOptions.builder().forceClose().build();
25+
26+
private final boolean forceClose;
27+
private final Duration flushWait;
28+
29+
private ForceReconnectOptions(Builder b) {
30+
this.forceClose = b.forceClose;
31+
this.flushWait = b.flushWait;
32+
}
33+
34+
public boolean isForceClose() {
35+
return forceClose;
36+
}
37+
38+
public boolean isFlush() {
39+
return flushWait != null;
40+
}
41+
42+
public Duration getFlushWait() {
43+
return flushWait;
44+
}
45+
46+
/**
47+
* Creates a builder for the options.
48+
* @return the builder
49+
*/
50+
public static Builder builder() {
51+
return new Builder();
52+
}
53+
54+
/**
55+
* ForceReconnectOptions are created using a Builder.
56+
*/
57+
public static class Builder {
58+
boolean forceClose = false;
59+
Duration flushWait;
60+
61+
/**
62+
* Constructs a new Builder with the default values.
63+
*/
64+
public Builder() {}
65+
66+
public Builder forceClose() {
67+
this.forceClose = true;
68+
return this;
69+
}
70+
71+
/**
72+
* @param flushWait if supplied and at least 1 millisecond, the forceReconnect will try to
73+
* flush before closing for the specified wait time. Flush happens before close
74+
* so not affected by forceClose option
75+
* @return the builder
76+
*/
77+
public Builder flush(Duration flushWait) {
78+
this.flushWait = flushWait == null || flushWait.toMillis() < 1 ? null : flushWait;
79+
return this;
80+
}
81+
82+
/**
83+
* @param flushWaitMillis if supplied and at least 1 millisecond, the forceReconnect will try to
84+
* flush before closing for the specified wait time. Flush happens before close
85+
* so not affected by forceClose option
86+
* @return the builder
87+
*/
88+
public Builder flush(long flushWaitMillis) {
89+
if (flushWaitMillis > 0) {
90+
this.flushWait = Duration.ofMillis(flushWaitMillis);
91+
}
92+
else {
93+
this.flushWait = null;
94+
}
95+
return this;
96+
}
97+
98+
/**
99+
* Builds the ForceReconnectOptions.
100+
* @return ForceReconnectOptions
101+
*/
102+
public ForceReconnectOptions build() {
103+
return new ForceReconnectOptions(this);
104+
}
105+
}
106+
}

src/main/java/io/nats/client/Options.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1567,7 +1567,7 @@ public Builder proxy(Proxy proxy) {
15671567
* @return the Builder for chaining
15681568
*/
15691569
public Builder dataPortType(String dataPortClassName) {
1570-
this.dataPortType = dataPortClassName;
1570+
this.dataPortType = dataPortClassName == null ? DEFAULT_DATA_PORT_TYPE : dataPortClassName;
15711571
return this;
15721572
}
15731573

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,18 +271,32 @@ else if (trace) {
271271

272272
@Override
273273
public void forceReconnect() throws IOException, InterruptedException {
274+
forceReconnect(null);
275+
}
276+
277+
@Override
278+
public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException {
274279
if (!tryingToConnect.get()) {
275280
try {
276281
tryingToConnect.set(true);
277-
forceReconnectImpl();
282+
forceReconnectImpl(options);
278283
}
279284
finally {
280285
tryingToConnect.set(false);
281286
}
282287
}
283288
}
284289

285-
void forceReconnectImpl() throws IOException, InterruptedException {
290+
void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedException {
291+
if (options != null && options.getFlushWait() != null) {
292+
try {
293+
flush(options.getFlushWait());
294+
}
295+
catch (TimeoutException e) {
296+
// ignore, don't care, too bad;
297+
}
298+
}
299+
286300
closeSocketLock.lock();
287301
try {
288302
updateStatus(Status.DISCONNECTED);
@@ -299,7 +313,12 @@ void forceReconnectImpl() throws IOException, InterruptedException {
299313
dataPort = null;
300314
executor.submit(() -> {
301315
try {
302-
closeMe.forceClose();
316+
if (options != null && options.isForceClose()) {
317+
closeMe.forceClose();
318+
}
319+
else {
320+
closeMe.close();
321+
}
303322
}
304323
catch (IOException ignore) {}
305324
});

src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.nats.client.impl;
1515

16+
import io.nats.client.ForceReconnectOptions;
1617
import io.nats.client.Options;
1718
import io.nats.client.support.NatsUri;
1819

@@ -39,10 +40,10 @@ public void run() {
3940
writeWatcherTimer.cancel(); // we don't need to repeat this
4041
connection.executeCallback((c, el) -> el.socketWriteTimeout(c));
4142
try {
42-
connection.forceReconnect();
43+
connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE);
4344
}
4445
catch (IOException e) {
45-
// retry maybe? forceReconnect
46+
// retry maybe?
4647
}
4748
catch (InterruptedException e) {
4849
Thread.currentThread().interrupt();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2024 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client.impl;
15+
16+
import java.io.IOException;
17+
18+
public class ForceReconnectQueueCheckDataPort extends SocketDataPort {
19+
public static String WRITE_CHECK;
20+
public static long DELAY;
21+
22+
@Override
23+
public void write(byte[] src, int toWrite) throws IOException {
24+
String s = new String(src, 0, Math.min(7, toWrite));
25+
if (s.startsWith(WRITE_CHECK)) {
26+
try {
27+
Thread.sleep(DELAY);
28+
}
29+
catch (InterruptedException e) {
30+
throw new RuntimeException(e);
31+
}
32+
}
33+
super.write(src, toWrite);
34+
}
35+
}

0 commit comments

Comments
 (0)