Skip to content

Flaky tests #2045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
@Category(IsolatedTests.class)
public class PreparedStatementCachingIT {

private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class);

private CustomCcmRule ccmRule = CustomCcmRule.builder().build();

private SessionRule<CqlSession> sessionRule =
Expand Down Expand Up @@ -121,9 +123,11 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
@NonNull Optional<DefaultDriverContext> context) {
return (evt) -> {
try {
LOG.info("Cache removal callback triggered, cause: {}", evt.getCause());
CompletableFuture<PreparedStatement> future =
(CompletableFuture<PreparedStatement>) evt.getValue();
ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId();
LOG.info("Firing PreparedStatementRemovalEvent for queryId: {}", queryId);
context.ifPresent(
ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)));
} catch (Exception e) {
Expand Down Expand Up @@ -194,8 +198,8 @@ private void invalidationResultSetTest(
Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
invalidationTestInner(
setupTestSchema,
"select f from test_table_1 where e = ?",
"select h from test_table_2 where g = ?",
"select f from test_table_caching_1 where e = ?",
"select h from test_table_caching_2 where g = ?",
expectedChangedTypes);
}

Expand All @@ -206,8 +210,8 @@ private void invalidationVariableDefsTest(
String condition = isCollection ? "contains ?" : "= ?";
invalidationTestInner(
setupTestSchema,
String.format("select e from test_table_1 where f %s allow filtering", condition),
String.format("select g from test_table_2 where h %s allow filtering", condition),
String.format("select e from test_table_caching_1 where f %s allow filtering", condition),
String.format("select g from test_table_caching_2 where h %s allow filtering", condition),
expectedChangedTypes);
}

Expand All @@ -222,10 +226,15 @@ private void invalidationTestInner(
assertThat(getPreparedCacheSize(session)).isEqualTo(0);
setupTestSchema.accept(session);

session.prepare(preparedStmtQueryType1);
ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId();
PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1);
PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2);
ByteBuffer queryId2 = stmt2.getId();
assertThat(getPreparedCacheSize(session)).isEqualTo(2);

LOG.info("Prepared statements in cache:");
LOG.info(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId());
LOG.info(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId());

CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1);
CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size());

Expand All @@ -242,6 +251,8 @@ private void invalidationTestInner(
TypeChangeEvent.class,
(e) -> {
// expect one event per type changed and for every parent type that nests it
LOG.info("Received TypeChangeEvent for type: {} (changeType: {})",
e.oldType.getName(), e.changeType);
if (Boolean.TRUE.equals(
changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) {
// store an error if we see duplicate change event
Expand All @@ -254,25 +265,66 @@ private void invalidationTestInner(
.register(
PreparedStatementRemovalEvent.class,
(e) -> {
LOG.info("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId);
if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) {
// store an error if we see multiple cache invalidation events
// any non-empty error will fail the test so it's OK to do this multiple times
removedQueryEventError.set(
Optional.of("Unable to set reference for PS removal event"));
LOG.warn("Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones");
}
preparedStmtCacheRemoveLatch.countDown();
});

// alter test_type_2 to trigger cache invalidation and above events
session.execute("ALTER TYPE test_type_2 add i blob");
// alter test_type_caching_2 to trigger cache invalidation and above events
LOG.info("Executing ALTER TYPE test_type_caching_2 add i blob");
LOG.info("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2);
session.execute("ALTER TYPE test_type_caching_2 add i blob");

// Give a small delay to allow the schema change to propagate before checking agreement
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
session.checkSchemaAgreement();

// wait for latches and fail if they don't reach zero before timeout
assertThat(
Uninterruptibles.awaitUninterruptibly(
preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS))
// Use longer timeout for cache removal as it depends on complex event chain
boolean cacheRemovalSuccess =
Uninterruptibles.awaitUninterruptibly(
preparedStmtCacheRemoveLatch, 180, TimeUnit.SECONDS);
boolean typeChangeSuccess =
Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS);

// Provide detailed diagnostics if either latch fails
if (!cacheRemovalSuccess || !typeChangeSuccess) {
String diagnostics =
String.format(
"Test failure diagnostics:\n"
+ " - Cache removal latch success: %s (count: %d)\n"
+ " - Type change latch success: %s (count: %d)\n"
+ " - Current cache size: %d\n"
+ " - Expected changed types: %s\n"
+ " - Actual changed types detected: %s\n"
+ " - Expected removed query ID: %s\n"
+ " - Actual removed query IDs: %s\n"
+ " - Type change errors: %s\n"
+ " - Removal event errors: %s",
cacheRemovalSuccess,
preparedStmtCacheRemoveLatch.getCount(),
typeChangeSuccess,
typeChangeEventLatch.getCount(),
getPreparedCacheSize(session),
expectedChangedTypes,
changedTypes.keySet(),
queryId2,
removedQueryIds.get(),
typeChangeEventError.get(),
removedQueryEventError.get());
LOG.error("Prepared statement cache invalidation test failed: {}", diagnostics);
}

assertThat(cacheRemovalSuccess)
.withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout")
.isTrue();
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS))
assertThat(typeChangeSuccess)
.withFailMessage("typeChangeEventLatch did not trigger before timeout")
.isTrue();

Expand All @@ -295,17 +347,20 @@ private void invalidationTestInner(

Consumer<CqlSession> setupCacheEntryTestBasic =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_2>)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_1>)");
session.execute(
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_2>)");
};

@Test
public void should_invalidate_cache_entry_on_basic_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -314,25 +369,26 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2"));
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestCollection =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_1 (e int primary key, f list<frozen<test_type_1>>)");
"CREATE TABLE test_table_caching_1 (e int primary key, f list<frozen<test_type_caching_1>>)");
session.execute(
"CREATE TABLE test_table_2 (g int primary key, h list<frozen<test_type_2>>)");
"CREATE TABLE test_table_caching_2 (g int primary key, h list<frozen<test_type_caching_2>>)");
};

