Skip to content

Commit 5709a4b

Browse files
committed
CASSSIDECAR-409 Adding gossip based safety check to Live Migration data copy task endpoint
1 parent bab2da8 commit 5709a4b

19 files changed

Lines changed: 1113 additions & 68 deletions

File tree

client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,40 @@ public class LiveMigrationDataCopyRequest
4545
public final int maxConcurrency;
4646

4747
/**
48-
* Creates a new request with auto-generated ID.
48+
* Batch size for fetching gossip information from cluster instances.
49+
* During live migration safety checks, gossip info is fetched from instances in parallel batches.
50+
* This controls how many instances are contacted simultaneously per batch.
51+
* It is an optional parameter. Server will apply default if not specified.
52+
*/
53+
public final Integer gossipFetchBatchSize;
54+
55+
/**
56+
* Maximum number of batch retry attempts when fetching gossip information.
57+
* If all instances in a batch fail to return gossip info, the next batch is tried.
58+
* This limits the total number of batch attempts across all instances.
59+
* It is an optional parameter. Server will apply default if not specified.
60+
*/
61+
public final Integer gossipFetchMaxRetries;
62+
63+
/**
64+
* Flag to skip gossip-based safety checks before data copy.
65+
* When true, the safety validation that ensures destination is not present in cluster gossip
66+
* will be skipped. Use with caution - skipping this check may lead to data loss if the
67+
* destination node has been started.
68+
* Optional parameter - defaults to false (performs safety checks) if not specified.
69+
*/
70+
public final Boolean skipGossipCheck;
71+
72+
/**
73+
* Creates a new live migration data copy request.
4974
*/
5075
@JsonCreator
5176
public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterations,
5277
@JsonProperty("successThreshold") double successThreshold,
53-
@JsonProperty("maxConcurrency") int maxConcurrency)
78+
@JsonProperty("maxConcurrency") int maxConcurrency,
79+
@JsonProperty("gossipFetchBatchSize") Integer gossipFetchBatchSize,
80+
@JsonProperty("gossipFetchMaxRetries") Integer gossipFetchMaxRetries,
81+
@JsonProperty("skipGossipCheck") Boolean skipGossipCheck)
5482
{
5583

5684
if (maxIterations <= 0)
@@ -71,8 +99,24 @@ public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterat
7199
+ ". It cannot be less than or equal to zero.");
72100
}
73101

102+
// Validate optional gossip fetch parameters if specified
103+
if (gossipFetchBatchSize != null && gossipFetchBatchSize <= 0)
104+
{
105+
throw new IllegalArgumentException("Invalid gossipFetchBatchSize " + gossipFetchBatchSize
106+
+ ". It must be greater than zero when specified.");
107+
}
108+
109+
if (gossipFetchMaxRetries != null && gossipFetchMaxRetries <= 0)
110+
{
111+
throw new IllegalArgumentException("Invalid gossipFetchMaxRetries " + gossipFetchMaxRetries
112+
+ ". It must be greater than zero when specified.");
113+
}
114+
74115
this.maxIterations = maxIterations;
75116
this.successThreshold = successThreshold;
76117
this.maxConcurrency = maxConcurrency;
118+
this.gossipFetchBatchSize = gossipFetchBatchSize;
119+
this.gossipFetchMaxRetries = gossipFetchMaxRetries;
120+
this.skipGossipCheck = skipGossipCheck;
77121
}
78122
}

client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class LiveMigrationDataCopyRequestTest
3232
@Test
3333
void testSerializationDeserializationRoundTrip() throws Exception
3434
{
35-
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10);
35+
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, null, null, null);
3636

3737
String json = objectMapper.writeValueAsString(original);
3838
LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class);
@@ -42,58 +42,74 @@ void testSerializationDeserializationRoundTrip() throws Exception
4242
assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency);
4343
}
4444

45+
@Test
46+
void testSerializationDeserializationRoundTripWithOptionalValues() throws Exception
47+
{
48+
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, 5, 5, true);
49+
50+
String json = objectMapper.writeValueAsString(original);
51+
LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class);
52+
53+
assertThat(deserialized.maxIterations).isEqualTo(original.maxIterations);
54+
assertThat(deserialized.successThreshold).isEqualTo(original.successThreshold);
55+
assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency);
56+
assertThat(deserialized.gossipFetchBatchSize).isEqualTo(original.gossipFetchBatchSize);
57+
assertThat(deserialized.gossipFetchMaxRetries).isEqualTo(original.gossipFetchMaxRetries);
58+
assertThat(deserialized.skipGossipCheck).isEqualTo(original.skipGossipCheck);
59+
}
60+
4561
@Test
4662
void testConstructorWithInvalidMaxIterationsThrowsException()
4763
{
48-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10))
64+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10, null, null, null))
4965
.isInstanceOf(IllegalArgumentException.class)
5066
.hasMessage("Invalid maxIterations 0. It cannot be less than or equal to zero.");
5167
}
5268

