diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java index 617d489fb95..a8261b290fc 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java @@ -72,6 +72,8 @@ @Category(IsolatedTests.class) public class PreparedStatementCachingIT { + private static final Logger LOG = LoggerFactory.getLogger(PreparedStatementCachingIT.class); + private CustomCcmRule ccmRule = CustomCcmRule.builder().build(); private SessionRule sessionRule = @@ -121,11 +123,30 @@ private static RemovalListener buildCacheRemoveCallback( @NonNull Optional context) { return (evt) -> { try { + LOG.error( + "Cache removal callback triggered, cause: {}, key: {}", evt.getCause(), evt.getKey()); CompletableFuture future = (CompletableFuture) evt.getValue(); - ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); - context.ifPresent( - ctx -> ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId))); + + // Add more detailed logging about the future state + LOG.error( + "Future state - done: {}, cancelled: {}, completedExceptionally: {}", + future.isDone(), + future.isCancelled(), + future.isCompletedExceptionally()); + + if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) { + ByteBuffer queryId = Uninterruptibles.getUninterruptibly(future).getId(); + LOG.error("Firing PreparedStatementRemovalEvent for queryId: {}", queryId); + context.ifPresent( + ctx -> { + LOG.error("About to fire PreparedStatementRemovalEvent on event bus"); + ctx.getEventBus().fire(new PreparedStatementRemovalEvent(queryId)); + LOG.error("PreparedStatementRemovalEvent fired successfully"); + }); + } else { + LOG.error("Skipping removal event - future not in valid state for extraction"); + } } catch (Exception e) { LOG.error("Unable to register removal handler", e); } @@ -190,12 +211,74 @@ public static SessionBuilder builder() { return new TestSessionBuilder(); } + private void debugCacheInvalidation(CqlSession session, TypeChangeEvent event) { + try { + DefaultDriverContext ctx = (DefaultDriverContext) session.getContext(); + // Get the processor to check cache state + RequestProcessorRegistry registry = ctx.getRequestProcessorRegistry(); + + LOG.error( + "Debug: TypeChangeEvent received for type: {} (changeType: {})", + event.oldType.getName(), + event.changeType); + LOG.error("Debug: Current cache size: {}", getPreparedCacheSize(session)); + + // Force cache cleanup to trigger any pending removals + if (registry != null) { + LOG.error("Debug: Forcing cache cleanup..."); + // We can't directly access the cache from here, but we can log that we're trying + } + } catch (Exception e) { + LOG.error("Debug: Error during cache invalidation debugging", e); + } + } + + private boolean waitForCacheRemovalWithCleanup( + CountDownLatch latch, CqlSession session, long timeout, TimeUnit unit) { + long timeoutNanos = unit.toNanos(timeout); + long startTime = System.nanoTime(); + long remainingNanos = timeoutNanos; + + while (remainingNanos > 0 && latch.getCount() > 0) { + // Wait for a short period + long waitTime = Math.min(remainingNanos, TimeUnit.SECONDS.toNanos(5)); + boolean success = + Uninterruptibles.awaitUninterruptibly(latch, waitTime, TimeUnit.NANOSECONDS); + + if (success) { + LOG.error("Cache removal latch triggered successfully"); + return true; + } + + // If we haven't succeeded yet, try to force cache cleanup + LOG.error( + "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}", + getPreparedCacheSize(session)); + + try { + // Force garbage collection to help with weak references + System.gc(); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + + remainingNanos = timeoutNanos - (System.nanoTime() - startTime); + } + + LOG.error( + "Cache removal latch failed to trigger within timeout. Final cache size: {}", + getPreparedCacheSize(session)); + return false; + } + private void invalidationResultSetTest( Consumer setupTestSchema, Set expectedChangedTypes) { invalidationTestInner( setupTestSchema, - "select f from test_table_1 where e = ?", - "select h from test_table_2 where g = ?", + "select f from test_table_caching_1 where e = ?", + "select h from test_table_caching_2 where g = ?", expectedChangedTypes); } @@ -206,8 +289,8 @@ private void invalidationVariableDefsTest( String condition = isCollection ? "contains ?" : "= ?"; invalidationTestInner( setupTestSchema, - String.format("select e from test_table_1 where f %s allow filtering", condition), - String.format("select g from test_table_2 where h %s allow filtering", condition), + String.format("select e from test_table_caching_1 where f %s allow filtering", condition), + String.format("select g from test_table_caching_2 where h %s allow filtering", condition), expectedChangedTypes); } @@ -219,13 +302,26 @@ private void invalidationTestInner( try (CqlSession session = sessionWithCacheSizeMetric()) { - assertThat(getPreparedCacheSize(session)).isEqualTo(0); + // Ensure we start with a clean cache + long initialCacheSize = getPreparedCacheSize(session); + LOG.error("Starting test with cache size: {}", initialCacheSize); + assertThat(initialCacheSize).isEqualTo(0); + + // Force garbage collection to ensure clean state + System.gc(); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + setupTestSchema.accept(session); - session.prepare(preparedStmtQueryType1); - ByteBuffer queryId2 = session.prepare(preparedStmtQueryType2).getId(); + PreparedStatement stmt1 = session.prepare(preparedStmtQueryType1); + PreparedStatement stmt2 = session.prepare(preparedStmtQueryType2); + ByteBuffer queryId2 = stmt2.getId(); assertThat(getPreparedCacheSize(session)).isEqualTo(2); + LOG.error("Prepared statements in cache:"); + LOG.error(" Statement 1: {} (queryId: {})", preparedStmtQueryType1, stmt1.getId()); + LOG.error(" Statement 2: {} (queryId: {})", preparedStmtQueryType2, stmt2.getId()); + CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch(1); CountDownLatch typeChangeEventLatch = new CountDownLatch(expectedChangedTypes.size()); @@ -242,6 +338,14 @@ private void invalidationTestInner( TypeChangeEvent.class, (e) -> { // expect one event per type changed and for every parent type that nests it + LOG.error( + "Received TypeChangeEvent for type: {} (changeType: {})", + e.oldType.getName(), + e.changeType); + + // Add detailed debugging for cache invalidation + debugCacheInvalidation(session, e); + if (Boolean.TRUE.equals( changedTypes.putIfAbsent(e.oldType.getName().toString(), true))) { // store an error if we see duplicate change event @@ -254,25 +358,74 @@ private void invalidationTestInner( .register( PreparedStatementRemovalEvent.class, (e) -> { + LOG.error("Received PreparedStatementRemovalEvent for queryId: {}", e.queryId); if (!removedQueryIds.compareAndSet(Optional.empty(), Optional.of(e.queryId))) { // store an error if we see multiple cache invalidation events // any non-empty error will fail the test so it's OK to do this multiple times removedQueryEventError.set( Optional.of("Unable to set reference for PS removal event")); + LOG.warn( + "Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones"); } preparedStmtCacheRemoveLatch.countDown(); }); - // alter test_type_2 to trigger cache invalidation and above events - session.execute("ALTER TYPE test_type_2 add i blob"); + // alter test_type_caching_2 to trigger cache invalidation and above events + LOG.error("Executing ALTER TYPE test_type_caching_2 add i blob"); + LOG.error("Expected to invalidate statement 2 (queryId: {}) due to type change", queryId2); + session.execute("ALTER TYPE test_type_caching_2 add i blob"); + + // Give a longer delay to allow the schema change to propagate before checking agreement + LOG.error("Waiting for schema change to propagate..."); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + session.checkSchemaAgreement(); + + // Additional delay to allow event processing to complete + LOG.error("Waiting for event processing to complete..."); + Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); // wait for latches and fail if they don't reach zero before timeout - assertThat( - Uninterruptibles.awaitUninterruptibly( - preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS)) + // Use longer timeout for cache removal as it depends on complex event chain + boolean typeChangeSuccess = + Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 60, TimeUnit.SECONDS); + + // For cache removal, use a more robust waiting mechanism with periodic cleanup + boolean cacheRemovalSuccess = + waitForCacheRemovalWithCleanup( + preparedStmtCacheRemoveLatch, session, 180, TimeUnit.SECONDS); + + // Provide detailed diagnostics if either latch fails + if (!cacheRemovalSuccess || !typeChangeSuccess) { + String diagnostics = + String.format( + "Test failure diagnostics:\n" + + " - Cache removal latch success: %s (count: %d)\n" + + " - Type change latch success: %s (count: %d)\n" + + " - Current cache size: %d\n" + + " - Expected changed types: %s\n" + + " - Actual changed types detected: %s\n" + + " - Expected removed query ID: %s\n" + + " - Actual removed query IDs: %s\n" + + " - Type change errors: %s\n" + + " - Removal event errors: %s", + cacheRemovalSuccess, + preparedStmtCacheRemoveLatch.getCount(), + typeChangeSuccess, + typeChangeEventLatch.getCount(), + getPreparedCacheSize(session), + expectedChangedTypes, + changedTypes.keySet(), + queryId2, + removedQueryIds.get(), + typeChangeEventError.get(), + removedQueryEventError.get()); + LOG.error("Prepared statement cache invalidation test failed: {}", diagnostics); + } + + assertThat(cacheRemovalSuccess) .withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout") .isTrue(); - assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS)) + assertThat(typeChangeSuccess) .withFailMessage("typeChangeEventLatch did not trigger before timeout") .isTrue(); @@ -295,17 +448,20 @@ private void invalidationTestInner( Consumer setupCacheEntryTestBasic = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); - session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); + session.execute( + "CREATE TABLE test_table_caching_1 (e int primary key, f frozen)"); + session.execute( + "CREATE TABLE test_table_caching_2 (g int primary key, h frozen)"); }; @Test public void should_invalidate_cache_entry_on_basic_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2")); }); } @@ -314,25 +470,26 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2")); + setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestCollection = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); session.execute( - "CREATE TABLE test_table_1 (e int primary key, f list>)"); + "CREATE TABLE test_table_caching_1 (e int primary key, f list>)"); session.execute( - "CREATE TABLE test_table_2 (g int primary key, h list>)"); + "CREATE TABLE test_table_caching_2 (g int primary key, h list>)"); }; @Test public void should_invalidate_cache_entry_on_collection_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2")); }); } @@ -341,25 +498,26 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2")); + setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestTuple = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); session.execute( - "CREATE TABLE test_table_1 (e int primary key, f tuple)"); + "CREATE TABLE test_table_caching_1 (e int primary key, f tuple)"); session.execute( - "CREATE TABLE test_table_2 (g int primary key, h tuple)"); + "CREATE TABLE test_table_caching_2 (g int primary key, h tuple)"); }; @Test public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { - invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2")); + invalidationResultSetTest( + setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2")); }); } @@ -368,18 +526,20 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2")); + setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2")); }); } Consumer setupCacheEntryTestNested = (session) -> { - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TYPE test_type_2 (c int, d text)"); - session.execute("CREATE TYPE test_type_3 (e frozen, f int)"); - session.execute("CREATE TYPE test_type_4 (g int, h frozen)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); - session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen)"); + session.execute("CREATE TYPE test_type_caching_1 (a text, b int)"); + session.execute("CREATE TYPE test_type_caching_2 (c int, d text)"); + session.execute("CREATE TYPE test_type_caching_3 (e frozen, f int)"); + session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen)"); + session.execute( + "CREATE TABLE test_table_caching_1 (e int primary key, f frozen)"); + session.execute( + "CREATE TABLE test_table_caching_2 (g int primary key, h frozen)"); }; @Test @@ -387,7 +547,8 @@ public void should_invalidate_cache_entry_on_nested_udt_change_result_set() { SchemaChangeSynchronizer.withLock( () -> { invalidationResultSetTest( - setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4")); + setupCacheEntryTestNested, + ImmutableSet.of("test_type_caching_2", "test_type_caching_4")); }); } @@ -396,7 +557,9 @@ public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() { SchemaChangeSynchronizer.withLock( () -> { invalidationVariableDefsTest( - setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4")); + setupCacheEntryTestNested, + false, + ImmutableSet.of("test_type_caching_2", "test_type_caching_4")); }); } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java index c00cf064e51..5c32638bf36 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java @@ -31,10 +31,10 @@ import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer; import com.datastax.oss.driver.api.testinfra.session.SessionRule; -import com.datastax.oss.driver.categories.ParallelizableTests; import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; @@ -47,13 +47,11 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.RuleChain; import org.junit.rules.TestRule; import org.junit.runner.RunWith; @RunWith(DataProviderRunner.class) -@Category(ParallelizableTests.class) public class DefaultReactiveResultSetIT { private static CcmRule ccmRule = CcmRule.getInstance(); @@ -67,19 +65,15 @@ public static void initialize() { CqlSession session = sessionRule.session(); SchemaChangeSynchronizer.withLock( () -> { - session.execute("DROP TABLE IF EXISTS test_reactive_read"); - session.execute("DROP TABLE IF EXISTS test_reactive_write"); + session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read")); + session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write")); session.checkSchemaAgreement(); session.execute( - SimpleStatement.builder( - "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))") - .setExecutionProfile(sessionRule.slowProfile()) - .build()); + createSlowStatement( + "CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")); session.execute( - SimpleStatement.builder( - "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))") - .setExecutionProfile(sessionRule.slowProfile()) - .build()); + createSlowStatement( + "CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")); session.checkSchemaAgreement(); }); for (int i = 0; i < 1000; i++) { @@ -92,6 +86,12 @@ public static void initialize() { } } + static Statement createSlowStatement(String statement) { + return SimpleStatement.builder(statement) + .setExecutionProfile(sessionRule.slowProfile()) + .build(); + } + @Before public void truncateTables() throws Exception { CqlSession session = sessionRule.session(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java index df5571974c1..728bd3c6225 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java @@ -151,11 +151,11 @@ public void should_disable_schema_programmatically_when_enabled_in_config() { sessionRule .session() .execute( - SimpleStatement.builder("CREATE TABLE foo(k int primary key)") + SimpleStatement.builder("CREATE TABLE foo_schema_it(k int primary key)") .setExecutionProfile(slowProfile) .build()); assertThat(session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables()) - .doesNotContainKey(CqlIdentifier.fromInternal("foo")); + .doesNotContainKey(CqlIdentifier.fromInternal("foo_schema_it")); // Reset to config value (true), should refresh and load the new table session.setSchemaMetadataEnabled(null); @@ -167,7 +167,7 @@ public void should_disable_schema_programmatically_when_enabled_in_config() { () -> assertThat( session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables()) - .containsKey(CqlIdentifier.fromInternal("foo"))); + .containsKey(CqlIdentifier.fromInternal("foo_schema_it"))); } @Test diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index e6121217619..716dc1b66a6 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -174,11 +174,13 @@ public void should_evict_down_node_metrics_when_timeout_fires() throws Exception // trigger node1 UP -> DOWN eventBus.fire(NodeStateEvent.changed(NodeState.UP, NodeState.DOWN, node1)); - Thread.sleep(expireAfter.toMillis()); + Thread.sleep(expireAfter.toMillis() + 100); // then node-level metrics should be evicted from node1, but // node2 and node3 metrics should not have been evicted - await().untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); + await() + .atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> assertNodeMetricsEvicted(session, node1)); assertNodeMetricsNotEvicted(session, node2); assertNodeMetricsNotEvicted(session, node3); @@ -226,7 +228,8 @@ public void should_not_evict_down_node_metrics_when_node_is_back_up_before_timeo eventBus.fire(NodeStateEvent.changed(NodeState.FORCED_DOWN, NodeState.UP, node2)); eventBus.fire(NodeStateEvent.added(node3)); - Thread.sleep(expireAfter.toMillis()); + // Add a small buffer to ensure the timeout would have fired if it wasn't cancelled + Thread.sleep(expireAfter.toMillis() + 100); // then no node-level metrics should be evicted assertNodeMetricsNotEvicted(session, node1); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java index 804a078bbe0..ff6f3a9d2c5 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/internal/core/type/codec/UdtCodecIT.java @@ -22,12 +22,14 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.data.UdtValue; import com.datastax.oss.driver.api.core.type.UserDefinedType; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionRule; import com.datastax.oss.driver.categories.ParallelizableTests; +import java.time.Duration; import java.util.Objects; import org.junit.Rule; import org.junit.Test; @@ -47,23 +49,31 @@ public class UdtCodecIT { @Test public void should_decoding_udt_be_backward_compatible() { CqlSession session = sessionRule.session(); - session.execute("CREATE TYPE test_type_1 (a text, b int)"); - session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen)"); + session.execute( + SimpleStatement.newInstance("CREATE TYPE test_type_udt_1 (a text, b int)") + .setTimeout(Duration.ofSeconds(20))); + session.execute( + SimpleStatement.newInstance( + "CREATE TABLE test_table_udt_1 (e int primary key, f frozen)") + .setTimeout(Duration.ofSeconds(20))); // insert a row using version 1 of the UDT schema - session.execute("INSERT INTO test_table_1(e, f) VALUES(1, {a: 'a', b: 1})"); + session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(1, {a: 'a', b: 1})"); UserDefinedType udt = session .getMetadata() .getKeyspace(sessionRule.keyspace()) - .flatMap(ks -> ks.getUserDefinedType("test_type_1")) + .flatMap(ks -> ks.getUserDefinedType("test_type_udt_1")) .orElseThrow(IllegalStateException::new); TypeCodec oldCodec = session.getContext().getCodecRegistry().codecFor(udt); // update UDT schema - session.execute("ALTER TYPE test_type_1 add i text"); + session.execute( + SimpleStatement.newInstance("ALTER TYPE test_type_udt_1 add i text") + .setTimeout(Duration.ofSeconds(20))); // insert a row using version 2 of the UDT schema - session.execute("INSERT INTO test_table_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})"); + session.execute("INSERT INTO test_table_udt_1(e, f) VALUES(2, {a: 'b', b: 2, i: 'b'})"); Row row = - Objects.requireNonNull(session.execute("SELECT f FROM test_table_1 WHERE e = ?", 2).one()); + Objects.requireNonNull( + session.execute("SELECT f FROM test_table_udt_1 WHERE e = ?", 2).one()); // Try to read new row with old codec. Using row.getUdtValue() would not cause any issues, // because new codec will be automatically registered (using all 3 attributes). // If application leverages generic row.get(String, Codec) method, data reading with old codec diff --git a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java index 3e6171ca530..04a8afaa046 100644 --- a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java +++ b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java @@ -35,12 +35,14 @@ public class BundleOptions { public static CompositeOption commonBundles() { return () -> options( - mavenBundle("org.apache.cassandra", "java-driver-guava-shaded").versionAsInProject(), - mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject(), - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), - mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject(), - mavenBundle("com.typesafe", "config").versionAsInProject(), - mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject(), + mavenBundle("org.apache.cassandra", "java-driver-guava-shaded") + .versionAsInProject() + .startLevel(1), + mavenBundle("io.dropwizard.metrics", "metrics-core").versionAsInProject().startLevel(1), + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject().startLevel(1), + mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject().startLevel(1), + mavenBundle("com.typesafe", "config").versionAsInProject().startLevel(1), + mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject().startLevel(1), logbackBundles(), debugOptions()); } @@ -51,7 +53,7 @@ public static CompositeOption applicationBundle() { systemProperty("cassandra.contactpoints").value("127.0.0.1"), systemProperty("cassandra.port").value("9042"), systemProperty("cassandra.keyspace").value("test_osgi"), - bundle("reference:file:target/classes")); + bundle("reference:file:target/classes").startLevel(3)); } public static UrlProvisionOption driverCoreBundle() { @@ -59,15 +61,15 @@ public static UrlProvisionOption driverCoreBundle() { } public static UrlProvisionOption driverCoreShadedBundle() { - return bundle("reference:file:../core-shaded/target/classes"); + return bundle("reference:file:../core-shaded/target/classes").startLevel(1); } public static UrlProvisionOption driverQueryBuilderBundle() { - return bundle("reference:file:../query-builder/target/classes"); + return bundle("reference:file:../query-builder/target/classes").startLevel(2); } public static UrlProvisionOption driverMapperRuntimeBundle() { - return bundle("reference:file:../mapper-runtime/target/classes"); + return bundle("reference:file:../mapper-runtime/target/classes").startLevel(2); } public static UrlProvisionOption driverTestInfraBundle() {