diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 5c29a9b704b..2d0f5774ecc 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -228,7 +228,7 @@ private void processSchemaChange(Event event) { SchemaChangeEvent sce = (SchemaChangeEvent) event; context .getMetadataManager() - .refreshSchema(sce.keyspace, false, false) + .refreshSchema(sce.keyspace, false, false, null) .whenComplete( (metadata, error) -> { if (error != null) { @@ -479,7 +479,7 @@ private void onSuccessfulReconnect() { context.getLoadBalancingPolicyWrapper().init(); context .getMetadataManager() - .refreshSchema(null, false, true) + .refreshSchema(null, false, true, null) .whenComplete( (metadata, schemaError) -> { if (schemaError != null) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0808bdce63f..7a96a691dda 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -615,7 +615,7 @@ public void onResponse(Frame responseFrame) { SchemaChange schemaChange = (SchemaChange) responseMessage; context .getMetadataManager() - .refreshSchema(schemaChange.keyspace, false, false) + .refreshSchema(schemaChange.keyspace, false, false, channel) .whenComplete( (result, error) -> { boolean schemaInAgreement; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index f3dc988cfbc..1f4bb229258 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -228,11 +228,17 @@ public CompletionStage> refreshNodeList() { @Override public CompletionStage checkSchemaAgreement() { + return checkSchemaAgreement(null); + } + + @Override + public CompletionStage checkSchemaAgreement(DriverChannel channel) { if (closeFuture.isDone()) { return CompletableFuture.completedFuture(true); } - DriverChannel channel = controlConnection.channel(); - return new SchemaAgreementChecker(channel, context, logPrefix).run(); + return new SchemaAgreementChecker( + channel == null ? controlConnection.channel() : channel, context, logPrefix) + .run(); } @NonNull diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index efb04bde5e1..8ead3aab8bf 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.control.ControlConnection; @@ -112,7 +113,7 @@ private void onConfigChanged(@SuppressWarnings("unused") ConfigChangeEvent event || !keyspacesBefore.equals(refreshedKeyspaces) || (!tokenMapEnabledBefore && tokenMapEnabled)) && isSchemaEnabled()) { - refreshSchema(null, false, true) + refreshSchema(null, false, true, null) .whenComplete( (metadata, error) -> { if (error != null) { @@ -226,10 +227,11 @@ public void removeNode(InetSocketAddress broadcastRpcAddress) { * request) * @param flushNow bypass the debouncer and force an immediate refresh (used to avoid a delay at * startup) + * @param channel */ public CompletionStage refreshSchema( - String keyspace, boolean evenIfDisabled, boolean flushNow) { - CompletableFuture future = new CompletableFuture<>(); + String keyspace, boolean evenIfDisabled, boolean flushNow, DriverChannel channel) { + ChannelFuture future = new ChannelFuture<>(channel); RunOrSchedule.on( adminExecutor, () -> singleThreaded.refreshSchema(keyspace, evenIfDisabled, flushNow, future)); @@ -271,7 +273,7 @@ public CompletionStage setSchemaEnabled(Boolean newValue) { boolean wasEnabledBefore = isSchemaEnabled(); schemaEnabledProgrammatically = newValue; if (!wasEnabledBefore && isSchemaEnabled()) { - return refreshSchema(null, false, true).thenApply(RefreshSchemaResult::getMetadata); + return refreshSchema(null, false, true, null).thenApply(RefreshSchemaResult::getMetadata); } else { return CompletableFuture.completedFuture(metadata); } @@ -296,20 +298,38 @@ public CompletionStage forceCloseAsync() { return this.closeAsync(); } + private static class ChannelFuture extends CompletableFuture { + private DriverChannel channel; + + public ChannelFuture(DriverChannel channel) { + this.channel = channel; + } + + public DriverChannel getChannel() { + return channel; + } + + public static void completeFrom(ChannelFuture source, ChannelFuture target) { + if (source.getChannel() == null) { + source.channel = target.getChannel(); + } + CompletableFutures.completeFrom(source, target); + } + } + private class SingleThreaded { private final CompletableFuture closeFuture = new CompletableFuture<>(); private boolean closeWasCalled; private final CompletableFuture firstSchemaRefreshFuture = new CompletableFuture<>(); - private final Debouncer< - CompletableFuture, CompletableFuture> + private final Debouncer, ChannelFuture> schemaRefreshDebouncer; private final SchemaQueriesFactory schemaQueriesFactory; private final SchemaParserFactory schemaParserFactory; // We don't allow concurrent schema refreshes. If one is already running, the next one is queued // (and the ones after that are merged with the queued one). - private CompletableFuture currentSchemaRefresh; - private CompletableFuture queuedSchemaRefresh; + private ChannelFuture currentSchemaRefresh; + private ChannelFuture queuedSchemaRefresh; private boolean didFirstNodeListRefresh; @@ -369,7 +389,7 @@ private void refreshSchema( String keyspace, boolean evenIfDisabled, boolean flushNow, - CompletableFuture future) { + ChannelFuture future) { if (!didFirstNodeListRefresh) { // This happen if the control connection receives a schema event during init. We can't @@ -390,8 +410,7 @@ private void refreshSchema( } // An external component has requested a schema refresh, feed it to the debouncer. - private void acceptSchemaRequest( - CompletableFuture future, boolean flushNow) { + private void acceptSchemaRequest(ChannelFuture future, boolean flushNow) { assert adminExecutor.inEventLoop(); if (closeWasCalled) { future.complete(new RefreshSchemaResult(metadata)); @@ -404,24 +423,24 @@ private void acceptSchemaRequest( } // Multiple requests have arrived within the debouncer window, coalesce them. - private CompletableFuture coalesceSchemaRequests( - List> futures) { + private ChannelFuture coalesceSchemaRequests( + List> futures) { assert adminExecutor.inEventLoop(); assert !futures.isEmpty(); // Keep only one, but ensure that the discarded ones will still be completed when we're done - CompletableFuture result = null; - for (CompletableFuture future : futures) { + ChannelFuture result = null; + for (ChannelFuture future : futures) { if (result == null) { result = future; } else { - CompletableFutures.completeFrom(result, future); + ChannelFuture.completeFrom(result, future); } } return result; } // The debouncer has flushed, start the actual work. - private void startSchemaRequest(CompletableFuture refreshFuture) { + private void startSchemaRequest(ChannelFuture refreshFuture) { assert adminExecutor.inEventLoop(); if (closeWasCalled) { refreshFuture.complete(new RefreshSchemaResult(metadata)); @@ -431,7 +450,8 @@ private void startSchemaRequest(CompletableFuture refreshFu currentSchemaRefresh = refreshFuture; LOG.debug("[{}] Starting schema refresh", logPrefix); initControlConnectionForSchema() - .thenCompose(v -> context.getTopologyMonitor().checkSchemaAgreement()) + .thenCompose( + v -> context.getTopologyMonitor().checkSchemaAgreement(refreshFuture.getChannel())) .whenComplete( (schemaInAgreement, agreementError) -> { if (agreementError != null) { @@ -456,8 +476,7 @@ private void startSchemaRequest(CompletableFuture refreshFu currentSchemaRefresh = null; // If another refresh was enqueued during this one, run it now if (queuedSchemaRefresh != null) { - CompletableFuture tmp = - this.queuedSchemaRefresh; + ChannelFuture tmp = this.queuedSchemaRefresh; this.queuedSchemaRefresh = null; startSchemaRequest(tmp); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java index e7741f11196..0dbf1a750c8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/TopologyMonitor.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import java.net.InetSocketAddress; @@ -119,4 +120,8 @@ public interface TopologyMonitor extends AsyncAutoCloseable { * take a while to replicate across nodes. */ CompletionStage checkSchemaAgreement(); + + default CompletionStage checkSchemaAgreement(DriverChannel channel) { + return checkSchemaAgreement(); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java index b795c30fce7..58bebd91024 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java @@ -178,7 +178,7 @@ public CompletionStage setSchemaMetadataEnabled(@Nullable Boolean newV @Override public CompletionStage refreshSchemaAsync() { return metadataManager - .refreshSchema(null, true, true) + .refreshSchema(null, true, true, null) .thenApply(RefreshSchemaResult::getMetadata); } @@ -453,7 +453,7 @@ private CompletionStage checkProtocolVersion() { private CompletionStage initialSchemaRefresh() { try { return metadataManager - .refreshSchema(null, false, true) + .refreshSchema(null, false, true, null) .exceptionally( error -> { Loggers.warnWithException( diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java index cb83b523ebe..6ed5fb35f8b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java @@ -152,6 +152,6 @@ public void should_process_schema_change_events() { callback.onEvent(event); // Then - verify(metadataManager).refreshSchema("ks", false, false); + verify(metadataManager).refreshSchema("ks", false, false, null); } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java index c52199465a8..080a0714bff 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -123,7 +124,8 @@ public void setup() { mockQueryPlan(node1, node2); when(metadataManager.refreshNodes()).thenReturn(CompletableFuture.completedFuture(null)); - when(metadataManager.refreshSchema(anyString(), anyBoolean(), anyBoolean())) + when(metadataManager.refreshSchema( + anyString(), anyBoolean(), anyBoolean(), nullable(DriverChannel.class))) .thenReturn(CompletableFuture.completedFuture(null)); when(context.getMetadataManager()).thenReturn(metadataManager); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java index f9a909400f9..a8986c339ff 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java @@ -21,6 +21,7 @@ import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -31,6 +32,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.NettyOptions; @@ -297,7 +299,7 @@ public void refreshSchema_should_work() { when(schemaQueriesFactory.newInstance()).thenThrow(expectedException); when(topologyMonitor.refreshNodeList()) .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); - when(topologyMonitor.checkSchemaAgreement()) + when(topologyMonitor.checkSchemaAgreement(nullable(DriverChannel.class))) .thenReturn(CompletableFuture.completedFuture(Boolean.TRUE)); when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); @@ -306,7 +308,7 @@ public void refreshSchema_should_work() { // When CompletionStage result = - metadataManager.refreshSchema("foo", true, true); + metadataManager.refreshSchema("foo", true, true, null); // Then waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone()); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java index 58d1783038d..112ff3bf486 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/DefaultSessionPoolsTest.java @@ -130,7 +130,7 @@ public void setup() { // Init sequence: when(metadataManager.refreshNodes()).thenReturn(CompletableFuture.completedFuture(null)); - when(metadataManager.refreshSchema(null, false, true)) + when(metadataManager.refreshSchema(null, false, true, null)) .thenReturn(CompletableFuture.completedFuture(null)); when(context.getMetadataManager()).thenReturn(metadataManager);