Skip to content

CASSJAVA-69 Fix Schema agreement check after DDL query is unreliable #2026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -228,7 +228,7 @@ private void processSchemaChange(Event event) {
SchemaChangeEvent sce = (SchemaChangeEvent) event;
context
.getMetadataManager()
.refreshSchema(sce.keyspace, false, false)
.refreshSchema(sce.keyspace, false, false, null)
.whenComplete(
(metadata, error) -> {
if (error != null) {
@@ -479,7 +479,7 @@ private void onSuccessfulReconnect() {
context.getLoadBalancingPolicyWrapper().init();
context
.getMetadataManager()
.refreshSchema(null, false, true)
.refreshSchema(null, false, true, null)
.whenComplete(
(metadata, schemaError) -> {
if (schemaError != null) {
Original file line number Diff line number Diff line change
@@ -615,7 +615,7 @@ public void onResponse(Frame responseFrame) {
SchemaChange schemaChange = (SchemaChange) responseMessage;
context
.getMetadataManager()
.refreshSchema(schemaChange.keyspace, false, false)
.refreshSchema(schemaChange.keyspace, false, false, channel)
.whenComplete(
(result, error) -> {
boolean schemaInAgreement;
Original file line number Diff line number Diff line change
@@ -228,11 +228,17 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {

@Override
public CompletionStage<Boolean> checkSchemaAgreement() {
return checkSchemaAgreement(null);
}

@Override
public CompletionStage<Boolean> checkSchemaAgreement(DriverChannel channel) {
if (closeFuture.isDone()) {
return CompletableFuture.completedFuture(true);
}
DriverChannel channel = controlConnection.channel();
return new SchemaAgreementChecker(channel, context, logPrefix).run();
return new SchemaAgreementChecker(
channel == null ? controlConnection.channel() : channel, context, logPrefix)
.run();
}

@NonNull
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
@@ -112,7 +113,7 @@ private void onConfigChanged(@SuppressWarnings("unused") ConfigChangeEvent event
|| !keyspacesBefore.equals(refreshedKeyspaces)
|| (!tokenMapEnabledBefore && tokenMapEnabled))
&& isSchemaEnabled()) {
refreshSchema(null, false, true)
refreshSchema(null, false, true, null)
.whenComplete(
(metadata, error) -> {
if (error != null) {
@@ -226,10 +227,11 @@ public void removeNode(InetSocketAddress broadcastRpcAddress) {
* request)
* @param flushNow bypass the debouncer and force an immediate refresh (used to avoid a delay at
* startup)
* @param channel
*/
public CompletionStage<RefreshSchemaResult> refreshSchema(
String keyspace, boolean evenIfDisabled, boolean flushNow) {
CompletableFuture<RefreshSchemaResult> future = new CompletableFuture<>();
String keyspace, boolean evenIfDisabled, boolean flushNow, DriverChannel channel) {
ChannelFuture<RefreshSchemaResult> future = new ChannelFuture<>(channel);
RunOrSchedule.on(
adminExecutor,
() -> singleThreaded.refreshSchema(keyspace, evenIfDisabled, flushNow, future));
@@ -271,7 +273,7 @@ public CompletionStage<Metadata> setSchemaEnabled(Boolean newValue) {
boolean wasEnabledBefore = isSchemaEnabled();
schemaEnabledProgrammatically = newValue;
if (!wasEnabledBefore && isSchemaEnabled()) {
return refreshSchema(null, false, true).thenApply(RefreshSchemaResult::getMetadata);
return refreshSchema(null, false, true, null).thenApply(RefreshSchemaResult::getMetadata);
} else {
return CompletableFuture.completedFuture(metadata);
}
@@ -296,20 +298,38 @@ public CompletionStage<Void> forceCloseAsync() {
return this.closeAsync();
}

private static class ChannelFuture<T> extends CompletableFuture<T> {
private DriverChannel channel;

public ChannelFuture(DriverChannel channel) {
this.channel = channel;
}

public DriverChannel getChannel() {
return channel;
}

public static <T> void completeFrom(ChannelFuture<T> source, ChannelFuture<T> target) {
if (source.getChannel() == null) {
source.channel = target.getChannel();
}
CompletableFutures.completeFrom(source, target);
}
}

private class SingleThreaded {
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
private final CompletableFuture<Void> firstSchemaRefreshFuture = new CompletableFuture<>();
private final Debouncer<
CompletableFuture<RefreshSchemaResult>, CompletableFuture<RefreshSchemaResult>>
private final Debouncer<ChannelFuture<RefreshSchemaResult>, ChannelFuture<RefreshSchemaResult>>
schemaRefreshDebouncer;
private final SchemaQueriesFactory schemaQueriesFactory;
private final SchemaParserFactory schemaParserFactory;

// We don't allow concurrent schema refreshes. If one is already running, the next one is queued
// (and the ones after that are merged with the queued one).
private CompletableFuture<RefreshSchemaResult> currentSchemaRefresh;
private CompletableFuture<RefreshSchemaResult> queuedSchemaRefresh;
private ChannelFuture<RefreshSchemaResult> currentSchemaRefresh;
private ChannelFuture<RefreshSchemaResult> queuedSchemaRefresh;

private boolean didFirstNodeListRefresh;

@@ -369,7 +389,7 @@ private void refreshSchema(
String keyspace,
boolean evenIfDisabled,
boolean flushNow,
CompletableFuture<RefreshSchemaResult> future) {
ChannelFuture<RefreshSchemaResult> future) {

if (!didFirstNodeListRefresh) {
// This happen if the control connection receives a schema event during init. We can't
@@ -390,8 +410,7 @@ private void refreshSchema(
}

// An external component has requested a schema refresh, feed it to the debouncer.
private void acceptSchemaRequest(
CompletableFuture<RefreshSchemaResult> future, boolean flushNow) {
private void acceptSchemaRequest(ChannelFuture<RefreshSchemaResult> future, boolean flushNow) {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
future.complete(new RefreshSchemaResult(metadata));
@@ -404,24 +423,24 @@ private void acceptSchemaRequest(
}

// Multiple requests have arrived within the debouncer window, coalesce them.
private CompletableFuture<RefreshSchemaResult> coalesceSchemaRequests(
List<CompletableFuture<RefreshSchemaResult>> futures) {
private ChannelFuture<RefreshSchemaResult> coalesceSchemaRequests(
List<ChannelFuture<RefreshSchemaResult>> futures) {
assert adminExecutor.inEventLoop();
assert !futures.isEmpty();
// Keep only one, but ensure that the discarded ones will still be completed when we're done
CompletableFuture<RefreshSchemaResult> result = null;
for (CompletableFuture<RefreshSchemaResult> future : futures) {
ChannelFuture<RefreshSchemaResult> result = null;
for (ChannelFuture<RefreshSchemaResult> future : futures) {
if (result == null) {
result = future;
} else {
CompletableFutures.completeFrom(result, future);
ChannelFuture.completeFrom(result, future);
}
}
return result;
}

// The debouncer has flushed, start the actual work.
private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFuture) {
private void startSchemaRequest(ChannelFuture<RefreshSchemaResult> refreshFuture) {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
refreshFuture.complete(new RefreshSchemaResult(metadata));
@@ -431,7 +450,8 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
currentSchemaRefresh = refreshFuture;
LOG.debug("[{}] Starting schema refresh", logPrefix);
initControlConnectionForSchema()
.thenCompose(v -> context.getTopologyMonitor().checkSchemaAgreement())
.thenCompose(
v -> context.getTopologyMonitor().checkSchemaAgreement(refreshFuture.getChannel()))
.whenComplete(
(schemaInAgreement, agreementError) -> {
if (agreementError != null) {
@@ -456,8 +476,7 @@ private void startSchemaRequest(CompletableFuture<RefreshSchemaResult> refreshFu
currentSchemaRefresh = null;
// If another refresh was enqueued during this one, run it now
if (queuedSchemaRefresh != null) {
CompletableFuture<RefreshSchemaResult> tmp =
this.queuedSchemaRefresh;
ChannelFuture<RefreshSchemaResult> tmp = this.queuedSchemaRefresh;
this.queuedSchemaRefresh = null;
startSchemaRequest(tmp);
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.EventBus;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import java.net.InetSocketAddress;
@@ -119,4 +120,8 @@ public interface TopologyMonitor extends AsyncAutoCloseable {
* take a while to replicate across nodes.
*/
CompletionStage<Boolean> checkSchemaAgreement();

default CompletionStage<Boolean> checkSchemaAgreement(DriverChannel channel) {
return checkSchemaAgreement();
}
}
Original file line number Diff line number Diff line change
@@ -178,7 +178,7 @@ public CompletionStage<Metadata> setSchemaMetadataEnabled(@Nullable Boolean newV
@Override
public CompletionStage<Metadata> refreshSchemaAsync() {
return metadataManager
.refreshSchema(null, true, true)
.refreshSchema(null, true, true, null)
.thenApply(RefreshSchemaResult::getMetadata);
}

@@ -453,7 +453,7 @@ private CompletionStage<Void> checkProtocolVersion() {
private CompletionStage<RefreshSchemaResult> initialSchemaRefresh() {
try {
return metadataManager
.refreshSchema(null, false, true)
.refreshSchema(null, false, true, null)
.exceptionally(
error -> {
Loggers.warnWithException(
Original file line number Diff line number Diff line change
@@ -152,6 +152,6 @@ public void should_process_schema_change_events() {
callback.onEvent(event);

// Then
verify(metadataManager).refreshSchema("ks", false, false);
verify(metadataManager).refreshSchema("ks", false, false, null);
}
}
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@@ -123,7 +124,8 @@ public void setup() {
mockQueryPlan(node1, node2);

when(metadataManager.refreshNodes()).thenReturn(CompletableFuture.completedFuture(null));
when(metadataManager.refreshSchema(anyString(), anyBoolean(), anyBoolean()))
when(metadataManager.refreshSchema(
anyString(), anyBoolean(), anyBoolean(), nullable(DriverChannel.class)))
.thenReturn(CompletableFuture.completedFuture(null));
when(context.getMetadataManager()).thenReturn(metadataManager);

Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import static com.datastax.oss.driver.Assertions.assertThatStage;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@@ -31,6 +32,7 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.EventBus;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.NettyOptions;
@@ -297,7 +299,7 @@ public void refreshSchema_should_work() {
when(schemaQueriesFactory.newInstance()).thenThrow(expectedException);
when(topologyMonitor.refreshNodeList())
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class))));
when(topologyMonitor.checkSchemaAgreement())
when(topologyMonitor.checkSchemaAgreement(nullable(DriverChannel.class)))
.thenReturn(CompletableFuture.completedFuture(Boolean.TRUE));
when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
@@ -306,7 +308,7 @@ public void refreshSchema_should_work() {

// When
CompletionStage<MetadataManager.RefreshSchemaResult> result =
metadataManager.refreshSchema("foo", true, true);
metadataManager.refreshSchema("foo", true, true, null);

// Then
waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone());
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ public void setup() {

// Init sequence:
when(metadataManager.refreshNodes()).thenReturn(CompletableFuture.completedFuture(null));
when(metadataManager.refreshSchema(null, false, true))
when(metadataManager.refreshSchema(null, false, true, null))
.thenReturn(CompletableFuture.completedFuture(null));
when(context.getMetadataManager()).thenReturn(metadataManager);