diff --git a/CHANGES.txt b/CHANGES.txt index 7e60e49ca..b5400d6ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.3.0 ----- + * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308) * SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408) * Fix StorageClientTest Docker API compatibility and improve CI test reporting (CASSSIDECAR-410) * Incorrect SSL Configuration Keys in CdcPublisher.secretsProvider() (CASSSIDECAR-401) diff --git a/gradle.properties b/gradle.properties index 515c9f7fa..752796286 100644 --- a/gradle.properties +++ b/gradle.properties @@ -46,5 +46,5 @@ swaggerVersion=2.2.21 kryoVersion=4.0.2 # OSHI dependencies oshiVersion=6.9.0 -analyticsVersion=0.2.0 +analyticsVersion=0.3.0 kafkaClientVersion=3.7.0 diff --git a/integration-framework/build.gradle b/integration-framework/build.gradle index 62248946c..29b9c942b 100644 --- a/integration-framework/build.gradle +++ b/integration-framework/build.gradle @@ -65,9 +65,16 @@ dependencies { api("io.vertx:vertx-junit5:${project.vertxVersion}") // The server itself api(project(path: ":server")) + api(testFixtures(project(path: ":server"))) api(project(path: ":server-common")) + + // CDC dependencies + api(group: "org.apache.cassandra", name: "cassandra-analytics-cdc_spark3_2.12", version: "${project.analyticsVersion}") + api(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-sidecar_spark3_2.12", version: "${project.analyticsVersion}") + api "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}" } compileJava.onlyIf { !skipIntegrationTest } compileTestJava.onlyIf { !skipIntegrationTest } javadoc.onlyIf { !skipIntegrationTest } + diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java index d3d6e0674..1bef05aa0 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/MtlsTestHelper.java @@ -22,6 +22,16 @@ import java.util.Objects; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; +import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; +import org.apache.cassandra.sidecar.config.SslConfiguration; +import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl; import org.apache.cassandra.testing.utils.tls.CertificateBuilder; import org.apache.cassandra.testing.utils.tls.CertificateBundle; @@ -30,6 +40,7 @@ */ public class MtlsTestHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(MtlsTestHelper.class); public static final String PASSWORD_STRING = "cassandra"; public static final char[] PASSWORD = PASSWORD_STRING.toCharArray(); /** @@ -142,4 +153,78 @@ public String serverKeyStoreType() { return "PKCS12"; } + + /** + * Creates SSL configuration with the specified keystore and shared truststore. + * + * @param keyStorePath the path to the keystore + * @param keyStorePassword the keystore password + * @param keyStoreType the keystore type + * @return SslConfiguration with the provided keystore and shared truststore + */ + private SslConfiguration createSslConfiguration(String keyStorePath, + String keyStorePassword, + String keyStoreType) + { + KeyStoreConfiguration truststoreConfiguration = + new KeyStoreConfigurationImpl(trustStorePath(), + trustStorePassword(), + trustStoreType(), + SecondBoundConfiguration.parse("60s")); + + KeyStoreConfiguration keyStoreConfiguration = + new KeyStoreConfigurationImpl(keyStorePath, + keyStorePassword, + keyStoreType, + SecondBoundConfiguration.parse("60s")); + + return SslConfigurationImpl.builder() + .enabled(true) + .keystore(keyStoreConfiguration) + .truststore(truststoreConfiguration) + .build(); + } + + /** + * Creates SSL configuration for the Sidecar server with mTLS settings if enabled. + * + * @return SslConfiguration with server keystore/truststore, or null if mTLS is not enabled + */ + public SslConfiguration createServerSslConfiguration() + { + if (!isEnabled()) + { + LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if you would like mTLS enabled.", + CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); + return null; + } + + LOGGER.info("Enabling test mTLS certificate/keystore for server."); + return createSslConfiguration(serverKeyStorePath(), + serverKeyStorePassword(), + serverKeyStoreType()); + } + + /** + * Creates a SidecarClientConfiguration with mTLS settings if mTLS is enabled. + * + * @return a SidecarClientConfiguration with mTLS settings, or null if mTLS is not enabled + */ + public SidecarClientConfiguration createSidecarClientConfiguration() + { + if (!isEnabled()) + { + LOGGER.info("Not enabling mTLS for testing. Set '{}' to 'true' if you would like mTLS enabled.", + CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); + return new SidecarClientConfigurationImpl(null); + } + + LOGGER.info("Enabling test mTLS certificate/keystore for client."); + SslConfiguration clientSslConfiguration = + createSslConfiguration(clientKeyStorePath(), + clientKeyStorePassword(), + serverKeyStoreType()); + + return new SidecarClientConfigurationImpl(clientSslConfiguration); + } } diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 2f20ed7e0..7cad85100 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -88,18 +89,15 @@ import org.apache.cassandra.sidecar.common.server.utils.SidecarVersionProvider; import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils; import org.apache.cassandra.sidecar.config.JmxConfiguration; -import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; import org.apache.cassandra.sidecar.config.S3ClientConfiguration; import org.apache.cassandra.sidecar.config.S3ProxyConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; -import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl; -import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; -import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration; import org.apache.cassandra.sidecar.coordination.ClusterLease; import org.apache.cassandra.sidecar.lifecycle.InJvmDTestLifecycleProvider; import org.apache.cassandra.sidecar.lifecycle.LifecycleProvider; @@ -116,7 +114,6 @@ import org.apache.cassandra.testing.TestVersionSupplier; import static org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl.DEFAULT_API_CALL_TIMEOUT; -import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS; import static org.apache.cassandra.testing.DriverTestUtils.buildContactPoints; import static org.apache.cassandra.testing.utils.IInstanceUtils.tryGetIntConfig; import static org.assertj.core.api.Assertions.assertThat; @@ -440,6 +437,30 @@ protected void waitForSchemaReady(ServerWrapper serverWrapper, long timeout, Tim .isTrue(); } + /** + * Polls a condition until it returns true or timeout is reached. + * Uses System.nanoTime() for accurate timing and Uninterruptibles for consistent sleep behavior. + * + * @param condition the condition to check + * @param timeoutSeconds maximum time to wait in seconds + * @param pollIntervalMillis interval between checks in milliseconds + * @throws AssertionError if timeout is reached before condition is met + */ + protected void waitUntil(BooleanSupplier condition, long timeoutSeconds, long pollIntervalMillis) + { + long startTime = System.nanoTime(); + long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds); + + while (!condition.getAsBoolean()) + { + if (System.nanoTime() - startTime > timeoutNanos) + { + throw new AssertionError("Condition not met within " + timeoutSeconds + " seconds"); + } + Uninterruptibles.sleepUninterruptibly(pollIntervalMillis, TimeUnit.MILLISECONDS); + } + } + /** * Stops the Sidecar service * @@ -775,48 +796,23 @@ public LifecycleProvider lifecycleProvider() } public static SidecarConfigurationImpl.Builder defaultConfigurationBuilder( - MtlsTestHelper mtlsTestHelper, Function configurationOverrides) + MtlsTestHelper mtlsTestHelper, + Function configurationOverrides) { - ServiceConfiguration conf = ServiceConfigurationImpl.builder() - .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long - .port(0) // let the test find an available port + ServiceConfiguration conf = TestServiceConfiguration.builder() .schemaKeyspaceConfiguration(SchemaKeyspaceConfigurationImpl.builder() .isEnabled(true) .build()) .build(); - - SslConfiguration sslConfiguration = null; - if (mtlsTestHelper.isEnabled()) - { - LOGGER.info("Enabling test mTLS certificate/keystore."); - - KeyStoreConfiguration truststoreConfiguration = - new KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(), - mtlsTestHelper.trustStorePassword(), - mtlsTestHelper.trustStoreType(), - SecondBoundConfiguration.parse("60s")); - - KeyStoreConfiguration keyStoreConfiguration = - new KeyStoreConfigurationImpl(mtlsTestHelper.serverKeyStorePath(), - mtlsTestHelper.serverKeyStorePassword(), - mtlsTestHelper.serverKeyStoreType(), - SecondBoundConfiguration.parse("60s")); - - sslConfiguration = SslConfigurationImpl.builder() - .enabled(true) - .keystore(keyStoreConfiguration) - .truststore(truststoreConfiguration) - .build(); - } - else - { - LOGGER.info("Not enabling mTLS for testing purposes. Set '{}' to 'true' if you would " + - "like mTLS enabled.", CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); - } - S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, SecondBoundConfiguration.parse("60s"), - 5242880, DEFAULT_API_CALL_TIMEOUT, - buildTestS3ProxyConfig()); + SslConfiguration sslConfiguration = mtlsTestHelper.createServerSslConfiguration(); + S3ClientConfiguration s3ClientConfig = + new S3ClientConfigurationImpl("s3-client", + 4, + SecondBoundConfiguration.parse("60s"), + 5242880, + DEFAULT_API_CALL_TIMEOUT, + buildTestS3ProxyConfig()); SidecarConfigurationImpl.Builder builder = SidecarConfigurationImpl.builder() .serviceConfiguration(conf) diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java new file mode 100644 index 000000000..02a286b9f --- /dev/null +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcConfig.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.testing; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.sidecar.cdc.CdcConfig; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; + +/** + * Test implementation of {@link CdcConfig} for integration tests. + * Provides hardcoded configuration values suitable for testing CDC functionality + * without requiring external Kafka infrastructure. + */ +public class TestCdcConfig implements CdcConfig +{ + @Override + public String env() + { + return "test"; + } + + @Override + public String kafkaTopic() + { + return "test-topic"; + } + + @Override + public TopicFormatType topicFormat() + { + return TopicFormatType.KEYSPACETABLE; + } + + @Override + public boolean cdcEnabled() + { + return true; + } + + @Override + public String jobId() + { + return "test-job-id"; + } + + @Override + public Map kafkaConfigs() + { + return new HashMap<>(); + } + + @Override + public Map cdcConfigs() + { + return new HashMap<>(); + } + + @Override + public boolean logOnly() + { + return true; + } + + @Override + public String datacenter() + { + return "datacenter1"; + } + + @Override + public SecondBoundConfiguration watermarkWindow() + { + return new SecondBoundConfiguration(3, TimeUnit.DAYS); + } + + @Override + public int maxRecordSizeBytes() + { + return -1; + } + + @Override + public String compression() + { + return null; + } + + @Override + public MillisecondBoundConfiguration minDelayBetweenMicroBatches() + { + return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS); + } + + @Override + public int maxCommitLogsPerInstance() + { + return 4; + } + + @Override + public int maxWatermarkerSize() + { + return 400000; + } + + @Override + public boolean persistEnabled() + { + return true; + } + + @Override + public boolean failOnRecordTooLargeError() + { + return false; + } + + @Override + public boolean failOnKafkaError() + { + return true; + } + + @Override + public MillisecondBoundConfiguration persistDelay() + { + return new MillisecondBoundConfiguration(1000, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isConfigReady() + { + return true; + } +} diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java new file mode 100644 index 000000000..2836cbaed --- /dev/null +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcEventConsumer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.testing; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.google.inject.Singleton; +import org.apache.cassandra.cdc.api.EventConsumer; +import org.apache.cassandra.cdc.msg.CdcEvent; + +/** + * Test implementation of EventConsumer for CDC integration tests. + * Stores CDC events in a concurrent queue that can be accessed for test assertions. + */ +@Singleton +public class TestCdcEventConsumer implements EventConsumer +{ + private final Queue events = new ConcurrentLinkedQueue<>(); + + @Override + public void accept(CdcEvent event) + { + events.offer(event); + } + + /** + * @return all CDC events captured so far as a list + */ + public List getEvents() + { + return new ArrayList<>(events); + } + + /** + * Clear all captured events + */ + public void clear() + { + events.clear(); + } +} diff --git a/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java new file mode 100644 index 000000000..4388c3e61 --- /dev/null +++ b/integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TestCdcPublisher.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.testing; + +import com.google.inject.Provider; +import io.vertx.core.Vertx; +import org.apache.cassandra.cdc.api.EventConsumer; +import org.apache.cassandra.cdc.api.SchemaSupplier; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; +import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; +import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; +import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.sidecar.cdc.CdcConfig; +import org.apache.cassandra.sidecar.cdc.CdcPublisher; +import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.coordination.RangeManager; +import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Test implementation of CdcPublisher that uses an in-memory event consumer + * instead of publishing to Kafka. This allows integration tests to capture + * and verify CDC events without requiring a Kafka infrastructure. + */ +public class TestCdcPublisher extends CdcPublisher +{ + private final TestCdcEventConsumer testEventConsumer = new TestCdcEventConsumer(); + private final CdcDatabaseAccessor databaseAccessor; + + public TestCdcPublisher(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + ExecutorPools executorPools, + ClusterConfigProvider clusterConfigProvider, + SchemaSupplier schemaSupplier, + CdcSidecarInstancesProvider sidecarInstancesProvider, + SidecarCdcClient.ClientConfig clientConfig, + InstanceMetadataFetcher instanceMetadataFetcher, + CdcConfig conf, + CdcDatabaseAccessor databaseAccessor, + ICdcStats cdcStats, + VirtualTablesDatabaseAccessor virtualTables, + SidecarCdcStats sidecarCdcStats, + Serializer avroSerializer, + Provider rangeManagerProvider) + { + super(vertx, sidecarConfiguration, executorPools, clusterConfigProvider, + schemaSupplier, sidecarInstancesProvider, clientConfig, + instanceMetadataFetcher, conf, databaseAccessor, cdcStats, + virtualTables, sidecarCdcStats, avroSerializer, rangeManagerProvider); + this.databaseAccessor = databaseAccessor; + } + + @Override + public EventConsumer eventConsumer(CdcConfig conf, Serializer avroSerializer) + { + return testEventConsumer; + } + + /** + * Override scheduleDecision to execute in tests when database is ready, + * bypassing the initialization and cache warming checks required in production. + */ + @Override + public ScheduleDecision scheduleDecision() + { + // If already running, skip to avoid redundant execution attempts + if (isRunning()) + { + return ScheduleDecision.SKIP; + } + + // Only execute if the database accessor is available + // This prevents CassandraUnavailableException during test startup + if (databaseAccessor.isAvailable()) + { + return ScheduleDecision.EXECUTE; + } + + // Database not ready yet, skip this iteration and retry later + return ScheduleDecision.SKIP; + } + + /** + * @return the test CDC event consumer for test assertions + */ + public TestCdcEventConsumer getTestEventConsumer() + { + return testEventConsumer; + } +} diff --git a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java index ae802bb1f..92d3800a6 100644 --- a/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java +++ b/integration-framework/src/main/java/org/apache/cassandra/testing/TlsTestUtils.java @@ -87,7 +87,8 @@ public static void withAuthenticatedSession(IInstance instance, InetAddress address = nativeInetSocketAddress.getAddress(); com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder() - .withLoadBalancingPolicy(new DCAwareRoundRobinPolicy.Builder().build()) + .withLoadBalancingPolicy( + new DCAwareRoundRobinPolicy.Builder().build()) .withSSL(sslOptions) .withoutJMXReporting() .withAuthProvider(new PlainTextAuthProvider(username, password)) diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java new file mode 100644 index 000000000..961a5224c --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/cdc/CdcIntegrationTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterCdcSidecarIntegrationTestBase; +import org.apache.cassandra.sidecar.testing.TestCdcEventConsumer; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test for CDC functionality. + * Tests that mutations on CDC-enabled tables are captured and published to TestCdcEventConsumer. + */ +public class CdcIntegrationTest extends SharedClusterCdcSidecarIntegrationTestBase +{ + private static final QualifiedName CDC_TEST_TABLE = new QualifiedName("cdc_test_ks", "cdc_test_table"); + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(CDC_TEST_TABLE, DC1_RF1); + + String createTableStatement = "CREATE TABLE IF NOT EXISTS %s " + + "(id int PRIMARY KEY, value int) " + + "WITH cdc = true"; + createTestTable(CDC_TEST_TABLE, createTableStatement); + } + + @Override + protected void beforeTestStart() + { + waitForSchemaReady(30, TimeUnit.SECONDS); + } + + @Test + void testCdcEventsPublishedToInMemory() + { + // Write mutations into the test table + int mutationCount = 100; + Map expectedMutations = new HashMap<>(); + for (int i = 1; i <= mutationCount; i++) + { + String query = String.format("INSERT INTO %s (id, value) VALUES (%d, %d)", CDC_TEST_TABLE, i, i); + cluster.getFirstRunningInstance() + .coordinator() + .execute(query, ConsistencyLevel.ONE); + expectedMutations.put(i, i); + } + + TestCdcEventConsumer consumer = getTestEventConsumer(); + waitUntil(() -> consumer.getEvents().size() >= mutationCount, 120, 1000); + assertThat(consumer.getEvents().size()) + .as("All CDC events should be published to in-memory consumer") + .isGreaterThanOrEqualTo(mutationCount); + + // Verify all the mutations with expected values + List events = consumer.getEvents(); + for (CdcEvent cdcEvent : events) + { + assertThat(cdcEvent.keyspace).isEqualTo(CDC_TEST_TABLE.keyspace()); + assertThat(cdcEvent.table).isEqualTo(CDC_TEST_TABLE.table()); + assertThat(cdcEvent.getKind()).isEqualTo(CdcEvent.Kind.INSERT); + assertThat(cdcEvent.getValueColumns().get(0).columnName).isEqualTo("value"); + + int value = ByteBuffer.wrap(Objects.requireNonNull(cdcEvent.getValueColumns().get(0).getBytes())).getInt(); + expectedMutations.remove(value); + } + assertThat(expectedMutations).isEmpty(); + } +} diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java index 6adb51b00..aff265b1d 100644 --- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/health/SidecarPeerDownDetectorIntegrationTest.java @@ -38,20 +38,14 @@ import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.common.server.utils.DriverUtils; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; -import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; -import org.apache.cassandra.sidecar.config.KeyStoreConfiguration; import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; -import org.apache.cassandra.sidecar.config.SslConfiguration; -import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SchemaKeyspaceConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; -import org.apache.cassandra.sidecar.config.yaml.SidecarClientConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; import org.apache.cassandra.sidecar.config.yaml.SidecarPeerHealthConfigurationImpl; -import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl; import org.apache.cassandra.sidecar.coordination.CassandraClientTokenRingProvider; import org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider; import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask; @@ -63,8 +57,6 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS; -import static org.apache.cassandra.sidecar.testing.MtlsTestHelper.PASSWORD_STRING; import static org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.cassandraInstanceHostname; import static org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase.IntegrationTestModule.defaultConfigurationBuilder; import static org.apache.cassandra.testing.TestUtils.DC1_RF3; @@ -230,38 +222,7 @@ public SidecarConfiguration sidecarConfiguration() .schemaKeyspaceConfiguration(SCHEMA_KEYSPACE_CONFIG) .build(); - // We need to provide mTLS configuration for the Sidecar client so it can talk to - // other sidecars using mTLS - SslConfiguration clientSslConfiguration = null; - if (mtlsTestHelper.isEnabled()) - { - LOGGER.info("Enabling test mTLS certificate/keystore."); - - KeyStoreConfiguration truststoreConfiguration = - new KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(), - mtlsTestHelper.trustStorePassword(), - mtlsTestHelper.trustStoreType(), - SecondBoundConfiguration.parse("60s")); - - KeyStoreConfiguration keyStoreConfiguration = - new KeyStoreConfigurationImpl(mtlsTestHelper.clientKeyStorePath(), - PASSWORD_STRING, - mtlsTestHelper.serverKeyStoreType(), // server and client keystore types are the same - SecondBoundConfiguration.parse("60s")); - - clientSslConfiguration = SslConfigurationImpl.builder() - .enabled(true) - .keystore(keyStoreConfiguration) - .truststore(truststoreConfiguration) - .build(); - } - else - { - LOGGER.info("Not enabling mTLS for testing purposes. Set '{}' to 'true' if you would " + - "like mTLS enabled.", CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); - } - - SidecarClientConfiguration sidecarClientConfiguration = new SidecarClientConfigurationImpl(clientSslConfiguration); + SidecarClientConfiguration sidecarClientConfiguration = mtlsTestHelper.createSidecarClientConfiguration(); // Let's run this very frequently for testing purposes SidecarPeerHealthConfigurationImpl sidecarPeerHealthConfiguration diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java new file mode 100644 index 000000000..6f4110001 --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/testing/SharedClusterCdcSidecarIntegrationTestBase.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.testing; + +import java.util.Map; +import java.util.function.Function; + +import org.junit.jupiter.api.AfterEach; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import io.vertx.core.Vertx; +import org.apache.cassandra.cdc.api.SchemaSupplier; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider; +import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider; +import org.apache.cassandra.cdc.sidecar.SidecarCdcClient; +import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.cdc.CdcConfig; +import org.apache.cassandra.sidecar.cdc.CdcPublisher; +import org.apache.cassandra.sidecar.cdc.SidecarCdcStats; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.config.SidecarClientConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager; +import org.apache.cassandra.sidecar.coordination.TokenRingProvider; +import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.kafka.common.serialization.Serializer; + +import static org.assertj.core.api.Assumptions.assumeThat; + +/** + * Base class for CDC integration tests. Extends SharedClusterIntegrationTestBase with + * CDC-specific configuration and setup, including: + * - CDC-enabled Cassandra cluster configuration + * - TestCdcPublisher with TestCdcEventConsumer + * - Cassandra 4.1 version requirement + * - Helper methods to access CDC components + */ +public abstract class SharedClusterCdcSidecarIntegrationTestBase extends SharedClusterIntegrationTestBase +{ + @AfterEach + void cleanupCdcConsumerAfterEachTest() + { + TestCdcPublisher testCdcPublisher = (TestCdcPublisher) serverWrapper.injector.getInstance(CdcPublisher.class); + if (testCdcPublisher != null) + { + TestCdcEventConsumer consumer = testCdcPublisher.getTestEventConsumer(); + if (consumer != null) + { + consumer.clear(); + } + } + } + + @Override + protected void beforeClusterProvisioning() + { + // The current CDC implementation cannot read 5.x commitlogs, so verify Cassandra version is 4.x + SimpleCassandraVersion version = SimpleCassandraVersion.create(testVersion.version()); + assumeThat(version.major) + .as("Current CDC implementation cannot read 5.x commitlogs, requires Cassandra 4.x") + .isEqualTo(4); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .dcCount(1) + .nodesPerDc(1) + .additionalInstanceConfig(Map.of("cdc_enabled", true)); + } + + @Override + protected Function configurationOverrides() + { + return builder -> { + // Override service configuration to use specific port for CDC tests + ServiceConfiguration existingConfig = builder.build().serviceConfiguration(); + ServiceConfiguration cdcServiceConfig = ServiceConfigurationImpl.builder() + .host(existingConfig.host()) + .port(9043) // TODO: Make this port dynamically allocated + .schemaKeyspaceConfiguration(existingConfig.schemaKeyspaceConfiguration()) + .build(); + builder.serviceConfiguration(cdcServiceConfig); + + // Configure sidecar client for mTLS if enabled + SidecarClientConfiguration clientConfig = mtlsTestHelper.createSidecarClientConfiguration(); + if (clientConfig != null) + { + builder.sidecarClientConfiguration(clientConfig); + } + return builder; + }; + } + + @Override + protected void startSidecar(ICluster cluster) throws InterruptedException + { + AbstractModule cdcModule = new CdcTestModule(); + serverWrapper = startSidecarWithInstances(cluster, cdcModule); + } + + /** + * @return the TestCdcPublisher instance for test access + */ + protected TestCdcPublisher getCdcPublisher() + { + return (TestCdcPublisher) serverWrapper.injector.getInstance(CdcPublisher.class); + } + + /** + * @return the TestCdcEventConsumer for test assertions + */ + protected TestCdcEventConsumer getTestEventConsumer() + { + TestCdcPublisher publisher = getCdcPublisher(); + return publisher != null ? publisher.getTestEventConsumer() : null; + } + + /** + * CDC-specific Guice module that provides test implementations for CDC components. + */ + private static class CdcTestModule extends AbstractModule + { + @Provides + @Singleton + CdcPublisher cdcPublisher(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + ExecutorPools executorPools, + ClusterConfigProvider clusterConfigProvider, + SchemaSupplier schemaSupplier, + CdcSidecarInstancesProvider sidecarInstancesProvider, + SidecarCdcClient.ClientConfig clientConfig, + InstanceMetadataFetcher instanceMetadataFetcher, + CdcConfig conf, + CdcDatabaseAccessor databaseAccessor, + ICdcStats cdcStats, + VirtualTablesDatabaseAccessor virtualTables, + SidecarCdcStats sidecarCdcStats, + Serializer avroSerializer, + TokenRingProvider tokenRingProvider) + { + return new TestCdcPublisher(vertx, + sidecarConfiguration, + executorPools, + clusterConfigProvider, + schemaSupplier, + sidecarInstancesProvider, + clientConfig, + instanceMetadataFetcher, + conf, + databaseAccessor, + cdcStats, + virtualTables, + sidecarCdcStats, + avroSerializer, + () -> new ContentionFreeRangeManager(vertx, tokenRingProvider)); + } + + @Provides + @Singleton + public CdcConfig cdcConfig() + { + return new TestCdcConfig(); + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java index 3d369c5e0..b6fc14e4f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java @@ -377,28 +377,21 @@ public SidecarCdcClient.ClientConfig clientConfig(SidecarConfiguration sidecarCo @Provides @Singleton - public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration configuration) - { - return new CdcStatesSchema(configuration); - } - - @ProvidesIntoMap - @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class) - PeriodicTask cdcPublisherTask(Vertx vertx, - SidecarConfiguration sidecarConfiguration, - ExecutorPools executorPools, - ClusterConfigProvider clusterConfigProvider, - SchemaSupplier schemaSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - InstanceMetadataFetcher instanceMetadataFetcher, - CdcConfig conf, - CdcDatabaseAccessor databaseAccessor, - TokenRingProvider tokenRingProvider, - ICdcStats cdcStats, - VirtualTablesDatabaseAccessor virtualTables, - SidecarCdcStats sidecarCdcStats, - Serializer avroSerializer) + CdcPublisher cdcPublisher(Vertx vertx, + SidecarConfiguration sidecarConfiguration, + ExecutorPools executorPools, + ClusterConfigProvider clusterConfigProvider, + SchemaSupplier schemaSupplier, + CdcSidecarInstancesProvider sidecarInstancesProvider, + SidecarCdcClient.ClientConfig clientConfig, + InstanceMetadataFetcher instanceMetadataFetcher, + CdcConfig conf, + CdcDatabaseAccessor databaseAccessor, + TokenRingProvider tokenRingProvider, + ICdcStats cdcStats, + VirtualTablesDatabaseAccessor virtualTables, + SidecarCdcStats sidecarCdcStats, + Serializer avroSerializer) { return new CdcPublisher(vertx, sidecarConfiguration, @@ -417,6 +410,20 @@ PeriodicTask cdcPublisherTask(Vertx vertx, () -> new ContentionFreeRangeManager(vertx, tokenRingProvider)); } + @Provides + @Singleton + public TableSchema virtualTablesDatabaseAccessor(ServiceConfiguration configuration) + { + return new CdcStatesSchema(configuration); + } + + @ProvidesIntoMap + @KeyClassMapKey(PeriodicTaskMapKeys.CdcPublisherTaskKey.class) + PeriodicTask cdcPublisherTask(CdcPublisher cdcPublisher) + { + return cdcPublisher; + } + @Singleton @ProvidesIntoMap @KeyClassMapKey(PeriodicTaskMapKeys.CdcConfigRefresherNotifierKey.class)