Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.3.0
-----
* RangeManager should be singleton in CDCModule (CASSSIDECAR-411)
* CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408)
* Fix StorageClientTest Docker API compatibility and improve CI test reporting (CASSSIDECAR-410)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
Expand Down Expand Up @@ -168,6 +169,7 @@ CdcPublisher cdcPublisher(Vertx vertx,
Serializer<CdcEvent> avroSerializer,
TokenRingProvider tokenRingProvider)
{
RangeManager rangeManager = new ContentionFreeRangeManager(vertx, tokenRingProvider);
return new TestCdcPublisher(vertx,
sidecarConfiguration,
executorPools,
Expand All @@ -182,7 +184,7 @@ CdcPublisher cdcPublisher(Vertx vertx,
virtualTables,
sidecarCdcStats,
avroSerializer,
() -> new ContentionFreeRangeManager(vertx, tokenRingProvider));
() -> rangeManager);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
import org.apache.cassandra.sidecar.common.request.data.AllServicesConfigPayload;
import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.SidecarClientConfiguration;
Expand All @@ -61,6 +62,7 @@
import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
import org.apache.cassandra.sidecar.coordination.DynamicSidecarInstancesProvider;
import org.apache.cassandra.sidecar.coordination.InnerDcTokenAdjacentPeerProvider;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.coordination.SidecarHttpHealthProvider;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthMonitorTask;
import org.apache.cassandra.sidecar.coordination.SidecarPeerHealthProvider;
Expand Down Expand Up @@ -360,9 +362,9 @@ public ICdcStats cdcStats()

@Provides
@Singleton
public TokenRingProvider tokenRingProvider(InstancesMetadata instancesMetadata, InstanceMetadataFetcher instanceMetadataFetcher, ServiceConfiguration configuration)
public TokenRingProvider tokenRingProvider(InstancesMetadata instancesMetadata, InstanceMetadataFetcher instanceMetadataFetcher, DnsResolver dnsResolver)
{
return new CassandraClientTokenRingProvider(instancesMetadata, instanceMetadataFetcher, configuration.dnsResolver());
return new CassandraClientTokenRingProvider(instancesMetadata, instanceMetadataFetcher, dnsResolver);
}

@Provides
Expand All @@ -375,6 +377,13 @@ public SidecarCdcClient.ClientConfig clientConfig(SidecarConfiguration sidecarCo
sidecarClientConfiguration.retryDelay().toIntMillis());
}

@Provides
@Singleton
RangeManager rangeManager(Vertx vertx, TokenRingProvider tokenRingProvider)
{
return new ContentionFreeRangeManager(vertx, tokenRingProvider);
}

