Skip to content

Commit 8bc9688

Browse files
authored
Merge pull request #1396 from nats-io/service-flapper
Fix test flappers
2 parents 15c17db + b44195c commit 8bc9688

File tree

4 files changed

+51
-13
lines changed

4 files changed

+51
-13
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ test {
8585
}
8686
retry {
8787
failOnPassedAfterRetry = false
88-
maxFailures = 2
89-
maxRetries = 2
88+
maxFailures = 4
89+
maxRetries = 4
9090
}
9191
maxParallelForks = Runtime.runtime.availableProcessors()
9292
systemProperty 'junit.jupiter.execution.timeout.default', '3m'

src/main/java/io/nats/service/Service.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import java.util.Arrays;
2525
import java.util.Collection;
2626
import java.util.List;
27-
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.ConcurrentHashMap;
29-
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.*;
28+
import java.util.concurrent.atomic.AtomicReference;
3029
import java.util.concurrent.locks.ReentrantLock;
3130

3231
import static io.nats.client.support.ApiConstants.*;
@@ -50,19 +49,22 @@ public class Service {
5049
private final ConcurrentHashMap<String, EndpointContext> serviceContexts;
5150
private final List<EndpointContext> discoveryContexts;
5251
private final List<Dispatcher> dInternals;
52+
private final AtomicReference<ZonedDateTime> startTimeRef;
53+
private final CompletableFuture<Boolean> startedFuture;
5354
private final PingResponse pingResponse;
5455
private final InfoResponse infoResponse;
5556

5657
private final ReentrantLock startStopLock;
5758
private CompletableFuture<Boolean> runningIndicator;
58-
private ZonedDateTime started;
5959

6060
Service(ServiceBuilder b) {
6161
String id = new io.nats.client.NUID().next();
6262
conn = b.conn;
6363
drainTimeout = b.drainTimeout;
6464
dInternals = new ArrayList<>();
6565
startStopLock = new ReentrantLock();
66+
startTimeRef = new AtomicReference<>(DateTimeUtils.DEFAULT_TIME);
67+
startedFuture = new CompletableFuture<>();
6668

6769
// build responses first. info needs to be available when adding service endpoints.
6870
pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
@@ -189,7 +191,8 @@ public CompletableFuture<Boolean> startService() {
189191
for (EndpointContext ctx : discoveryContexts) {
190192
ctx.start();
191193
}
192-
started = DateTimeUtils.gmtNow();
194+
startTimeRef.set(DateTimeUtils.gmtNow());
195+
startedFuture.complete(true);
193196
}
194197
return runningIndicator;
195198
}
@@ -302,7 +305,10 @@ public void stop(boolean drain, Throwable t) {
302305
* Reset the statistics for the endpoints
303306
*/
304307
public void reset() {
305-
started = DateTimeUtils.gmtNow();
308+
if (isStarted()) {
309+
// only refresh the start time if the service has actually been started
310+
startTimeRef.set(DateTimeUtils.gmtNow());
311+
}
306312
for (EndpointContext c : discoveryContexts) {
307313
c.reset();
308314
}
@@ -343,6 +349,33 @@ public String getDescription() {
343349
return infoResponse.getDescription();
344350
}
345351

352+
/**
353+
* Get whether the service has fully started
354+
* @return true if started
355+
*/
356+
public boolean isStarted() {
357+
return startedFuture.isDone();
358+
}
359+
360+
/**
361+
* Get whether the service has started, waiting up to the given timeout if necessary
362+
* @param timeout the maximum time to wait
363+
* @param unit the time unit of the timeout argument
364+
* @return true if started by the timeout
365+
*/
366+
public boolean isStarted(long timeout, TimeUnit unit) {
367+
try {
368+
return startedFuture.get(timeout, unit);
369+
}
370+
catch (InterruptedException e) {
371+
Thread.currentThread().interrupt();
372+
return false;
373+
}
374+
catch (ExecutionException | TimeoutException e) {
375+
return false;
376+
}
377+
}
378+
346379
/**
347380
* Get the drain timeout setting
348381
* @return the drain timeout setting
@@ -376,7 +409,8 @@ public StatsResponse getStatsResponse() {
376409
for (EndpointContext c : serviceContexts.values()) {
377410
endpointStats.add(c.getEndpointStats());
378411
}
379-
return new StatsResponse(pingResponse, started, endpointStats);
412+
// StatsResponse handles a start time of DateTimeUtils.DEFAULT_TIME
413+
return new StatsResponse(pingResponse, startTimeRef.get(), endpointStats);
380414
}
381415

382416
/**
@@ -396,6 +430,7 @@ public String toString() {
396430
JsonUtils.addField(sb, NAME, infoResponse.getName());
397431
JsonUtils.addField(sb, VERSION, infoResponse.getVersion());
398432
JsonUtils.addField(sb, DESCRIPTION, infoResponse.getDescription());
433+
JsonUtils.addField(sb, STARTED, startTimeRef.get());
399434
return endJson(sb).toString();
400435
}
401436
}

src/test/java/io/nats/client/impl/SimplificationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1728,7 +1728,7 @@ public void testReconnectOverOrdered() throws Exception {
17281728

17291729
ConsumeOptions consumeOptions = ConsumeOptions.builder()
17301730
.batchSize(100) // small batch size means more round trips
1731-
.expiresIn(2000) // idle heartbeat is half of this, alarm time is 3 * ihb
1731+
.expiresIn(1500) // idle heartbeat is half of this, alarm time is 3 * ihb
17321732
.build();
17331733

17341734
OrderedConsumerConfiguration ocConfig = new OrderedConsumerConfiguration().filterSubjects(subject);
@@ -1749,7 +1749,7 @@ public void testReconnectOverOrdered() throws Exception {
17491749
// reconnect and get some more messages
17501750
try (NatsTestServer ignored = new NatsTestServer(port, false, true)) {
17511751
standardConnectionWait(nc);
1752-
sleep(5000); // long enough to get messages and for the hb alarm to have tripped
1752+
sleep(6000); // long enough to get messages and for the hb alarm to have tripped
17531753
}
17541754
validateConsumerNameForOrdered(orderedConsumerContext, mcon, null);
17551755
assertNotEquals(firstConsumerName, orderedConsumerContext.getConsumerName());
@@ -1759,10 +1759,10 @@ public void testReconnectOverOrdered() throws Exception {
17591759
assertTrue(count2 > count1);
17601760
assertEquals(count2, nextExpectedSequence.get());
17611761

1762-
sleep(4000); // enough delay before reconnect to trip hb alarm again
1762+
sleep(6000); // enough delay before reconnect to trip hb alarm again
17631763
try (NatsTestServer ignored = new NatsTestServer(port, false, true)) {
17641764
standardConnectionWait(nc);
1765-
sleep(4000); // long enough to get messages and for the hb alarm to have tripped
1765+
sleep(6000); // long enough to get messages and for the hb alarm to have tripped
17661766

17671767
try {
17681768
nc.jetStreamManagement().deleteStream(stream); // it was a file stream clean it up

src/test/java/io/nats/service/ServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,9 @@ public void testResponsesFromAllInstances() throws Exception {
600600
service1.startService();
601601
service2.startService();
602602

603+
assertTrue(service1.isStarted(1, TimeUnit.SECONDS));
604+
assertTrue(service2.isStarted(1, TimeUnit.SECONDS));
605+
603606
Discovery discovery = new Discovery(clientNc);
604607

605608
List<PingResponse> prs = discovery.ping();

0 commit comments

Comments
 (0)