5369
@Test
5470
void testConstructorWithNegativeMaxIterationsThrowsException()
5571
{
56-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10))
72+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10, null, null, null))
5773
.isInstanceOf(IllegalArgumentException.class)
5874
.hasMessage("Invalid maxIterations -5. It cannot be less than or equal to zero.");
5975
}
6076

6177
@Test
6278
void testConstructorWithInvalidSuccessThresholdBelowZeroThrowsException()
6379
{
64-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10))
80+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10, null, null, null))
6581
.isInstanceOf(IllegalArgumentException.class)
6682
.hasMessage("Invalid successThreshold -0.1. It cannot be less than zero or greater than one.");
6783
}
6884

6985
@Test
7086
void testConstructorWithInvalidSuccessThresholdAboveOneThrowsException()
7187
{
72-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10))
88+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10, null, null, null))
7389
.isInstanceOf(IllegalArgumentException.class)
7490
.hasMessage("Invalid successThreshold 1.5. It cannot be less than zero or greater than one.");
7591
}
7692

7793
@Test
7894
void testConstructorWithInvalidMaxConcurrencyThrowsException()
7995
{
80-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0))
96+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0, null, null, null))
8197
.isInstanceOf(IllegalArgumentException.class)
8298
.hasMessage("Invalid maxConcurrency 0. It cannot be less than or equal to zero.");
8399
}
84100

85101
@Test
86102
void testConstructorWithNegativeMaxConcurrencyThrowsException()
87103
{
88-
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3))
104+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3, null, null, null))
89105
.isInstanceOf(IllegalArgumentException.class)
90106
.hasMessage("Invalid maxConcurrency -3. It cannot be less than or equal to zero.");
91107
}
92108

93109
@Test
94110
void testValidBoundaryValues()
95111
{
96-
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1);
112+
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1, null, null, null);
97113

98114
assertThat(request.maxIterations).isEqualTo(1);
99115
assertThat(request.successThreshold).isEqualTo(0.0);
@@ -105,10 +121,29 @@ void testValidBoundaryValuesUpperBound()
105121
{
106122
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(Integer.MAX_VALUE,
107123
1.0,
108-
Integer.MAX_VALUE);
124+
Integer.MAX_VALUE,
125+
null,
126+
null,
127+
null);
109128

110129
assertThat(request.maxIterations).isEqualTo(Integer.MAX_VALUE);
111130
assertThat(request.successThreshold).isEqualTo(1.0);
112131
assertThat(request.maxConcurrency).isEqualTo(Integer.MAX_VALUE);
113132
}
133+
134+
@Test
135+
void testConstructorWithInvalidGossipFetchBatchSizeThrowsException()
136+
{
137+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, 0, null, null))
138+
.isInstanceOf(IllegalArgumentException.class)
139+
.hasMessage("Invalid gossipFetchBatchSize 0. It must be greater than zero when specified.");
140+
}
141+
142+
@Test
143+
void testConstructorWithInvalidGossipFetchMaxRetriesThrowsException()
144+
{
145+
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, null, -1, null))
146+
.isInstanceOf(IllegalArgumentException.class)
147+
.hasMessage("Invalid gossipFetchMaxRetries -1. It must be greater than zero when specified.");
148+
}
114149
}

client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,19 @@ public CompletableFuture<GossipInfoResponse> gossipInfo()
257257
return executor.executeRequestAsync(requestBuilder().gossipInfoRequest().build());
258258
}
259259

260+
/**
261+
* Executes the gossip info request using the default retry policy and provided {@code instance}
262+
*
263+
* @param instance the instance where the request will be executed
264+
* @return a completable future of the gossip info
265+
*/
266+
public CompletableFuture<GossipInfoResponse> gossipInfo(SidecarInstance instance)
267+
{
268+
return executor.executeRequestAsync(requestBuilder()
269+
.singleInstanceSelectionPolicy(instance)
270+
.gossipInfoRequest()
271+
.build());
272+
}
260273