@Provides
@Singleton
CdcPublisher cdcPublisher(Vertx vertx,
Expand All @@ -387,11 +396,11 @@ CdcPublisher cdcPublisher(Vertx vertx,
InstanceMetadataFetcher instanceMetadataFetcher,
CdcConfig conf,
CdcDatabaseAccessor databaseAccessor,
TokenRingProvider tokenRingProvider,
ICdcStats cdcStats,
VirtualTablesDatabaseAccessor virtualTables,
SidecarCdcStats sidecarCdcStats,
Serializer<CdcEvent> avroSerializer)
Serializer<CdcEvent> avroSerializer,
RangeManager rangeManager)
{
return new CdcPublisher(vertx,
sidecarConfiguration,
Expand All @@ -407,7 +416,7 @@ CdcPublisher cdcPublisher(Vertx vertx,
virtualTables,
sidecarCdcStats,
avroSerializer,
() -> new ContentionFreeRangeManager(vertx, tokenRingProvider));
() -> rangeManager);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.inject.name.Named;
import io.vertx.core.Vertx;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.CassandraClientTokenRingProviderTest;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.InstancesMetadataImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
Expand Down Expand Up @@ -199,7 +200,7 @@ private InstanceMetadata mockInstance(TestCassandraAdapterDelegate delegate,
{
StorageOperations mockStorageOperations = mock(StorageOperations.class);
when(mockStorageOperations.dataFileLocations()).thenReturn(List.of(dataDir));
Metadata metadata = mock(Metadata.class);
Metadata metadata = CassandraClientTokenRingProviderTest.getMetadata();
KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class);
when(metadata.getKeyspace(any())).thenReturn(keyspaceMetadata);
TableMetadata tableMetadata = mock(TableMetadata.class);
Expand Down Expand Up @@ -245,4 +246,11 @@ public CassandraVersionProvider cassandraVersionProvider()
builder.add(new MockCassandraFactory());
return builder.build();
}

@Provides
@Singleton
public DnsResolver dnsResolver()
{
return CassandraClientTokenRingProviderTest.mockDnsResolver();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,46 +92,46 @@ public String reverseResolve(String s)
}
};
Set<Host> allHosts = Set.of(
mockHost("local1", "127.0.0.1", "-9223372036854775808", "DC1"),
mockHost("local2", "127.0.0.2", "-8301034833169298228", "DC1"),
mockHost("local3", "127.0.0.3", "-7378697629483820647", "DC1"),
mockHost("local4", "127.0.0.4", "-6456360425798343066", "DC1"),
mockHost("local5", "127.0.0.5", "-5534023222112865485", "DC1"),
mockHost("local6", "127.0.0.6", "-4611686018427387904", "DC1"),
mockHost("local7", "127.0.0.7", "-3689348814741910324", "DC1"),
mockHost("local8", "127.0.0.8", "-2767011611056432743", "DC1"),
mockHost("local9", "127.0.0.9", "-1844674407370955162", "DC1"),
mockHost("local10", "127.0.0.10", "-922337203685477581", "DC1"),
mockHost("local11", "127.0.0.11", "0", "DC1"),
mockHost("local12", "127.0.0.12", "922337203685477580", "DC1"),
mockHost("local13", "127.0.0.13", "1844674407370955161", "DC1"),
mockHost("local14", "127.0.0.14", "2767011611056432742", "DC1"),
mockHost("local15", "127.0.0.15", "3689348814741910323", "DC1"),
mockHost("local16", "127.0.0.16", "4611686018427387904", "DC1"),
mockHost("local17", "127.0.0.17", "5534023222112865484", "DC1"),
mockHost("local18", "127.0.0.18", "6456360425798343065", "DC1"),
mockHost("local19", "127.0.0.19", "7378697629483820646", "DC1"),
mockHost("local20", "127.0.0.20", "8301034833169298227", "DC1"),
mockHost("local21", "127.0.0.21", "-9223372036854775807", "DC2"),
mockHost("local22", "127.0.0.22", "-8301034833169298227", "DC2"),
mockHost("local23", "127.0.0.23", "-7378697629483820646", "DC2"),
mockHost("local24", "127.0.0.24", "-6456360425798343065", "DC2"),
mockHost("local25", "127.0.0.25", "-5534023222112865484", "DC2"),
mockHost("local26", "127.0.0.26", "-4611686018427387903", "DC2"),
mockHost("local27", "127.0.0.27", "-3689348814741910323", "DC2"),
mockHost("local28", "127.0.0.28", "-2767011611056432742", "DC2"),
mockHost("local29", "127.0.0.29", "-1844674407370955161", "DC2"),
mockHost("local30", "127.0.0.30", "-922337203685477580", "DC2"),
mockHost("local31", "127.0.0.31", "1", "DC2"),
mockHost("local32", "127.0.0.32", "922337203685477581", "DC2"),
mockHost("local33", "127.0.0.33", "1844674407370955162", "DC2"),
mockHost("local34", "127.0.0.34", "2767011611056432743", "DC2"),
mockHost("local35", "127.0.0.35", "3689348814741910324", "DC2"),
mockHost("local36", "127.0.0.36", "4611686018427387905", "DC2"),
mockHost("local37", "127.0.0.37", "5534023222112865485", "DC2"),
mockHost("local38", "127.0.0.38", "6456360425798343066", "DC2"),
mockHost("local39", "127.0.0.39", "7378697629483820647", "DC2"),
mockHost("local40", "127.0.0.40", "8301034833169298228", "DC2")
mockHost("localhost1", "127.0.0.1", "-9223372036854775808", "DC1"),
mockHost("localhost2", "127.0.0.2", "-8301034833169298228", "DC1"),
mockHost("localhost3", "127.0.0.3", "-7378697629483820647", "DC1"),
mockHost("localhost4", "127.0.0.4", "-6456360425798343066", "DC1"),
mockHost("localhost5", "127.0.0.5", "-5534023222112865485", "DC1"),
mockHost("localhost6", "127.0.0.6", "-4611686018427387904", "DC1"),
mockHost("localhost7", "127.0.0.7", "-3689348814741910324", "DC1"),
mockHost("localhost8", "127.0.0.8", "-2767011611056432743", "DC1"),
mockHost("localhost9", "127.0.0.9", "-1844674407370955162", "DC1"),
mockHost("localhost10", "127.0.0.10", "-922337203685477581", "DC1"),
mockHost("localhost11", "127.0.0.11", "0", "DC1"),
mockHost("localhost12", "127.0.0.12", "922337203685477580", "DC1"),
mockHost("localhost13", "127.0.0.13", "1844674407370955161", "DC1"),
mockHost("localhost14", "127.0.0.14", "2767011611056432742", "DC1"),
mockHost("localhost15", "127.0.0.15", "3689348814741910323", "DC1"),
mockHost("localhost16", "127.0.0.16", "4611686018427387904", "DC1"),
mockHost("localhost17", "127.0.0.17", "5534023222112865484", "DC1"),
mockHost("localhost18", "127.0.0.18", "6456360425798343065", "DC1"),
mockHost("localhost19", "127.0.0.19", "7378697629483820646", "DC1"),
mockHost("localhost20", "127.0.0.20", "8301034833169298227", "DC1"),
mockHost("localhost21", "127.0.0.21", "-9223372036854775807", "DC2"),
mockHost("localhost22", "127.0.0.22", "-8301034833169298227", "DC2"),
mockHost("localhost23", "127.0.0.23", "-7378697629483820646", "DC2"),
mockHost("localhost24", "127.0.0.24", "-6456360425798343065", "DC2"),
mockHost("localhost25", "127.0.0.25", "-5534023222112865484", "DC2"),
mockHost("localhost26", "127.0.0.26", "-4611686018427387903", "DC2"),
mockHost("localhost27", "127.0.0.27", "-3689348814741910323", "DC2"),
mockHost("localhost28", "127.0.0.28", "-2767011611056432742", "DC2"),
mockHost("localhost29", "127.0.0.29", "-1844674407370955161", "DC2"),
mockHost("localhost30", "127.0.0.30", "-922337203685477580", "DC2"),
mockHost("localhost31", "127.0.0.31", "1", "DC2"),
mockHost("localhost32", "127.0.0.32", "922337203685477581", "DC2"),
mockHost("localhost33", "127.0.0.33", "1844674407370955162", "DC2"),
mockHost("localhost34", "127.0.0.34", "2767011611056432743", "DC2"),
mockHost("localhost35", "127.0.0.35", "3689348814741910324", "DC2"),
mockHost("localhost36", "127.0.0.36", "4611686018427387905", "DC2"),
mockHost("localhost37", "127.0.0.37", "5534023222112865485", "DC2"),
mockHost("localhost38", "127.0.0.38", "6456360425798343066", "DC2"),
mockHost("localhost39", "127.0.0.39", "7378697629483820647", "DC2"),
mockHost("localhost40", "127.0.0.40", "8301034833169298228", "DC2")
);
when(metadata.getAllHosts()).thenReturn(allHosts);

