Skip to content

Commit 564ed9b

Browse files
committed
1
1 parent 965c9a7 commit 564ed9b

File tree

6 files changed

+133
-2
lines changed

6 files changed

+133
-2
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import com.datastax.oss.driver.internal.core.session.PoolManager;
8686
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
8787
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
88+
import com.datastax.oss.driver.internal.core.session.SessionRegistry;
8889
import com.datastax.oss.driver.internal.core.ssl.JdkSslHandlerFactory;
8990
import com.datastax.oss.driver.internal.core.ssl.SslHandlerFactory;
9091
import com.datastax.oss.driver.internal.core.tracker.MultiplexingRequestTracker;
@@ -226,6 +227,7 @@ public class DefaultDriverContext implements InternalDriverContext {
226227
private final LazyReference<List<LifecycleListener>> lifecycleListenersRef =
227228
new LazyReference<>("lifecycleListeners", this::buildLifecycleListeners, cycleDetector);
228229

230+
private static SessionRegistry sessionRegistry;
229231
private final DriverConfig config;
230232
private final DriverConfigLoader configLoader;
231233
private final ChannelPoolFactory channelPoolFactory = new ChannelPoolFactory();
@@ -335,6 +337,14 @@ public DefaultDriverContext(
335337
.build());
336338
}
337339

340+
public SessionRegistry getSessionRegistry() {
341+
return sessionRegistry;
342+
}
343+
344+
public static void setSessionRegistry(SessionRegistry registry) {
345+
sessionRegistry = registry;
346+
}
347+
338348
/**
339349
* Builds a map of options to send in a Startup message.
340350
*

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.datastax.oss.driver.api.core.session.Request;
4040
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
4141
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
42+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
4243
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
4344
import com.datastax.oss.driver.internal.core.context.LifecycleListener;
4445
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
@@ -336,6 +337,10 @@ private class SingleThreaded {
336337
private boolean forceCloseWasCalled;
337338

338339
private SingleThreaded(InternalDriverContext context, Set<EndPoint> contactPoints) {
340+
if (context instanceof DefaultDriverContext) {
341+
SessionRegistry sessionRegistry = ((DefaultDriverContext) context).getSessionRegistry();
342+
if (sessionRegistry != null) sessionRegistry.registerSession(this);
343+
}
339344
this.context = context;
340345
this.nodeStateManager = new NodeStateManager(context);
341346
this.initialContactPoints = contactPoints;
@@ -656,6 +661,10 @@ private void warnIfFailed(CompletionStage<Void> stage) {
656661
}
657662

658663
private void closePolicies() {
664+
if (context instanceof DefaultDriverContext) {
665+
SessionRegistry sessionRegistry = ((DefaultDriverContext) context).getSessionRegistry();
666+
if (sessionRegistry != null) sessionRegistry.closeSession(this);
667+
}
659668
// This is a bit tricky: we might be closing the session because of an initialization error.
660669
// This error might have been triggered by a policy failing to initialize. If we try to access
661670
// the policy here to close it, it will fail again. So make sure we ignore that error and
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.datastax.oss.driver.internal.core.session;
2+
3+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
4+
5+
public abstract class SessionRegistry {
6+
public SessionRegistry() {
7+
DefaultDriverContext.setSessionRegistry(this);
8+
}
9+
10+
public abstract void registerSession(Object session);
11+
12+
public abstract void closeSession(Object session);
13+
}

osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/CcmPaxExam.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.datastax.oss.driver.internal.osgi.support;
1919

2020
import com.datastax.oss.driver.api.testinfra.requirement.BackendRequirementRule;
21+
import com.datastax.oss.driver.api.testinfra.session.SessionTracker;
2122
import org.junit.AssumptionViolatedException;
2223
import org.junit.runner.Description;
2324
import org.junit.runner.notification.Failure;
@@ -26,7 +27,6 @@
2627
import org.ops4j.pax.exam.junit.PaxExam;
2728

2829
public class CcmPaxExam extends PaxExam {
29-
3030
public CcmPaxExam(Class<?> klass) throws InitializationError {
3131
super(klass);
3232
}
@@ -36,7 +36,12 @@ public void run(RunNotifier notifier) {
3636
Description description = getDescription();
3737

3838
if (BackendRequirementRule.meetsDescriptionRequirements(description)) {
39-
super.run(notifier);
39+
try {
40+
SessionTracker.testStarted(description.getClassName(), description.getMethodName());
41+
super.run(notifier);
42+
} finally {
43+
SessionTracker.testEnded(description.getClassName(), description.getMethodName());
44+
}
4045
} else {
4146
// requirements not met, throw reasoning assumption to skip test
4247
AssumptionViolatedException e =

test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CustomCcmRule.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
*/
1818
package com.datastax.oss.driver.api.testinfra.ccm;
1919