@Test
public void should_invalidate_cache_entry_on_collection_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -341,25 +397,26 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2"));
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestTuple =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_1 (e int primary key, f tuple<int, test_type_1, text>)");
"CREATE TABLE test_table_caching_1 (e int primary key, f tuple<int, test_type_caching_1, text>)");
session.execute(
"CREATE TABLE test_table_2 (g int primary key, h tuple<text, test_type_2, int>)");
"CREATE TABLE test_table_caching_2 (g int primary key, h tuple<text, test_type_caching_2, int>)");
};

@Test
public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -368,26 +425,29 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2"));
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestNested =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_3 (e frozen<test_type_1>, f int)");
session.execute("CREATE TYPE test_type_4 (g int, h frozen<test_type_2>)");
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_3>)");
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_4>)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_3 (e frozen<test_type_caching_1>, f int)");
session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen<test_type_caching_2>)");
session.execute(
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_3>)");
session.execute(
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_4>)");
};

@Test
public void should_invalidate_cache_entry_on_nested_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(
setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4"));
setupCacheEntryTestNested,
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
});
}

Expand All @@ -396,7 +456,9 @@ public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4"));
setupCacheEntryTestNested,
false,
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions;
import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
Expand All @@ -47,13 +47,11 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;

@RunWith(DataProviderRunner.class)
@Category(ParallelizableTests.class)
public class DefaultReactiveResultSetIT {

private static CcmRule ccmRule = CcmRule.getInstance();
Expand All @@ -67,19 +65,15 @@ public static void initialize() {
CqlSession session = sessionRule.session();
SchemaChangeSynchronizer.withLock(
() -> {
session.execute("DROP TABLE IF EXISTS test_reactive_read");
session.execute("DROP TABLE IF EXISTS test_reactive_write");
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read"));
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write"));
session.checkSchemaAgreement();
session.execute(
SimpleStatement.builder(
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
.setExecutionProfile(sessionRule.slowProfile())
.build());
createSlowStatement(
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
session.execute(
SimpleStatement.builder(
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
.setExecutionProfile(sessionRule.slowProfile())
.build());
createSlowStatement(
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
session.checkSchemaAgreement();
});
for (int i = 0; i < 1000; i++) {
Expand All @@ -92,6 +86,12 @@ public static void initialize() {
}
}

static Statement<?> createSlowStatement(String statement) {
return SimpleStatement.builder(statement)
.setExecutionProfile(sessionRule.slowProfile())
.build();
}

@Before
public void truncateTables() throws Exception {
CqlSession session = sessionRule.session();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ public void should_disable_schema_programmatically_when_enabled_in_config() {
sessionRule
.session()
.execute(
SimpleStatement.builder("CREATE TABLE foo(k int primary key)")
SimpleStatement.builder("CREATE TABLE foo_schema_it(k int primary key)")
.setExecutionProfile(slowProfile)
.build());
assertThat(session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
.doesNotContainKey(CqlIdentifier.fromInternal("foo"));
.doesNotContainKey(CqlIdentifier.fromInternal("foo_schema_it"));

// Reset to config value (true), should refresh and load the new table
session.setSchemaMetadataEnabled(null);
Expand All @@ -167,7 +167,7 @@ public void should_disable_schema_programmatically_when_enabled_in_config() {
() ->
assertThat(
session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
.containsKey(CqlIdentifier.fromInternal("foo")));
.containsKey(CqlIdentifier.fromInternal("foo_schema_it")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception
// trigger node1 UP -> DOWN
eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1));

Thread.sleep(expireAfter.toMillis());
Thread.sleep(expireAfter.toMillis() + 100);

// then node-level metrics should be evicted from node1, but
// node2 and node3 metrics should not have been evicted
await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
await()
.atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertNodeMetricsEvicted(session, node1));
assertNodeMetricsNotEvicted(session, node2);
assertNodeMetricsNotEvicted(session, node3);

Expand Down Expand Up @@ -226,7 +228,8 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo
eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2));
eventBus.fire(NodeStateEvent.added(node3));

Thread.sleep(expireAfter.toMillis());
// Add a small buffer to ensure the timeout would have fired if it wasn't cancelled
Thread.sleep(expireAfter.toMillis() + 100);

// then no node-level metrics should be evicted
assertNodeMetricsNotEvicted(session, node1);
Expand Down
Loading