Skip to content

Commit ff2d7f2

Browse files
CASSJAVA-92: Local DC provided for nodetool clientstats
patch by Lukasz Antoniak; reviewed by Bret McGuire and Abe Ratnofsky for CASSJAVA-92
1 parent 05e6717 commit ff2d7f2

File tree

11 files changed

+252
-46
lines changed

11 files changed

+252
-46
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LoadBalancingPolicy.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
2525
import edu.umd.cs.findbugs.annotations.NonNull;
2626
import edu.umd.cs.findbugs.annotations.Nullable;
27+
import java.util.Collections;
2728
import java.util.Map;
2829
import java.util.Optional;
2930
import java.util.Queue;
@@ -76,6 +77,12 @@ default Optional<RequestTracker> getRequestTracker() {
7677
*/
7778
void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter);
7879

80+
/** Returns map containing details that impact C* node connectivity. */
81+
@NonNull
82+
default Map<String, ?> getStartupConfiguration() {
83+
return Collections.emptyMap();
84+
}
85+
7986
/**
8087
* Returns the coordinators to use for a new query.
8188
*

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ public class DefaultDriverContext implements InternalDriverContext {
216216
new LazyReference<>("metricIdGenerator", this::buildMetricIdGenerator, cycleDetector);
217217
private final LazyReference<RequestThrottler> requestThrottlerRef =
218218
new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector);
219-
private final LazyReference<Map<String, String>> startupOptionsRef =
220-
new LazyReference<>("startupOptions", this::buildStartupOptions, cycleDetector);
219+
private final LazyReference<StartupOptionsBuilder> startupOptionsRef =
220+
new LazyReference<>("startupOptionsFactory", this::buildStartupOptionsFactory, cycleDetector);
221221
private final LazyReference<NodeStateListener> nodeStateListenerRef;
222222
private final LazyReference<SchemaChangeListener> schemaChangeListenerRef;
223223
private final LazyReference<RequestTracker> requestTrackerRef;
@@ -335,16 +335,15 @@ public DefaultDriverContext(
335335
}
336336

337337
/**
338-
* Builds a map of options to send in a Startup message.
338+
* Returns builder of options to send in a Startup message.
339339
*
340340
* @see #getStartupOptions()
341341
*/
342-
protected Map<String, String> buildStartupOptions() {
342+
protected StartupOptionsBuilder buildStartupOptionsFactory() {
343343
return new StartupOptionsBuilder(this)
344344
.withClientId(startupClientId)
345345
.withApplicationName(startupApplicationName)
346-
.withApplicationVersion(startupApplicationVersion)
347-
.build();
346+
.withApplicationVersion(startupApplicationVersion);
348347
}
349348

350349
protected Map<String, LoadBalancingPolicy> buildLoadBalancingPolicies() {
@@ -1013,7 +1012,8 @@ public ProtocolVersion getProtocolVersion() {
10131012
@NonNull
10141013
@Override
10151014
public Map<String, String> getStartupOptions() {
1016-
return startupOptionsRef.get();
1015+
// startup options are calculated dynamically and may vary per connection
1016+
return startupOptionsRef.get().build();
10171017
}
10181018

10191019
protected RequestLogFormatter buildRequestLogFormatter() {

core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,34 @@
1919

2020
import com.datastax.dse.driver.api.core.config.DseDriverOption;
2121
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
22+
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
2223
import com.datastax.oss.driver.api.core.session.Session;
2324
import com.datastax.oss.driver.api.core.uuid.Uuids;
25+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
2426
import com.datastax.oss.protocol.internal.request.Startup;
2527
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
28+
import com.fasterxml.jackson.databind.ObjectMapper;
2629
import edu.umd.cs.findbugs.annotations.Nullable;
2730
import java.util.Map;
31+
import java.util.Optional;
2832
import java.util.UUID;
2933
import net.jcip.annotations.Immutable;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3036

3137
@Immutable
3238
public class StartupOptionsBuilder {
3339

3440
public static final String DRIVER_NAME_KEY = "DRIVER_NAME";
3541
public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION";
42+
public static final String DRIVER_BAGGAGE = "DRIVER_BAGGAGE";
3643
public static final String APPLICATION_NAME_KEY = "APPLICATION_NAME";
3744
public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION";
3845
public static final String CLIENT_ID_KEY = "CLIENT_ID";
3946

47+
private static final Logger LOG = LoggerFactory.getLogger(StartupOptionsBuilder.class);
48+
private static final ObjectMapper mapper = new ObjectMapper();
49+
4050
protected final InternalDriverContext context;
4151
private UUID clientId;
4252
private String applicationName;
@@ -119,6 +129,7 @@ public Map<String, String> build() {
119129
if (applicationVersion != null) {
120130
builder.put(APPLICATION_VERSION_KEY, applicationVersion);
121131
}
132+
driverBaggage().ifPresent(s -> builder.put(DRIVER_BAGGAGE, s));
122133

123134
return builder.build();
124135
}
@@ -142,4 +153,21 @@ protected String getDriverName() {
142153
protected String getDriverVersion() {
143154
return Session.OSS_DRIVER_COORDINATES.getVersion().toString();
144155
}
156+
157+
private Optional<String> driverBaggage() {
158+
ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
159+
for (Map.Entry<String, LoadBalancingPolicy> entry :
160+
context.getLoadBalancingPolicies().entrySet()) {
161+
Map<String, ?> config = entry.getValue().getStartupConfiguration();
162+
if (!config.isEmpty()) {
163+
builder.put(entry.getKey(), config);
164+
}
165+
}
166+
try {
167+
return Optional.of(mapper.writeValueAsString(builder.build()));
168+
} catch (Exception e) {
169+
LOG.warn("Failed to construct startup driver baggage", e);
170+
return Optional.empty();
171+
}
172+
}
145173
}

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
4646
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
4747
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
48+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
4849
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
4950
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
5051
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -155,10 +156,38 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String
155156
* Before initialization, this method always returns null.
156157
*/
157158
@Nullable
158-
protected String getLocalDatacenter() {
159+
public String getLocalDatacenter() {
159160
return localDc;
160161
}
161162

163+
@NonNull
164+
@Override
165+
public Map<String, ?> getStartupConfiguration() {
166+
ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
167+
if (localDc != null) {
168+
builder.put("localDc", localDc);
169+
} else {
170+
// Local data center may not be discovered prior to connection pool initialization.
171+
// In such scenario, return configured local data center name.
172+
// Note that when using DC inferring load balancing policy, startup configuration
173+
// may not show local DC name, because it will be discovered only once control connection
174+
// is established and datacenter of contact points known.
175+
Optional<String> configuredDc =
176+
new OptionalLocalDcHelper(context, profile, logPrefix).configuredLocalDc();
177+
configuredDc.ifPresent(d -> builder.put("localDc", d));
178+
}
179+
if (!preferredRemoteDcs.isEmpty()) {
180+
builder.put("preferredRemoteDcs", preferredRemoteDcs);
181+
}
182+
if (allowDcFailoverForLocalCl) {
183+
builder.put("allowDcFailoverForLocalCl", allowDcFailoverForLocalCl);
184+
}
185+
if (maxNodesPerRemoteDc > 0) {
186+
builder.put("maxNodesPerRemoteDc", maxNodesPerRemoteDc);
187+
}
188+
return ImmutableMap.of(BasicLoadBalancingPolicy.class.getSimpleName(), builder.build());
189+
}
190+
162191
/** @return The nodes currently considered as live. */
163192
protected NodeSet getLiveNodes() {
164193
return liveNodes;

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
3535
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
3636
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
37+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3738
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
3839
import edu.umd.cs.findbugs.annotations.NonNull;
3940
import edu.umd.cs.findbugs.annotations.Nullable;
@@ -350,4 +351,13 @@ private boolean hasSufficientResponses(long now) {
350351
return this.oldest - threshold >= 0;
351352
}
352353
}
354+
355+
@NonNull
356+
@Override
357+
public Map<String, ?> getStartupConfiguration() {
358+
Map<String, ?> parent = super.getStartupConfiguration();
359+
return ImmutableMap.of(
360+
DefaultLoadBalancingPolicy.class.getSimpleName(),
361+
parent.get(BasicLoadBalancingPolicy.class.getSimpleName()));
362+
}
353363
}

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/helper/OptionalLocalDcHelper.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,14 @@ public OptionalLocalDcHelper(
6565
@Override
6666
@NonNull
6767
public Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
68-
String localDc = context.getLocalDatacenter(profile.getName());
69-
if (localDc != null) {
70-
LOG.debug("[{}] Local DC set programmatically: {}", logPrefix, localDc);
71-
checkLocalDatacenterCompatibility(localDc, context.getMetadataManager().getContactPoints());
72-
return Optional.of(localDc);
73-
} else if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) {
74-
localDc = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER);
75-
LOG.debug("[{}] Local DC set from configuration: {}", logPrefix, localDc);
76-
checkLocalDatacenterCompatibility(localDc, context.getMetadataManager().getContactPoints());
77-
return Optional.of(localDc);
68+
Optional<String> localDc = configuredLocalDc();
69+
if (localDc.isPresent()) {
70+
checkLocalDatacenterCompatibility(
71+
localDc.get(), context.getMetadataManager().getContactPoints());
7872
} else {
7973
LOG.debug("[{}] Local DC not set, DC awareness will be disabled", logPrefix);
80-
return Optional.empty();
8174
}
75+
return localDc;
8276
}
8377

8478
/**
@@ -138,4 +132,19 @@ protected String formatDcs(Iterable<? extends Node> nodes) {
138132
}
139133
return String.join(", ", new TreeSet<>(l));
140134
}
135+
136+
/** @return Local data center set programmatically or from configuration file. */
137+
@NonNull
138+
public Optional<String> configuredLocalDc() {
139+
String localDc = context.getLocalDatacenter(profile.getName());
140+
if (localDc != null) {
141+
LOG.debug("[{}] Local DC set programmatically: {}", logPrefix, localDc);
142+
return Optional.of(localDc);
143+
} else if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) {
144+
localDc = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER);
145+
LOG.debug("[{}] Local DC set from configuration: {}", logPrefix, localDc);
146+
return Optional.of(localDc);
147+
}
148+
return Optional.empty();
149+
}
141150
}

core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslatorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
2727
import com.datastax.oss.driver.internal.core.context.MockedDriverContextFactory;
2828
import java.net.InetSocketAddress;
29-
import java.util.Optional;
3029
import org.junit.Test;
3130

3231
public class FixedHostNameAddressTranslatorTest {
@@ -36,7 +35,7 @@ public void should_translate_address() {
3635
DriverExecutionProfile defaultProfile = mock(DriverExecutionProfile.class);
3736
when(defaultProfile.getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME)).thenReturn("myaddress");
3837
DefaultDriverContext defaultDriverContext =
39-
MockedDriverContextFactory.defaultDriverContext(Optional.of(defaultProfile));
38+
MockedDriverContextFactory.defaultDriverContext(defaultProfile);
4039

4140
FixedHostNameAddressTranslator translator =
4241
new FixedHostNameAddressTranslator(defaultDriverContext);

core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslatorTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3131
import java.net.InetSocketAddress;
3232
import java.util.Map;
33-
import java.util.Optional;
3433
import org.junit.Test;
3534

3635
@SuppressWarnings("resource")
@@ -148,6 +147,6 @@ public void should_fail_on_default_address_without_port() {
148147
private static DefaultDriverContext context(Map<String, String> subnetAddresses) {
149148
DriverExecutionProfile profile = mock(DriverExecutionProfile.class);
150149
when(profile.getStringMap(ADDRESS_TRANSLATOR_SUBNET_ADDRESSES)).thenReturn(subnetAddresses);
151-
return MockedDriverContextFactory.defaultDriverContext(Optional.of(profile));
150+
return MockedDriverContextFactory.defaultDriverContext(profile);
152151
}
153152
}

core/src/test/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContextTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ private DefaultDriverContext buildMockedContext(Optional<String> compressionOpti
4242
DriverExecutionProfile defaultProfile = mock(DriverExecutionProfile.class);
4343
when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none"))
4444
.thenReturn(compressionOption.orElse("none"));
45-
return MockedDriverContextFactory.defaultDriverContext(Optional.of(defaultProfile));
45+
return MockedDriverContextFactory.defaultDriverContext(defaultProfile);
4646
}
4747

4848
private void doCreateCompressorTest(Optional<String> configVal, Class<?> expectedClz) {

0 commit comments

Comments
 (0)