20+
import com.datastax.oss.driver.api.testinfra.session.SessionTracker;
2021
import java.util.concurrent.atomic.AtomicReference;
22+
import org.junit.runner.Description;
23+
import org.junit.runners.model.Statement;
2124
import org.slf4j.Logger;
2225
import org.slf4j.LoggerFactory;
2326

@@ -39,6 +42,24 @@ public class CustomCcmRule extends BaseCcmRule {
3942
super(ccmBridge);
4043
}
4144

45+
@Override
46+
public Statement apply(Statement base, Description description) {
47+
final Statement statement = super.apply(base, description);
48+
return new Statement() {
49+
final Statement original = statement;
50+
51+
@Override
52+
public void evaluate() throws Throwable {
53+
try {
54+
SessionTracker.testStarted(description.getClassName(), description.getMethodName());
55+
original.evaluate();
56+
} finally {
57+
SessionTracker.testEnded(description.getClassName(), description.getMethodName());
58+
}
59+
}
60+
};
61+
}
62+
4263
@Override
4364
protected void before() {
4465
if (CURRENT.get() == null && CURRENT.compareAndSet(null, this)) {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.datastax.oss.driver.api.testinfra.session;
2+
3+
import com.datastax.oss.driver.internal.core.session.SessionRegistry;
4+
import java.lang.ref.WeakReference;
5+
import java.util.List;
6+
import java.util.Set;
7+
import java.util.concurrent.ConcurrentSkipListSet;
8+
import java.util.concurrent.CopyOnWriteArrayList;
9+
import java.util.stream.Collectors;
10+
11+
public class SessionTracker {
12+
static final TestSessionRegistry sessionRegistry = new TestSessionRegistry();
13+
14+
private static final Set<String> runningTests = new ConcurrentSkipListSet<>();
15+
16+
public static void testStarted(String className, String methodName) {
17+
runningTests.add(String.format("%s.%s", className, methodName));
18+
}
19+
20+
public static void testEnded(String className, String methodName) {
21+
runningTests.remove(String.format("%s.%s", className, methodName));
22+
if (runningTests.isEmpty()) {
23+
List<TestSessionRegistry.SessionRecord> activeSessions =
24+
sessionRegistry.getActiveSessionsAndForget();
25+
if (!activeSessions.isEmpty()) {
26+
throw new IllegalStateException(
27+
String.format(
28+
"There are active sessions, created in following tests: %s",
29+
activeSessions.stream()
30+
.flatMap(s -> s.sourceTests.stream())
31+
.collect(Collectors.toList())));
32+
}
33+
}
34+
}
35+
36+
private static class TestSessionRegistry extends SessionRegistry {
37+
protected TestSessionRegistry() {
38+
super();
39+
}
40+
41+
public static class SessionRecord {
42+
final WeakReference<Object> session;
43+
final Set<String> sourceTests;
44+
45+
SessionRecord(WeakReference<Object> session, Set<String> sourceTests) {
46+
this.session = session;
47+
this.sourceTests = sourceTests;
48+
}
49+
}
50+
51+
private static final List<SessionRecord> sessions = new CopyOnWriteArrayList<>();
52+
53+
@Override
54+
public void registerSession(Object session) {
55+
sessions.add(
56+
new SessionRecord(
57+
new WeakReference<>(session), runningTests.stream().collect(Collectors.toSet())));
58+
}
59+
60+
@Override
61+
public void closeSession(Object session) {
62+
sessions.removeIf(s -> s.session == session);
63+
}
64+
65+
public List<SessionRecord> getActiveSessionsAndForget() {
66+
// Purge known sessions
67+
sessions.removeIf(ref -> ref.session.get() == null);
68+
return sessions.stream()
69+
.filter(ref -> ref.session.get() == null)
70+
.collect(Collectors.toList());
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)