Expand Down Expand Up @@ -208,7 +208,7 @@ public String reverseResolve(String s)
.compareTo(BigInteger.ONE) > 0));
}

private static Host mockHost(String node, String ip, String token, String dc)
public static Host mockHost(String node, String ip, String token, String dc)
{
Host host = mock(Host.class, RETURNS_DEEP_STUBS);
when(host.getTokens()).thenAnswer(invocation -> Set.of(new MockToken(token)));
Expand Down Expand Up @@ -282,12 +282,12 @@ private static TokenRange createMockTokenRange(Token start, Token end)
}
}

protected DnsResolver mockDnsResolver()
public static DnsResolver mockDnsResolver()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the increase in scope from private to public? I don't see any new consumers in the PR...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am reusing these helper methods in TestModule to fix failing tests.

{

Map<String, String> dnsMap = Map.of("local1", "127.0.0.1",
"local2", "127.0.0.2",
"local13", "127.0.0.3"
Map<String, String> dnsMap = Map.of("localhost", "127.0.0.1",
"localhost2", "127.0.0.2",
"localhost3", "127.0.0.3"
);
DnsResolver dnsResolver = mock(DnsResolver.class);
try
Expand All @@ -308,21 +308,21 @@ private InstancesMetadata mockInstancesMetadata()
{
InstancesMetadata instancesMetadata = mock(InstancesMetadata.class);

InstanceMetadata instance1 = getMockInstanceMetaData(101000101, "local1", getMetadata());
InstanceMetadata instance2 = getMockInstanceMetaData(101000201, "local2", getMetadata());
InstanceMetadata instance3 = getMockInstanceMetaData(101000301, "local3", getMetadata());
InstanceMetadata instance1 = getMockInstanceMetaData(101000101, "localhost", getMetadata());
InstanceMetadata instance2 = getMockInstanceMetaData(101000201, "localhost2", getMetadata());
InstanceMetadata instance3 = getMockInstanceMetaData(101000301, "localhost3", getMetadata());
when(instancesMetadata.instances()).thenReturn(List.of(instance1, instance2, instance3));
return instancesMetadata;
}

private Metadata getMetadata()
public static Metadata getMetadata()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - purpose of scope promotion? If intentional we should either annotated @VisibleForTesting or javadoc it w/@link so subsequent refactors pull this in / break / make noise in an obvious way.

{
Metadata metadata = mock(Metadata.class);
when(metadata.getPartitioner()).thenReturn(Partitioners.MURMUR3.getClass().getSimpleName().toLowerCase());
Set<Host> allHosts = Set.of(
mockHost("local1", "127.0.0.1", "-9223372036854775808", "DC1"),
mockHost("local2", "127.0.0.2", "-8301034833169298228", "DC1"),
mockHost("local3", "127.0.0.3", "-7378697629483820647", "DC1")
mockHost("localhost", "127.0.0.1", "-9223372036854775808", "DC1"),
mockHost("localhost2", "127.0.0.2", "-8301034833169298228", "DC1"),
mockHost("localhost3", "127.0.0.3", "-7378697629483820647", "DC1")
);
when(metadata.getAllHosts()).thenReturn(allHosts);
return metadata;
Expand Down