261274
/**
262275
* Executes the GET gossip health request using the default retry policy and configured selection policy

conf/sidecar.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,8 @@ live_migration:
477477
migration_map: # Map of source and destination Cassandra instances
478478
# localhost1: localhost4 # This entry says that localhost1 will be migrated to localhost4
479479
max_concurrent_downloads: 20 # Maximum number of concurrent downloads allowed
480+
gossip_fetch_batch_size: 3 # Batch size for fetching gossip information from cluster instances during safety checks
481+
gossip_fetch_max_retries: 3 # Maximum number of batch retry attempts when fetching gossip information
480482

481483
# Configuration to allow sidecar start and stop Cassandra instances via the lifecycle API (disabled by default)
482484
lifecycle:

server/src/main/java/org/apache/cassandra/sidecar/config/LiveMigrationConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,20 @@ public interface LiveMigrationConfiguration
5252
* Maximum number of concurrent downloads allowed.
5353
*/
5454
int maxConcurrentDownloads();
55+
56+
/**
57+
* Batch size for fetching gossip information from cluster instances during safety checks.
58+
* Controls how many instances are contacted simultaneously per batch.
59+
*
60+
* @return the batch size for gossip fetching
61+
*/
62+
int gossipFetchBatchSize();
63+
64+
/**
65+
* Maximum number of batch retry attempts when fetching gossip information.
66+
* If all instances in a batch fail to return gossip info, the next batch is tried.
67+
*
68+
* @return the maximum number of retry attempts
69+
*/
70+
int gossipFetchMaxRetries();
5571
}

server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LiveMigrationConfigurationImpl.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,29 @@ public class LiveMigrationConfigurationImpl implements LiveMigrationConfiguratio
3333
{
3434

3535
public static final int DEFAULT_MAX_CONCURRENT_DOWNLOADS = 20;
36+
public static final int DEFAULT_GOSSIP_FETCH_BATCH_SIZE = 3;
37+
public static final int DEFAULT_GOSSIP_FETCH_MAX_RETRIES = 3;
3638

3739
private final Set<String> filesToExclude;
3840
private final Set<String> directoriesToExclude;
3941
private final Map<String, String> migrationMap;
4042
private final int maxConcurrentDownloads;
43+
private final int gossipFetchBatchSize;
44+
private final int gossipFetchMaxRetries;
4145

4246
public LiveMigrationConfigurationImpl()
4347
{
44-
this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), DEFAULT_MAX_CONCURRENT_DOWNLOADS);
48+
this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(),
49+
DEFAULT_MAX_CONCURRENT_DOWNLOADS, DEFAULT_GOSSIP_FETCH_BATCH_SIZE, DEFAULT_GOSSIP_FETCH_MAX_RETRIES);
4550
}
4651

4752
@JsonCreator
4853
public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set<String> filesToExclude,
4954
@JsonProperty("dirs_to_exclude") Set<String> directoriesToExclude,
5055
@JsonProperty("migration_map") Map<String, String> migrationMap,
51-
@JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads)
56+
@JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads,
57+
@JsonProperty("gossip_fetch_batch_size") Integer gossipFetchBatchSize,
58+
@JsonProperty("gossip_fetch_max_retries") Integer gossipFetchMaxRetries)
5259
{
5360
this.filesToExclude = filesToExclude;
5461
this.directoriesToExclude = directoriesToExclude;
@@ -60,6 +67,24 @@ public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set<Stri
6067
". It must be >= 1");
6168
}
6269
this.maxConcurrentDownloads = maxConcurrentDownloads;
70+
71+
if (gossipFetchBatchSize != null && gossipFetchBatchSize < 1)
72+
{
73+
throw new IllegalArgumentException("Invalid gossip_fetch_batch_size " + gossipFetchBatchSize +
74+
". It must be >= 1");
75+
}
76+
this.gossipFetchBatchSize = gossipFetchBatchSize == null
77+
? DEFAULT_GOSSIP_FETCH_BATCH_SIZE
78+
: gossipFetchBatchSize;
79+
80+
if (gossipFetchMaxRetries != null && gossipFetchMaxRetries < 1)
81+
{
82+
throw new IllegalArgumentException("Invalid gossip_fetch_max_retries " + gossipFetchMaxRetries +
83+
". It must be >= 1");
84+
}
85+
this.gossipFetchMaxRetries = gossipFetchMaxRetries == null
86+
? DEFAULT_GOSSIP_FETCH_MAX_RETRIES
87+
: gossipFetchMaxRetries;
6388
}
6489

6590
@Override
@@ -89,4 +114,18 @@ public int maxConcurrentDownloads()
89114
{
90115
return maxConcurrentDownloads;
91116
}
117+
118+
@Override
119+
@JsonProperty("gossip_fetch_batch_size")
120+
public int gossipFetchBatchSize()
121+
{
122+
return gossipFetchBatchSize;
123+
}
124+
125+
@Override
126+
@JsonProperty("gossip_fetch_max_retries")
127+
public int gossipFetchMaxRetries()
128+
{
129+
return gossipFetchMaxRetries;
130+
}
92131
}

server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ protected void handleInternal(RoutingContext context,
103103
.onFailure(throwable -> {
104104
if (throwable instanceof LiveMigrationInvalidRequestException)
105105
{
106-
LOGGER.error("Input payload is not valid.", throwable);
106+
LOGGER.error("Invalid live migration request.", throwable);
107107
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, throwable.getMessage(), throwable));
108108
}
109109
else if (throwable instanceof LiveMigrationDataCopyInProgressException)

0 commit comments

Comments
 (0)