diff --git a/CASSSIDECAR-343.patch b/CASSSIDECAR-343.patch new file mode 100644 index 000000000..1919e2bbf --- /dev/null +++ b/CASSSIDECAR-343.patch @@ -0,0 +1,1127 @@ +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +index 7bfdfdd..0c6affb 100644 +--- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +@@ -323,4 +323,14 @@ public class CassandraStorageOperations implements StorageOperations + return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .getCompactionThroughputMbPerSec(); + } ++ ++ /** ++ * {@inheritDoc} ++ */ ++ @Override ++ public void flush(@NotNull String keyspace, @NotNull String... tableNames) throws IOException ++ { ++ jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) ++ .forceKeyspaceFlush(keyspace, tableNames); ++ } + } +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +index a40255e..36a8657 100644 +--- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +@@ -219,4 +219,10 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations + { + return delegate.getCompactionThroughputMbPerSec(); + } ++ ++ @Override ++ public void forceKeyspaceFlush(String keyspaceName, String... 
tableNames) throws IOException ++ { ++ delegate.forceKeyspaceFlush(keyspaceName, tableNames); ++ } + } +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +index c76ccce..fc7fa3d 100644 +--- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +@@ -228,4 +228,13 @@ public interface StorageJmxOperations + * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined + */ + int getCompactionThroughputMbPerSec(); ++ ++ /** ++ * Triggers the node flush operation to flush memtables for the specified keyspace and tables. ++ * ++ * @param keyspaceName the keyspace name ++ * @param tableNames the array of table names to flush; if empty, all tables in the keyspace will be flushed ++ * @throws IOException if an I/O error occurs during the flush operation ++ */ ++ void forceKeyspaceFlush(String keyspaceName, String... 
tableNames) throws IOException; + } +diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +index d67e4cd..b641df6 100644 +--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java ++++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +@@ -140,6 +140,7 @@ public final class ApiEndpointsV1 + public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; + public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; + public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; ++ public static final String NODE_FLUSH_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + "/flush"; + public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; + public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; + +diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java +index f6d629d..7a4edfc 100644 +--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java ++++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java +@@ -20,8 +20,9 @@ package org.apache.cassandra.sidecar.common.request; + + import java.util.List; + +-import com.fasterxml.jackson.annotation.JsonProperty; ++import io.netty.handler.codec.http.HttpMethod; + import org.apache.cassandra.sidecar.common.ApiEndpointsV1; ++import org.apache.cassandra.sidecar.common.request.data.NodeFlushRequestPayload; + import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + + /** +@@ -29,8 +30,7 @@ import 
org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + */ + public class NodeFlushRequest extends JsonRequest + { +- @JsonProperty("tableNames") +- private final List tableNames; ++ private final NodeFlushRequestPayload payload; + + /** + * Constructs a NodeFlushRequest for the given keyspace and table names +@@ -41,7 +41,7 @@ public class NodeFlushRequest extends JsonRequest + public NodeFlushRequest(String keyspace, List tableNames) + { + super(ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, keyspace)); +- this.tableNames = tableNames; ++ this.payload = new NodeFlushRequestPayload(tableNames); + } + + /** +@@ -49,6 +49,18 @@ public class NodeFlushRequest extends JsonRequest + */ + public List tableNames() + { +- return tableNames; ++ return payload.tableNames(); ++ } ++ ++ @Override ++ public HttpMethod method() ++ { ++ return HttpMethod.POST; ++ } ++ ++ @Override ++ public Object requestBody() ++ { ++ return payload; + } + } +diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java +index 8803f56..b2802dd 100644 +--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java ++++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java +@@ -16,20 +16,24 @@ + * limitations under the License. 
+ */ + +-package org.apache.cassandra.sidecar.handlers.data; ++package org.apache.cassandra.sidecar.common.request.data; + + import java.util.Collections; + import java.util.List; + ++import com.fasterxml.jackson.annotation.JsonCreator; ++import com.fasterxml.jackson.annotation.JsonIgnoreProperties; ++import com.fasterxml.jackson.annotation.JsonInclude; + import com.fasterxml.jackson.annotation.JsonProperty; + import org.jetbrains.annotations.NotNull; + + /** + * Represents the request payload for node flush operation + */ ++@JsonIgnoreProperties(ignoreUnknown = true) ++@JsonInclude(JsonInclude.Include.NON_NULL) + public class NodeFlushRequestPayload + { +- @JsonProperty("tableNames") + private final List tableNames; + + /** +@@ -37,6 +41,7 @@ public class NodeFlushRequestPayload + * + * @param tableNames the list of table names to flush; null or empty list means flush all tables + */ ++ @JsonCreator + public NodeFlushRequestPayload(@JsonProperty("tableNames") List tableNames) + { + this.tableNames = tableNames != null ? 
tableNames : Collections.emptyList(); +@@ -46,8 +51,18 @@ public class NodeFlushRequestPayload + * @return the list of table names to flush + */ + @NotNull ++ @JsonProperty("tableNames") + public List tableNames() + { + return tableNames; + } ++ ++ /** ++ * {@inheritDoc} ++ */ ++ @Override ++ public String toString() ++ { ++ return "NodeFlushRequestPayload{tableNames=" + tableNames + "}"; ++ } + } +diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +index 2e630c3..caa64c9 100644 +--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java ++++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +@@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest; + import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; + import org.apache.cassandra.sidecar.common.request.NativeUpdateRequest; + import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest; ++import org.apache.cassandra.sidecar.common.request.NodeFlushRequest; + import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; + import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; + import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest; +diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +index 373bd33..2ee60a0 100644 +--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java ++++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +@@ -19,6 +19,7 @@ + + package org.apache.cassandra.sidecar.client; + ++import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.Objects; +@@ -45,6 +46,7 @@ import 
org.apache.cassandra.sidecar.common.request.DeleteServiceConfigRequest; + import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; + import org.apache.cassandra.sidecar.common.request.ListCdcSegmentsRequest; + import org.apache.cassandra.sidecar.common.request.LiveMigrationListInstanceFilesRequest; ++import org.apache.cassandra.sidecar.common.request.NodeFlushRequest; + import org.apache.cassandra.sidecar.common.request.RestoreJobProgressRequest; + import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest; + import org.apache.cassandra.sidecar.common.request.Service; +@@ -823,6 +825,36 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt + .build()); + } + ++ /** ++ * Executes the node flush request using the default retry policy and provided {@code instance}. ++ * Flushes memtables for the specified keyspace and optionally for specific tables. ++ * ++ * @param instance the instance where the request will be executed ++ * @param keyspace the keyspace name to flush ++ * @param tableNames the list of table names to flush (can be empty to flush all tables) ++ * @return a completable future of the operational job response ++ */ ++ public CompletableFuture nodeFlush(SidecarInstance instance, String keyspace, List tableNames) ++ { ++ return executor.executeRequestAsync(requestBuilder() ++ .singleInstanceSelectionPolicy(instance) ++ .request(new NodeFlushRequest(keyspace, tableNames)) ++ .build()); ++ } ++ ++ /** ++ * Executes the node flush request using the default retry policy and provided {@code instance}. ++ * Flushes all memtables for the specified keyspace. 
++ * ++ * @param instance the instance where the request will be executed ++ * @param keyspace the keyspace name to flush ++ * @return a completable future of the operational job response ++ */ ++ public CompletableFuture nodeFlush(SidecarInstance instance, String keyspace) ++ { ++ return nodeFlush(instance, keyspace, Collections.emptyList()); ++ } ++ + /** + * Sends a request to start or stop Cassandra gossiping on the provided instance. + *

+diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java +index 1aec7c9..35eb429 100644 +--- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java ++++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java +@@ -18,22 +18,26 @@ + + package org.apache.cassandra.sidecar.routes; + +-import java.util.Arrays; +-import java.util.concurrent.TimeUnit; ++import java.util.Collections; ++import java.util.List; ++import java.util.Map; + + import org.junit.jupiter.api.Test; + +-import com.google.common.collect.ImmutableMap; + import io.vertx.core.buffer.Buffer; + import io.vertx.core.json.JsonObject; + import io.vertx.ext.web.client.HttpResponse; + import org.apache.cassandra.sidecar.common.ApiEndpointsV1; + import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; ++import org.apache.cassandra.sidecar.testing.QualifiedName; + import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + + import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; + import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; ++import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; + import static io.netty.handler.codec.http.HttpResponseStatus.OK; ++import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; ++import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; + import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; + import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; + import static org.assertj.core.api.Assertions.assertThat; +@@ -44,183 +48,246 @@ import static org.assertj.core.api.Assertions.assertThat; + public class NodeFlushIntegrationTest 
extends SharedClusterSidecarIntegrationTestBase + { + private static final String TEST_KEYSPACE = "testkeyspace"; +- private static final String TEST_TABLE = "testtable"; ++ public static final String TESTTABLE_1 = "testtable1"; ++ public static final String TESTTABLE_2 = "testtable2"; ++ public static final String OPERATION_FLUSH = "flush"; ++ public static final String LOCALHOST = "localhost"; ++ public static final String TABLE_NAMES = "tableNames"; + + @Override + protected void initializeSchemaForTest() + { +- createTestKeyspace(ImmutableMap.of("testkeyspace", +- "CREATE KEYSPACE IF NOT EXISTS testkeyspace " + +- "WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }")); +- createTestTable(ImmutableMap.of("testtable", +- "CREATE TABLE IF NOT EXISTS testkeyspace.testtable " + +- "(id int PRIMARY KEY, name text)")); ++ createTestKeyspace(TEST_KEYSPACE, Map.of("replication_factor", 1)); ++ ++ createTestTable(new QualifiedName(TEST_KEYSPACE, TESTTABLE_1), ++ "CREATE TABLE %s ( \n" + ++ " id int PRIMARY KEY, \n" + ++ " data text \n" + ++ ");"); ++ ++ createTestTable(new QualifiedName(TEST_KEYSPACE, TESTTABLE_2), ++ "CREATE TABLE %s ( \n" + ++ " id int PRIMARY KEY, \n" + ++ " data text \n" + ++ ");"); + } + +- @Override +- protected void beforeTestStart() ++ /** ++ * Tests flushing multiple tables within a keyspace. ++ * Verifies the API accepts the request and the flush operation completes successfully. 
++ */ ++ @Test ++ void testFlushMultipleTables() + { +- // wait for the schema initialization +- waitForSchemaReady(30, TimeUnit.SECONDS); ++ JsonObject requestBody = new JsonObject(); ++ requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1, TESTTABLE_2)); ++ ++ String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); ++ HttpResponse flushResponse = getBlocking( ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) ++ .sendJsonObject(requestBody)); ++ ++ assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); ++ ++ JsonObject responseBody = flushResponse.bodyAsJsonObject(); ++ assertThat(responseBody).isNotNull(); ++ String jobId = responseBody.getString("jobId"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually SUCCEEDED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + ++ /** ++ * Tests flushing a single table within a keyspace. ++ * Verifies the API accepts the request and the flush operation completes successfully. 
++ */ + @Test +- void testFlushKeyspaceWithSpecificTables() ++ void testFlushSingleTable() + { + JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", Arrays.asList(TEST_TABLE)); ++ requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1)); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); +- assertThat(responseBody.getString("jobId")).isNotNull(); +- assertThat(responseBody.getString("operation")).isEqualTo("flush"); +- assertThat(responseBody.getString("jobStatus")).isIn( +- OperationalJobStatus.CREATED.name(), +- OperationalJobStatus.RUNNING.name(), +- OperationalJobStatus.SUCCEEDED.name() +- ); +- +- // Validate the operational job status using the OperationalJobHandler + String jobId = responseBody.getString("jobId"); +- validateOperationalJobStatus(jobId, "flush"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually SUCCEEDED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + ++ /** ++ * Tests flushing an entire keyspace by providing an empty table list. ++ * Verifies the API accepts the request and flushes all tables in the keyspace successfully. 
++ */ + @Test +- void testFlushKeyspaceWithEmptyTableList() ++ void testFlushEmptyTableList() + { + JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", Arrays.asList()); ++ requestBody.put(TABLE_NAMES, Collections.emptyList()); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); +- assertThat(responseBody.getString("jobId")).isNotNull(); +- assertThat(responseBody.getString("operation")).isEqualTo("flush"); +- assertThat(responseBody.getString("jobStatus")).isIn( +- OperationalJobStatus.CREATED.name(), +- OperationalJobStatus.RUNNING.name(), +- OperationalJobStatus.SUCCEEDED.name() +- ); ++ String jobId = responseBody.getString("jobId"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually SUCCEEDED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + ++ /** ++ * Tests flushing an entire keyspace by providing no payload. ++ * Verifies the API accepts the request and flushes all tables in the keyspace successfully. 
++ */ + @Test +- void testFlushKeyspaceWithoutPayload() ++ void testFlushNoPayload() + { + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .send()); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); +- assertThat(responseBody.getString("jobId")).isNotNull(); +- assertThat(responseBody.getString("operation")).isEqualTo("flush"); +- assertThat(responseBody.getString("jobStatus")).isIn( +- OperationalJobStatus.CREATED.name(), +- OperationalJobStatus.RUNNING.name(), +- OperationalJobStatus.SUCCEEDED.name() +- ); ++ String jobId = responseBody.getString("jobId"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually SUCCEEDED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + ++ /** ++ * Tests flushing a non-existent keyspace. ++ * Verifies the API accepts the request but the flush operation ultimately fails. 
++ */ + @Test + void testFlushNonExistentKeyspace() + { + JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", Arrays.asList("table1")); ++ requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1)); + +- String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "nonexistent"); ++ String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "garbagekeyspace"); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + +- assertThat(flushResponse.statusCode()).isEqualTo(BAD_REQUEST.code()); ++ assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); ++ ++ JsonObject responseBody = flushResponse.bodyAsJsonObject(); ++ assertThat(responseBody).isNotNull(); ++ String jobId = responseBody.getString("jobId"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually FAILED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, FAILED); + } + ++ /** ++ * Tests flushing a non-existent table within a valid keyspace. ++ * Verifies the API accepts the request but the flush operation ultimately fails. 
++ */ + @Test +- void testFlushInvalidPayload() ++ void testFlushNonExistentTable() + { +- String invalidJson = "{ invalid json }"; ++ JsonObject requestBody = new JsonObject(); ++ requestBody.put(TABLE_NAMES, List.of("garbagetesttable")); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) +- .sendBuffer(Buffer.buffer(invalidJson))); ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) ++ .sendJsonObject(requestBody)); + +- assertThat(flushResponse.statusCode()).isEqualTo(BAD_REQUEST.code()); ++ assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); ++ ++ JsonObject responseBody = flushResponse.bodyAsJsonObject(); ++ assertThat(responseBody).isNotNull(); ++ String jobId = responseBody.getString("jobId"); ++ assertThat(jobId).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); ++ ++ // loopAssert that the job status is eventually FAILED ++ validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, FAILED); + } + ++ /** ++ * Tests sending a malformed JSON payload to the flush API. ++ * Verifies the API immediately rejects the request with a bad request error. 
++ */ + @Test +- void testFlushInvalidTableName() ++ void testFlushMalformedPayload() + { +- JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", Arrays.asList("invalid-table-name!@#")); ++ String malformedJson = "{ invalid json }"; + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( +- trustedClient().post(serverWrapper.serverPort, "localhost", flushRoute) +- .sendJsonObject(requestBody)); ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) ++ .sendBuffer(Buffer.buffer(malformedJson))); + ++ // Validate failure response + assertThat(flushResponse.statusCode()).isEqualTo(BAD_REQUEST.code()); + } + + /** +- * Validates the operational job status by querying the OperationalJobHandler endpoint +- * and waiting for the job to reach a final state if necessary. ++ * Tests attempting to flush the system keyspace. ++ * Verifies the API rejects the request as system keyspace operations are forbidden. 
++ */ ++ @Test ++ void testFlushSystemKeyspaceFailure() ++ { ++ String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "system"); ++ HttpResponse flushResponse = getBlocking( ++ trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) ++ .send()); ++ ++ // Validate failure response ++ assertThat(flushResponse.statusCode()).isEqualTo(FORBIDDEN.code()); ++ } ++ ++ /** ++ * Validates that the operational job status eventually reaches the expected status + * +- * @param jobId the ID of the operational job to validate ++ * @param jobId the ID of the operational job to validate + * @param expectedOperation the expected operation name (e.g., "flush", "decommission", "drain") ++ * @param expectedStatus the expected final status (SUCCEEDED or FAILED) + */ +- private void validateOperationalJobStatus(String jobId, String expectedOperation) ++ private void validateOperationalJobStatusEventually(String jobId, String expectedOperation, OperationalJobStatus expectedStatus) + { + String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); +- +- HttpResponse jobStatusResponse = getBlocking( +- trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) +- .send()); +- +- assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); +- +- JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); +- assertThat(jobStatusBody).isNotNull(); +- assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); +- assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); +- assertThat(jobStatusBody.getString("jobStatus")).isIn( +- OperationalJobStatus.RUNNING.name(), +- OperationalJobStatus.SUCCEEDED.name() +- ); +- +- // If the job is still running, wait for it to complete or reach a final state +- if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) +- { +- loopAssert(30, 500, () -> { +- HttpResponse 
finalJobStatusResponse = getBlocking( +- trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) +- .send()); +- +- assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code()); +- +- JsonObject finalJobStatusBody = finalJobStatusResponse.bodyAsJsonObject(); +- assertThat(finalJobStatusBody).isNotNull(); +- assertThat(finalJobStatusBody.getString("jobStatus")).isIn( +- OperationalJobStatus.SUCCEEDED.name(), +- OperationalJobStatus.FAILED.name() +- ); +- }); +- } ++ ++ loopAssert(30, 500, () -> { ++ HttpResponse jobStatusResponse = getBlocking( ++ trustedClient().get(serverWrapper.serverPort, LOCALHOST, operationalJobRoute) ++ .send()); ++ ++ assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); ++ ++ JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); ++ assertThat(jobStatusBody).isNotNull(); ++ assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); ++ assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); ++ assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedStatus.name()); ++ }); + } + } +diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +index d135e61..d958586 100644 +--- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java ++++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +@@ -174,4 +174,12 @@ public interface StorageOperations + * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined + */ + int getCompactionThroughputMbPerSec(); ++ ++ /** ++ * Triggers the node flush operation to flush memtables for the specified keyspace and tables. 
++ * ++ * @param keyspace the keyspace name ++ * @param tableNames the array of table names to flush; if empty, all tables in the keyspace will be flushed ++ */ ++ void flush(@NotNull String keyspace, @NotNull String... tableNames) throws IOException; + } +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +index b57ac62..e7f5ca8 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +@@ -62,6 +62,7 @@ public class BasicPermissions + // sidecar operation related permissions + public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); + public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); ++ public static final Permission FLUSH_NODE = new DomainAwarePermission("NODE:FLUSH", KEYSPACE_SCOPE); + + // Permissions related to Schema Reporting + public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE); +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +index a7c4f7f..57423be 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +@@ -29,14 +29,21 @@ import io.vertx.core.net.SocketAddress; + import io.vertx.ext.web.RoutingContext; + import io.vertx.ext.web.handler.HttpException; + import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; ++import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; ++import 
org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + import org.apache.cassandra.sidecar.common.server.data.Name; + import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; + import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; + import org.apache.cassandra.sidecar.common.utils.Preconditions; + import org.apache.cassandra.sidecar.concurrent.ExecutorPools; ++import org.apache.cassandra.sidecar.config.ServiceConfiguration; + import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; ++import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; ++import org.apache.cassandra.sidecar.job.OperationalJob; ++import org.apache.cassandra.sidecar.job.OperationalJobManager; + import org.apache.cassandra.sidecar.utils.CassandraInputValidator; + import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; ++import org.apache.cassandra.sidecar.utils.OperationalJobUtils; + import org.jetbrains.annotations.NotNull; + + import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; +@@ -317,4 +324,33 @@ public abstract class AbstractHandler implements Handler + } + return host; + } ++ ++ /** ++ * Handles the submission and execution of an operational job. ++ * ++ * @param jobManager the manager responsible for submitting and tracking operational jobs ++ * @param config the service configuration containing execution parameters ++ * @param context the routing context for the HTTP request/response ++ * @param job the operational job to be executed ++ */ ++ protected void handleOperationalJob( ++ OperationalJobManager jobManager, ServiceConfiguration config, RoutingContext context, OperationalJob job, boolean checkConflict) ++ { ++ try ++ { ++ jobManager.trySubmitJob(job, checkConflict); ++ } ++ catch (OperationalJobConflictException oje) ++ { ++ String reason = oje.getMessage(); ++ logger.error("Conflicting job encountered. 
reason={}", reason); ++ context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); ++ context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); ++ return; ++ } ++ ++ // Get the result, waiting for the specified wait time for result ++ job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime()) ++ .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); ++ } + } +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java +index 63b5b66..3c4b341 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java +@@ -91,7 +91,7 @@ public class NodeDecommissionHandler extends AbstractHandler implements + NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), operations, isForce); + try + { +- jobManager.trySubmitJob(job); ++ jobManager.trySubmitJob(job, true); + } + catch (OperationalJobConflictException oje) + { +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java +index 102e9fd..2107855 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java +@@ -36,7 +36,7 @@ import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; + import org.apache.cassandra.sidecar.common.server.StorageOperations; + import org.apache.cassandra.sidecar.concurrent.ExecutorPools; + import org.apache.cassandra.sidecar.config.ServiceConfiguration; +-import org.apache.cassandra.sidecar.handlers.data.NodeFlushRequestPayload; ++import 
org.apache.cassandra.sidecar.common.request.data.NodeFlushRequestPayload; + import org.apache.cassandra.sidecar.job.NodeFlushJob; + import org.apache.cassandra.sidecar.job.OperationalJobManager; + import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +@@ -103,7 +103,7 @@ public class NodeFlushHandler extends AbstractHandler i + protected NodeFlushRequestPayload extractParamsOrThrow(RoutingContext context) + { + String bodyString = context.body().asString(); +- if (bodyString == null || bodyString.isBlank()) // TODO SKL json encoder writes null as "null" ++ if (bodyString == null || bodyString.isBlank() || bodyString.equals("null")) //json encoder writes null as "null" + { + return new NodeFlushRequestPayload(Collections.emptyList()); + } +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +index c4b6b9a..04fa5d0 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +@@ -189,7 +189,7 @@ public abstract class OperationalJob implements Task + /** + * OperationalJob body. The implementation is executed in a blocking manner. + */ +- protected abstract void executeInternal(); ++ protected abstract void executeInternal() throws Exception; + + /** + * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java +index 3c37dda..7567105 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java +@@ -79,12 +79,16 @@ public class OperationalJobManager + * tracked and not running. 
The job is triggered on a separate internal thread-pool. + * The job execution failure behavior is tracked within the {@link OperationalJob}. + * +- * @param job OperationalJob instance to submit ++ * @param job OperationalJob instance to submit ++ * @param checkConflict whether to check for job conflicts before submission + * @throws OperationalJobConflictException when the same operational job is already running on Cassandra + */ +- public void trySubmitJob(OperationalJob job) throws OperationalJobConflictException ++ public void trySubmitJob(OperationalJob job, boolean checkConflict) throws OperationalJobConflictException + { +- checkConflict(job); ++ if (checkConflict) ++ { ++ checkConflict(job); ++ } + + // New job is submitted for all cases when we do not have a corresponding downstream job + jobTracker.computeIfAbsent(job.jobId(), jobId -> { +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +index 9b1278a..e71b5bd 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +@@ -21,6 +21,7 @@ package org.apache.cassandra.sidecar.modules; + import com.google.inject.AbstractModule; + import com.google.inject.multibindings.ProvidesIntoMap; + import jakarta.ws.rs.GET; ++import jakarta.ws.rs.POST; + import jakarta.ws.rs.PUT; + import jakarta.ws.rs.Path; + import org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema; +@@ -45,6 +46,7 @@ import org.apache.cassandra.sidecar.handlers.KeyspaceSchemaHandler; + import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler; + import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler; + import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; ++import 
org.apache.cassandra.sidecar.handlers.NodeFlushHandler; + import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; + import org.apache.cassandra.sidecar.handlers.RingHandler; + import org.apache.cassandra.sidecar.handlers.SchemaHandler; +@@ -156,6 +158,29 @@ public class CassandraOperationsModule extends AbstractModule + return factory.buildRouteWithHandler(nodeDecommissionHandler); + } + ++ @POST ++ @Path(ApiEndpointsV1.NODE_FLUSH_ROUTE) ++ @Operation(summary = "Flush node memtables", ++ description = "Flushes memtables for the specified keyspace and optional table names") ++ @APIResponse(description = "Node flush operation completed successfully", ++ responseCode = "200", ++ content = @Content(mediaType = "application/json", ++ schema = @Schema(implementation = OperationalJobResponse.class))) ++ @APIResponse(description = "Node flush operation initiated successfully", ++ responseCode = "202", ++ content = @Content(mediaType = "application/json", ++ schema = @Schema(implementation = OperationalJobResponse.class))) ++ @ProvidesIntoMap ++ @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeFlushRouteKey.class) ++ VertxRoute cassandraNodeFlushRoute(RouteBuilder.Factory factory, ++ NodeFlushHandler nodeFlushHandler) ++ { ++ return factory.builderForRoute() ++ .setBodyHandler(true) ++ .handler(nodeFlushHandler) ++ .build(); ++ } ++ + @GET + @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) + @Operation(summary = "Get stream statistics", +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +index e3bec84..629a75a 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +@@ -83,6 +83,11 @@ public interface VertxRouteMapKeys + HttpMethod HTTP_METHOD = HttpMethod.PUT; + 
String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE; + } ++ interface CassandraNodeFlushRouteKey extends RouteClassKey ++ { ++ HttpMethod HTTP_METHOD = HttpMethod.POST; ++ String ROUTE_URI = ApiEndpointsV1.NODE_FLUSH_ROUTE; ++ } + interface CassandraNodeSettingsRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.GET; +diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java +index c13c2bb..45c98c2 100644 +--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java ++++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java +@@ -21,7 +21,6 @@ package org.apache.cassandra.sidecar.handlers; + import java.io.IOException; + import java.util.Arrays; + import java.util.Collections; +-import java.util.List; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.TimeUnit; + +@@ -39,26 +38,15 @@ import com.google.inject.Module; + import com.google.inject.Provides; + import com.google.inject.Singleton; + import com.google.inject.util.Modules; +-import io.netty.handler.codec.http.HttpResponseStatus; +-import io.vertx.core.AsyncResult; +-import io.vertx.core.Handler; + import io.vertx.core.Vertx; +-import io.vertx.core.buffer.Buffer; +-import io.vertx.core.http.HttpMethod; + import io.vertx.core.json.JsonObject; +-import io.vertx.ext.web.client.HttpRequest; +-import io.vertx.ext.web.client.HttpResponse; + import io.vertx.ext.web.client.WebClient; +-import io.vertx.ext.web.client.WebClientOptions; +-import io.vertx.ext.web.client.predicate.ResponsePredicate; +-import io.vertx.ext.web.codec.BodyCodec; + import io.vertx.junit5.VertxExtension; + import io.vertx.junit5.VertxTestContext; + import org.apache.cassandra.sidecar.TestModule; + import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; + import 
org.apache.cassandra.sidecar.cluster.InstancesMetadata; + import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +-import org.apache.cassandra.sidecar.common.request.data.NodeFlushRequestPayload; + import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + import org.apache.cassandra.sidecar.common.server.StorageOperations; + import org.apache.cassandra.sidecar.modules.SidecarModules; +@@ -70,7 +58,6 @@ import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; + import static io.netty.handler.codec.http.HttpResponseStatus.OK; + import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; + import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +-import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; + import static org.assertj.core.api.Assertions.assertThat; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyString; +@@ -87,6 +74,8 @@ import static org.mockito.Mockito.when; + public class NodeFlushHandlerTest + { + static final Logger LOGGER = LoggerFactory.getLogger(NodeFlushHandlerTest.class); ++ public static final String LOCAL_HOST = "127.0.0.1"; ++ public static final String TEST_ROUTE = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; + Vertx vertx; + Server server; + StorageOperations mockStorageOperations = mock(StorageOperations.class); +@@ -94,6 +83,9 @@ public class NodeFlushHandlerTest + @BeforeEach + void before() throws InterruptedException + { ++ // Reset mock before each test ++ org.mockito.Mockito.reset(mockStorageOperations); ++ + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new NodeFlushHandlerTest.NodeFlushTestModule()); +@@ -123,32 +115,14 @@ public class NodeFlushHandlerTest + void testFlushLongRunning(VertxTestContext context) throws IOException + { + doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null)) +- 
.when(mockStorageOperations).flush(anyString(), any(String[].class)); ++ .when(mockStorageOperations).flush("testkeyspace", "table1", "table2"); ++ ++ WebClient client = WebClient.create(vertx); + +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Arrays.asList("table1", "table2")); + +- NodeFlushRequestPayload payload = new NodeFlushRequestPayload(Arrays.asList("table1", "table2")); +- /* HttpResponse response = getBlocking(client.post(server.actualPort(), "127.0.0.1", testRoute).sendJson(payload), +- 10, TimeUnit.SECONDS, +- "Create RestoreJob");*/ +- //assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); +- postAndVerify(testRoute, +- JsonObject.mapFrom(payload), +- asyncResult -> { +- context.verify(() -> { +- HttpResponse response = asyncResult.result(); +- assertThat(response).isNotNull(); +- assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); +- }); +- }); +- +- /*client.post(server.actualPort(), "127.0.0.1", testRoute) +- .putHeader("Content-Type", "application/json") +- .expect(ResponsePredicate.SC_ACCEPTED) ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); +@@ -156,45 +130,18 @@ public class NodeFlushHandlerTest + assertThat(flushResponse.status()).isEqualTo(RUNNING); + assertThat(flushResponse.operation()).isEqualTo("flush"); + context.completeNow(); +- }));*/ +- } +- +- protected void postAndVerify(String endpoint, +- JsonObject requestPayload, +- Handler>> responseVerifier) +- { +- sendRequestAndVerify(HttpMethod.POST, endpoint, requestPayload, responseVerifier); +- } +- private void sendRequestAndVerify(HttpMethod httpMethod, +- String 
endpoint, +- JsonObject requestPayload, +- Handler>> responseVerifier) +- { +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- HttpRequest request = client.request(httpMethod, server.actualPort(), "localhost", endpoint); +- assertThat(request).isNotNull(); +- request.as(BodyCodec.buffer()); +- if (requestPayload == null) +- { +- request.send(responseVerifier); +- } +- else +- { +- request.sendJsonObject(requestPayload, responseVerifier); +- } ++ })); + } + + @Test + void testFlushCompleted(VertxTestContext context) + { +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- ++ WebClient client = WebClient.create(vertx); ++ + JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", Arrays.asList("table1")); +- +- client.post(server.actualPort(), "127.0.0.1", testRoute) +- .putHeader("Content-Type", "application/json") ++ requestBody.put("tableNames", Collections.singletonList("table1")); ++ ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Flush Response: {}", response.bodyAsString()); +@@ -219,14 +166,12 @@ public class NodeFlushHandlerTest + @Test + void testFlushEmptyTableList(VertxTestContext context) + { +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- ++ WebClient client = WebClient.create(vertx); ++ + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Collections.emptyList()); +- +- client.post(server.actualPort(), "127.0.0.1", testRoute) +- .putHeader("Content-Type", "application/json") ++ ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + 
assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); +@@ -249,10 +194,9 @@ public class NodeFlushHandlerTest + @Test + void testFlushNoPayload(VertxTestContext context) + { +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- +- client.post(server.actualPort(), "127.0.0.1", testRoute) ++ WebClient client = WebClient.create(vertx); ++ ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); +@@ -276,16 +220,13 @@ public class NodeFlushHandlerTest + void testFlushFailed(VertxTestContext context) throws IOException + { + doThrow(new RuntimeException("Flush failed")).when(mockStorageOperations).flush(anyString(), any(String[].class)); +- +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- ++ ++ WebClient client = WebClient.create(vertx); ++ + JsonObject requestBody = new JsonObject(); +- requestBody.put("tableNames", List.of("table1")); +- +- client.post(server.actualPort(), "127.0.0.1", testRoute) +- .putHeader("Content-Type", "application/json") +- .expect(ResponsePredicate.SC_OK) ++ requestBody.put("tableNames", Collections.singletonList("table1")); ++ ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + context.completeNow(); +@@ -295,14 +236,11 @@ public class NodeFlushHandlerTest + @Test + void testFlushInvalidPayload(VertxTestContext context) + { +- WebClient client = WebClient.create(vertx, new WebClientOptions()); +- String testRoute = 
"/api/v1/cassandra/keyspaces/testkeyspace/flush"; +- ++ WebClient client = WebClient.create(vertx); ++ + String invalidJson = "{ invalid json }"; +- +- client.post(server.actualPort(), "127.0.0.1", testRoute) +- .putHeader("Content-Type", "application/json") +- .expect(ResponsePredicate.SC_BAD_REQUEST) ++ ++ client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(invalidJson), context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + context.completeNow(); +@@ -319,7 +257,7 @@ public class NodeFlushHandlerTest + public InstancesMetadata instanceMetadata() + { + final int instanceId = 100; +- final String host = "127.0.0.1"; ++ final String host = LOCAL_HOST; + final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); +diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java +index f7b6cfe..2c3f69f 100644 +--- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java ++++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java +@@ -77,7 +77,7 @@ class OperationalJobManagerTest + OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); + + OperationalJob testJob = OperationalJobTest.createOperationalJob(SUCCEEDED); +- manager.trySubmitJob(testJob); ++ manager.trySubmitJob(testJob, true); + testJob.execute(Promise.promise()); + assertThat(testJob.asyncResult().isComplete()).isTrue(); + assertThat(testJob.status()).isEqualTo(SUCCEEDED); +@@ -94,7 +94,7 @@ class OperationalJobManagerTest + when(mockPools.internal()).thenReturn(mockExecPool); + when(mockExecPool.runBlocking(any())).thenReturn(null); + OperationalJobManager manager = new 
OperationalJobManager(tracker, executorPool); +- assertThatThrownBy(() -> manager.trySubmitJob(runningJob)) ++ assertThatThrownBy(() -> manager.trySubmitJob(runningJob, true)) + .isExactlyInstanceOf(OperationalJobConflictException.class) + .hasMessage("The same operational job is already running on Cassandra. operationName='Operation X'"); + } +@@ -109,7 +109,7 @@ class OperationalJobManagerTest + + OperationalJob testJob = OperationalJobTest.createOperationalJob(jobId, SecondBoundConfiguration.parse("10s")); + +- manager.trySubmitJob(testJob); ++ manager.trySubmitJob(testJob, true); + // execute the job async. + vertx.executeBlocking(testJob::execute); + // by the time of checking, the job should still be running. It runs for 10 seconds. +@@ -141,7 +141,7 @@ class OperationalJobManagerTest + } + }; + +- manager.trySubmitJob(failingJob); ++ manager.trySubmitJob(failingJob, true); + failingJob.execute(Promise.promise()); + assertThat(failingJob.asyncResult().isComplete()).isTrue(); + assertThat(failingJob.asyncResult().failed()).isTrue(); diff --git a/CASSSIDECAR-344.patch b/CASSSIDECAR-344.patch new file mode 100644 index 000000000..4cf3967c1 --- /dev/null +++ b/CASSSIDECAR-344.patch @@ -0,0 +1,472 @@ +diff --git a/CHANGES.txt b/CHANGES.txt +index 0fb5a93..2036d33 100644 +--- a/CHANGES.txt ++++ b/CHANGES.txt +@@ -1,5 +1,6 @@ + 0.3.0 + ----- ++ * Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344) + * Improve FilteringMetricRegistry implementation (CASSSIDECAR-347) + * Add lifecycle APIs for starting and stopping Cassandra (CASSSIDECAR-266) + * Implementation of CassandraClusterSchemaMonitor (CASSSIDECAR-245) +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +index 7bfdfdd..5ec27ec 100644 +--- 
a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +@@ -323,4 +323,14 @@ public class CassandraStorageOperations implements StorageOperations + return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .getCompactionThroughputMbPerSec(); + } ++ ++ /** ++ * {@inheritDoc} ++ */ ++ @Override ++ public void move(String newToken) ++ { ++ jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) ++ .move(newToken); ++ } + } +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +index a40255e..28ca98c 100644 +--- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +@@ -219,4 +219,10 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations + { + return delegate.getCompactionThroughputMbPerSec(); + } ++ ++ @Override ++ public void move(String newToken) ++ { ++ delegate.move(newToken); ++ } + } +diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +index c76ccce..7b0c482 100644 +--- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java ++++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +@@ -228,4 +228,11 @@ public interface 
StorageJmxOperations + * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined + */ + int getCompactionThroughputMbPerSec(); ++ ++ /** ++ * Triggers the node move operation to move this node to a new token ++ * ++ * @param newToken the new token for the node to move to ++ */ ++ void move(String newToken); + } +diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +index d67e4cd..90a1a2e 100644 +--- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java ++++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +@@ -140,6 +140,7 @@ public final class ApiEndpointsV1 + public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; + public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; + public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; ++ public static final String NODE_MOVE_ROUTE = API_V1 + CASSANDRA + "/operations/move"; + public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; + public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; + +diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +index 2e630c3..7d22941 100644 +--- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java ++++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +@@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest; + import org.apache.cassandra.sidecar.common.request.ListSnapshotFilesRequest; + import 
org.apache.cassandra.sidecar.common.request.NativeUpdateRequest; + import org.apache.cassandra.sidecar.common.request.NodeDecommissionRequest; ++import org.apache.cassandra.sidecar.common.request.NodeMoveRequest; + import org.apache.cassandra.sidecar.common.request.NodeSettingsRequest; + import org.apache.cassandra.sidecar.common.request.OperationalJobRequest; + import org.apache.cassandra.sidecar.common.request.ReportSchemaRequest; +@@ -582,6 +583,18 @@ public class RequestContext + return request(NODE_DECOMMISSION_REQUEST); + } + ++ /** ++ * Sets the {@code request} to be a {@link NodeMoveRequest} and returns a reference to this Builder ++ * enabling method chaining. ++ * ++ * @param newToken the new token for the node to move to ++ * @return a reference to this Builder ++ */ ++ public Builder nodeMoveRequest(String newToken) ++ { ++ return request(new NodeMoveRequest(newToken)); ++ } ++ + /** + * Sets the {@code request} to be a {@link GossipUpdateRequest} for the + * given {@link NodeCommandRequestPayload.State state}, and returns a reference to this Builder enabling method chaining. 
+diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +index 373bd33..6157078 100644 +--- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java ++++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +@@ -823,6 +823,21 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt + .build()); + } + ++ /** ++ * Executes the node move request using the default retry policy and configured selection policy ++ * ++ * @param instance the instance where the request will be executed ++ * @param newToken the new token for the node to move to ++ * @return a completable future of the operational job response ++ */ ++ public CompletableFuture nodeMove(SidecarInstance instance, String newToken) ++ { ++ return executor.executeRequestAsync(requestBuilder() ++ .singleInstanceSelectionPolicy(instance) ++ .nodeMoveRequest(newToken) ++ .build()); ++ } ++ + /** + * Sends a request to start or stop Cassandra gossiping on the provided instance. + *

+diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +index b13a209..d38c826 100644 +--- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java ++++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +@@ -1393,6 +1393,26 @@ abstract class SidecarClientTest + validateResponseServed(ApiEndpointsV1.NODE_DECOMMISSION_ROUTE); + } + ++ @Test ++ public void testNodeMove() throws Exception ++ { ++ UUID jobId = UUID.randomUUID(); ++ String newToken = "123456789"; ++ String nodeMoveString = "{\"jobId\":\"" + jobId + "\",\"jobStatus\":\"SUCCEEDED\",\"instance\":\"127.0.0.1\"}"; ++ ++ MockResponse response = new MockResponse() ++ .setResponseCode(OK.code()) ++ .setHeader("content-type", "application/json") ++ .setBody(nodeMoveString); ++ enqueue(response); ++ ++ SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(servers.get(0)); ++ OperationalJobResponse result = client.nodeMove(sidecarInstance, newToken).get(30, TimeUnit.SECONDS); ++ assertThat(result).isNotNull(); ++ assertThat(result.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); ++ validateResponseServed(ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + newToken); ++ } ++ + @Test + void testFailsWithOneAttemptPerServer() + { +diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +index c530541..806e6b5 100644 +--- a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java ++++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java +@@ -1,4 +1,161 @@ ++/* ++ * Licensed 
to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ + package org.apache.cassandra.sidecar.routes; + +-public class CassandraNodeOperationsIntegrationTest { +-} ++import java.util.concurrent.TimeUnit; ++ ++import org.junit.jupiter.api.Test; ++ ++import io.vertx.core.buffer.Buffer; ++import io.vertx.core.json.JsonObject; ++import io.vertx.ext.web.client.HttpResponse; ++import org.apache.cassandra.sidecar.common.ApiEndpointsV1; ++import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; ++import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; ++ ++import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; ++import static io.netty.handler.codec.http.HttpResponseStatus.OK; ++import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; ++import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; ++import static org.assertj.core.api.Assertions.assertThat; ++ ++/** ++ * Integration tests for Cassandra node operations ++ */ ++public class CassandraNodeOperationsIntegrationTest extends SharedClusterSidecarIntegrationTestBase ++{ ++ public static final String CASSANDRA_VERSION_4_0 = "4.0"; ++ ++ @Override ++ protected void 
initializeSchemaForTest() ++ { ++ // No schema init needed ++ } ++ ++ @Override ++ protected void beforeTestStart() ++ { ++ // wait for the schema initialization ++ waitForSchemaReady(30, TimeUnit.SECONDS); ++ } ++ ++ @Test ++ void testNodeMoveOperationSuccess() ++ { ++ // Use a test token - this is a valid token for Murmur3Partitioner ++ String testToken = "123456789"; ++ ++ // Initiate move operation ++ HttpResponse moveResponse = getBlocking( ++ trustedClient().put(serverWrapper.serverPort, "localhost", ApiEndpointsV1.NODE_MOVE_ROUTE + "?newToken=" + testToken) ++ .send()); ++ ++ assertThat(moveResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); ++ ++ JsonObject responseBody = moveResponse.bodyAsJsonObject(); ++ assertThat(responseBody).isNotNull(); ++ assertThat(responseBody.getString("jobId")).isNotNull(); ++ assertThat(responseBody.getString("operation")).isEqualTo("move"); ++ assertThat(responseBody.getString("jobStatus")).isIn( ++ OperationalJobStatus.CREATED.name(), ++ OperationalJobStatus.RUNNING.name(), ++ OperationalJobStatus.SUCCEEDED.name() ++ ); ++ ++ // Verify the job eventually completes (or at least gets processed) ++ loopAssert(30, 500, () -> { ++ HttpResponse streamStatsResponse = getBlocking( ++ trustedClient().get(serverWrapper.serverPort, "localhost", ApiEndpointsV1.STREAM_STATS_ROUTE) ++ .send()); ++ ++ assertThat(streamStatsResponse.statusCode()).isEqualTo(OK.code()); ++ ++ JsonObject streamStats = streamStatsResponse.bodyAsJsonObject(); ++ assertThat(streamStats).isNotNull(); ++ // The operationMode should be either NORMAL (completed) or MOVING (in progress) ++ assertThat(streamStats.getString("operationMode")).isIn("NORMAL", "MOVING"); ++ }); ++ ++ // Validate the operational job status using the OperationalJobHandler ++ String jobId = responseBody.getString("jobId"); ++ validateOperationalJobStatus(jobId, "move"); ++ } ++ ++ /** ++ * Validates the operational job status by querying the OperationalJobHandler endpoint ++ * and waiting 
for the job to reach a final state if necessary. ++ * ++ * @param jobId the ID of the operational job to validate ++ * @param expectedOperation the expected operation name (e.g., "move", "decommission", "drain") ++ */ ++ private void validateOperationalJobStatus(String jobId, String expectedOperation) ++ { ++ String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); ++ ++ HttpResponse jobStatusResponse = getBlocking( ++ trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) ++ .send()); ++ ++ assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); ++ ++ JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); ++ assertThat(jobStatusBody).isNotNull(); ++ assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); ++ assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); ++ assertThat(jobStatusBody.getString("jobStatus")).isIn( ++ OperationalJobStatus.RUNNING.name(), ++ OperationalJobStatus.SUCCEEDED.name() ++ ); ++ ++ // If the job is still running, wait for it to complete or reach a final state ++ if (OperationalJobStatus.RUNNING.name().equals(jobStatusBody.getString("jobStatus"))) ++ { ++ loopAssert(30, 500, () -> { ++ HttpResponse finalJobStatusResponse = getBlocking( ++ trustedClient().get(serverWrapper.serverPort, "localhost", operationalJobRoute) ++ .send()); ++ ++ assertThat(finalJobStatusResponse.statusCode()).isEqualTo(OK.code()); ++ ++ JsonObject finalJobStatusBody = finalJobStatusResponse.bodyAsJsonObject(); ++ assertThat(finalJobStatusBody).isNotNull(); ++ assertThat(finalJobStatusBody.getString("jobStatus")).isIn( ++ OperationalJobStatus.SUCCEEDED.name(), ++ OperationalJobStatus.FAILED.name() ++ ); ++ }); ++ } ++ } ++ ++ /** ++ * {@inheritDoc} ++ */ ++ @Override ++ protected void tearDown() throws Exception ++ { ++ try ++ { ++ super.tearDown(); ++ } ++ catch (IllegalStateException ex) ++ { ++ logger.error("Exception in tear down", ex); ++ } ++ } 
++} +\ No newline at end of file +diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +index d135e61..e0b0252 100644 +--- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java ++++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +@@ -174,4 +174,11 @@ public interface StorageOperations + * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined + */ + int getCompactionThroughputMbPerSec(); ++ ++ /** ++ * Triggers the node move operation to move the node to a new token. ++ * ++ * @param newToken the new token for the node to move to ++ */ ++ void move(String newToken); + } +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +index b57ac62..1acd0b4 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +@@ -62,6 +62,7 @@ public class BasicPermissions + // sidecar operation related permissions + public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); + public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); ++ public static final Permission MOVE_NODE = new DomainAwarePermission("NODE:MOVE", OPERATION_SCOPE); + + // Permissions related to Schema Reporting + public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE); +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +index a7c4f7f..72cc0a9 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java +@@ -29,14 +29,21 @@ import io.vertx.core.net.SocketAddress; + import io.vertx.ext.web.RoutingContext; + import io.vertx.ext.web.handler.HttpException; + import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; ++import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; ++import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + import org.apache.cassandra.sidecar.common.server.data.Name; + import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; + import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; + import org.apache.cassandra.sidecar.common.utils.Preconditions; + import org.apache.cassandra.sidecar.concurrent.ExecutorPools; ++import org.apache.cassandra.sidecar.config.ServiceConfiguration; + import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; ++import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; ++import org.apache.cassandra.sidecar.job.OperationalJob; ++import org.apache.cassandra.sidecar.job.OperationalJobManager; + import org.apache.cassandra.sidecar.utils.CassandraInputValidator; + import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; ++import org.apache.cassandra.sidecar.utils.OperationalJobUtils; + import org.jetbrains.annotations.NotNull; + + import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; +@@ -317,4 +324,32 @@ public abstract class AbstractHandler implements Handler + } + return host; + } ++ ++ /** ++ * Handles the submission and execution of an operational job. 
++ * ++ * @param jobManager the manager responsible for submitting and tracking operational jobs ++ * @param config the service configuration containing execution parameters ++ * @param context the routing context for the HTTP request/response ++ * @param job the operational job to be executed ++ */ ++ protected void handleOperationalJob(OperationalJobManager jobManager, ServiceConfiguration config, RoutingContext context, OperationalJob job) ++ { ++ try ++ { ++ jobManager.trySubmitJob(job); ++ } ++ catch (OperationalJobConflictException oje) ++ { ++ String reason = oje.getMessage(); ++ logger.error("Conflicting job encountered. reason={}", reason); ++ context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); ++ context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); ++ return; ++ } ++ ++ // Get the result, waiting for the specified wait time for result ++ job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime()) ++ .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); ++ } + } +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +index 9b1278a..d7a1834 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +@@ -45,6 +45,7 @@ import org.apache.cassandra.sidecar.handlers.KeyspaceSchemaHandler; + import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler; + import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler; + import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; ++import org.apache.cassandra.sidecar.handlers.NodeMoveHandler; + import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; + import 
org.apache.cassandra.sidecar.handlers.RingHandler; + import org.apache.cassandra.sidecar.handlers.SchemaHandler; +@@ -156,6 +157,26 @@ public class CassandraOperationsModule extends AbstractModule + return factory.buildRouteWithHandler(nodeDecommissionHandler); + } + ++ @PUT ++ @Path(ApiEndpointsV1.NODE_MOVE_ROUTE) ++ @Operation(summary = "Move node to new token", ++ description = "Moves the Cassandra node to a new token in the ring") ++ @APIResponse(description = "Node move operation completed successfully", ++ responseCode = "200", ++ content = @Content(mediaType = "application/json", ++ schema = @Schema(implementation = OperationalJobResponse.class))) ++ @APIResponse(description = "Node move operation initiated successfully", ++ responseCode = "202", ++ content = @Content(mediaType = "application/json", ++ schema = @Schema(implementation = OperationalJobResponse.class))) ++ @ProvidesIntoMap ++ @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeMoveRouteKey.class) ++ VertxRoute cassandraNodeMoveRoute(RouteBuilder.Factory factory, ++ NodeMoveHandler nodeMoveHandler) ++ { ++ return factory.buildRouteWithHandler(nodeMoveHandler); ++ } ++ + @GET + @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) + @Operation(summary = "Get stream statistics", +diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +index e3bec84..5a0b164 100644 +--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java ++++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +@@ -83,6 +83,11 @@ public interface VertxRouteMapKeys + HttpMethod HTTP_METHOD = HttpMethod.PUT; + String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE; + } ++ interface CassandraNodeMoveRouteKey extends RouteClassKey ++ { ++ HttpMethod HTTP_METHOD = HttpMethod.PUT; ++ String ROUTE_URI = 
ApiEndpointsV1.NODE_MOVE_ROUTE; ++ } + interface CassandraNodeSettingsRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.GET; diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java index 7bfdfdd4b..0c6affb44 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java @@ -323,4 +323,14 @@ public int getCompactionThroughputMbPerSec() return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) .getCompactionThroughputMbPerSec(); } + + /** + * {@inheritDoc} + */ + @Override + public void flush(@NotNull String keyspace, @NotNull String... tableNames) throws IOException + { + jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME) + .forceKeyspaceFlush(keyspace, tableNames); + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java index a40255edf..36a865728 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/GossipDependentStorageJmxOperations.java @@ -219,4 +219,10 @@ public int getCompactionThroughputMbPerSec() { return delegate.getCompactionThroughputMbPerSec(); } + + @Override + public void forceKeyspaceFlush(String keyspaceName, String... 
tableNames) throws IOException + { + delegate.forceKeyspaceFlush(keyspaceName, tableNames); + } } diff --git a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java index c76ccce6d..fc7fa3d2e 100644 --- a/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java +++ b/adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/jmx/StorageJmxOperations.java @@ -228,4 +228,13 @@ public interface StorageJmxOperations * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined */ int getCompactionThroughputMbPerSec(); + + /** + * Triggers the node flush operation to flush memtables for the specified keyspace and tables. + * + * @param keyspaceName the keyspace name + * @param tableNames the array of table names to flush; if empty, all tables in the keyspace will be flushed + * @throws IOException if an I/O error occurs during the flush operation + */ + void forceKeyspaceFlush(String keyspaceName, String... 
tableNames) throws IOException; } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index d67e4cdc3..b641df690 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -140,6 +140,7 @@ public final class ApiEndpointsV1 public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; + public static final String NODE_FLUSH_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + "/flush"; public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; public static final String TABLE_STATS_ROUTE = API_V1 + CASSANDRA + PER_KEYSPACE + PER_TABLE + "/stats"; diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.class b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.class new file mode 100644 index 000000000..84822bfae Binary files /dev/null and b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.class differ diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java new file mode 100644 index 000000000..7a4edfc76 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/NodeFlushRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.request; + +import java.util.List; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.data.NodeFlushRequestPayload; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; + +/** + * Represents a node flush request + */ +public class NodeFlushRequest extends JsonRequest +{ + private final NodeFlushRequestPayload payload; + + /** + * Constructs a NodeFlushRequest for the given keyspace and table names + * + * @param keyspace the keyspace name + * @param tableNames the list of table names to flush (can be empty) + */ + public NodeFlushRequest(String keyspace, List tableNames) + { + super(ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, keyspace)); + this.payload = new NodeFlushRequestPayload(tableNames); + } + + /** + * @return the list of table names to flush + */ + public List tableNames() + { + return payload.tableNames(); + } + + @Override + public HttpMethod method() + { + return HttpMethod.POST; + } + + @Override + public Object requestBody() + { + return payload; + } +} diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java new file mode 100644 index 000000000..b2802ddea --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/NodeFlushRequestPayload.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.Collections; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.jetbrains.annotations.NotNull; + +/** + * Represents the request payload for node flush operation + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class NodeFlushRequestPayload +{ + private final List tableNames; + + /** + * Constructs a NodeFlushRequestPayload + * + * @param tableNames the list of table names to flush; null or empty list means flush all tables + */ + @JsonCreator + public NodeFlushRequestPayload(@JsonProperty("tableNames") List tableNames) + { + this.tableNames = tableNames != null ? tableNames : Collections.emptyList(); + } + + /** + * @return the list of table names to flush + */ + @NotNull + @JsonProperty("tableNames") + public List tableNames() + { + return tableNames; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() + { + return "NodeFlushRequestPayload{tableNames=" + tableNames + "}"; + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 373bd336e..2ee60a02d 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.client; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,6 +46,7 @@ import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest; import org.apache.cassandra.sidecar.common.request.ListCdcSegmentsRequest; import 
org.apache.cassandra.sidecar.common.request.LiveMigrationListInstanceFilesRequest; +import org.apache.cassandra.sidecar.common.request.NodeFlushRequest; import org.apache.cassandra.sidecar.common.request.RestoreJobProgressRequest; import org.apache.cassandra.sidecar.common.request.RestoreJobSummaryRequest; import org.apache.cassandra.sidecar.common.request.Service; @@ -823,6 +825,36 @@ public CompletableFuture nodeDecommission(SidecarInstanc .build()); } + /** + * Executes the node flush request using the default retry policy and provided {@code instance}. + * Flushes memtables for the specified keyspace and optionally for specific tables. + * + * @param instance the instance where the request will be executed + * @param keyspace the keyspace name to flush + * @param tableNames the list of table names to flush (can be empty to flush all tables) + * @return a completable future of the operational job response + */ + public CompletableFuture nodeFlush(SidecarInstance instance, String keyspace, List tableNames) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .request(new NodeFlushRequest(keyspace, tableNames)) + .build()); + } + + /** + * Executes the node flush request using the default retry policy and provided {@code instance}. + * Flushes all memtables for the specified keyspace. + * + * @param instance the instance where the request will be executed + * @param keyspace the keyspace name to flush + * @return a completable future of the operational job response + */ + public CompletableFuture nodeFlush(SidecarInstance instance, String keyspace) + { + return nodeFlush(instance, keyspace, Collections.emptyList()); + } + /** * Sends a request to start or stop Cassandra gossiping on the provided instance. *

diff --git a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java new file mode 100644 index 000000000..35eb42965 --- /dev/null +++ b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/NodeFlushIntegrationTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.FAILED; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for node flush operation + */ +public class NodeFlushIntegrationTest extends SharedClusterSidecarIntegrationTestBase +{ + private static final String TEST_KEYSPACE = "testkeyspace"; + public static final String TESTTABLE_1 = "testtable1"; + public static final String TESTTABLE_2 = "testtable2"; + public static final String OPERATION_FLUSH = "flush"; + public static final String LOCALHOST = "localhost"; + public static final String TABLE_NAMES = "tableNames"; + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, Map.of("replication_factor", 1)); + + createTestTable(new QualifiedName(TEST_KEYSPACE, TESTTABLE_1), + "CREATE TABLE %s ( \n" + + " id int PRIMARY KEY, \n" + + " 
data text \n" + + ");"); + + createTestTable(new QualifiedName(TEST_KEYSPACE, TESTTABLE_2), + "CREATE TABLE %s ( \n" + + " id int PRIMARY KEY, \n" + + " data text \n" + + ");"); + } + + /** + * Tests flushing multiple tables within a keyspace. + * Verifies the API accepts the request and the flush operation completes successfully. + */ + @Test + void testFlushMultipleTables() + { + JsonObject requestBody = new JsonObject(); + requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1, TESTTABLE_2)); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually SUCCEEDED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + + /** + * Tests flushing a single table within a keyspace. + * Verifies the API accepts the request and the flush operation completes successfully. 
+ */ + @Test + void testFlushSingleTable() + { + JsonObject requestBody = new JsonObject(); + requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1)); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually SUCCEEDED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + + /** + * Tests flushing an entire keyspace by providing an empty table list. + * Verifies the API accepts the request and flushes all tables in the keyspace successfully. 
+ */ + @Test + void testFlushEmptyTableList() + { + JsonObject requestBody = new JsonObject(); + requestBody.put(TABLE_NAMES, Collections.emptyList()); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually SUCCEEDED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + + /** + * Tests flushing an entire keyspace by providing no payload. + * Verifies the API accepts the request and flushes all tables in the keyspace successfully. + */ + @Test + void testFlushNoPayload() + { + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .send()); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually SUCCEEDED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, SUCCEEDED); + } + + /** + * Tests flushing a non-existent keyspace. + * Verifies the API accepts the request but the flush operation ultimately fails. 
+ */ + @Test + void testFlushNonExistentKeyspace() + { + JsonObject requestBody = new JsonObject(); + requestBody.put(TABLE_NAMES, List.of(TESTTABLE_1)); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "garbagekeyspace"); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually FAILED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, FAILED); + } + + /** + * Tests flushing a non-existent table within a valid keyspace. + * Verifies the API accepts the request but the flush operation ultimately fails. 
+ */ + @Test + void testFlushNonExistentTable() + { + JsonObject requestBody = new JsonObject(); + requestBody.put(TABLE_NAMES, List.of("garbagetesttable")); + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendJsonObject(requestBody)); + + assertThat(flushResponse.statusCode()).isIn(OK.code(), ACCEPTED.code()); + + JsonObject responseBody = flushResponse.bodyAsJsonObject(); + assertThat(responseBody).isNotNull(); + String jobId = responseBody.getString("jobId"); + assertThat(jobId).isNotNull(); + assertThat(responseBody.getString("operation")).isEqualTo(OPERATION_FLUSH); + + // loopAssert that the job status is eventually FAILED + validateOperationalJobStatusEventually(jobId, OPERATION_FLUSH, FAILED); + } + + /** + * Tests sending a malformed JSON payload to the flush API. + * Verifies the API immediately rejects the request with a bad request error. + */ + @Test + void testFlushMalformedPayload() + { + String malformedJson = "{ invalid json }"; + + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, TEST_KEYSPACE); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .sendBuffer(Buffer.buffer(malformedJson))); + + // Validate failure response + assertThat(flushResponse.statusCode()).isEqualTo(BAD_REQUEST.code()); + } + + /** + * Tests attempting to flush the system keyspace. + * Verifies the API rejects the request as system keyspace operations are forbidden. 
+ */ + @Test + void testFlushSystemKeyspaceFailure() + { + String flushRoute = ApiEndpointsV1.NODE_FLUSH_ROUTE.replace(ApiEndpointsV1.KEYSPACE_PATH_PARAM, "system"); + HttpResponse flushResponse = getBlocking( + trustedClient().post(serverWrapper.serverPort, LOCALHOST, flushRoute) + .send()); + + // Validate failure response + assertThat(flushResponse.statusCode()).isEqualTo(FORBIDDEN.code()); + } + + /** + * Validates that the operational job status eventually reaches the expected status + * + * @param jobId the ID of the operational job to validate + * @param expectedOperation the expected operation name (e.g., "flush", "decommission", "drain") + * @param expectedStatus the expected final status (SUCCEEDED or FAILED) + */ + private void validateOperationalJobStatusEventually(String jobId, String expectedOperation, OperationalJobStatus expectedStatus) + { + String operationalJobRoute = ApiEndpointsV1.OPERATIONAL_JOB_ROUTE.replace(":operationId", jobId); + + loopAssert(30, 500, () -> { + HttpResponse jobStatusResponse = getBlocking( + trustedClient().get(serverWrapper.serverPort, LOCALHOST, operationalJobRoute) + .send()); + + assertThat(jobStatusResponse.statusCode()).isEqualTo(OK.code()); + + JsonObject jobStatusBody = jobStatusResponse.bodyAsJsonObject(); + assertThat(jobStatusBody).isNotNull(); + assertThat(jobStatusBody.getString("jobId")).isEqualTo(jobId); + assertThat(jobStatusBody.getString("operation")).isEqualTo(expectedOperation); + assertThat(jobStatusBody.getString("jobStatus")).isEqualTo(expectedStatus.name()); + }); + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java index d135e61d6..d95858669 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java +++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java @@ -174,4 +174,13 @@ default void outOfRangeDataCleanup(@NotNull String keyspace, @NotNull String tab * @return the current compaction throughput in megabytes per second, or 0 if throughput cannot be determined */ int getCompactionThroughputMbPerSec(); + + /** + * Triggers the node flush operation to flush memtables for the specified keyspace and tables. + * + * @param keyspace the keyspace name + * @param tableNames the array of table names to flush; if empty, all tables in the keyspace will be flushed + * @throws IOException if an I/O error occurs during the flush operation + */ + void flush(@NotNull String keyspace, @NotNull String... tableNames) throws IOException; } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index b57ac6234..e7f5ca8dd 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -62,6 +62,7 @@ public class BasicPermissions // sidecar operation related permissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); + public static final Permission FLUSH_NODE = new DomainAwarePermission("NODE:FLUSH", KEYSPACE_SCOPE); // Permissions related to Schema Reporting public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:PUBLISH", CLUSTER_SCOPE); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java index a7c4f7f69..57423be53 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/AbstractHandler.java @@ -29,14 +29,21 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; import org.apache.cassandra.sidecar.adapters.base.exception.OperationUnavailableException; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.server.data.Name; import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; import org.apache.cassandra.sidecar.common.server.exceptions.JmxAuthenticationException; import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException; +import org.apache.cassandra.sidecar.job.OperationalJob; +import org.apache.cassandra.sidecar.job.OperationalJobManager; import org.apache.cassandra.sidecar.utils.CassandraInputValidator; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.OperationalJobUtils; import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; @@ -317,4 +324,34 @@ public static String extractHostAddressWithoutPort(HttpServerRequest request) th } return host; } + + /** + * Handles the submission and execution of an operational job.
+ * + * @param jobManager the manager responsible for submitting and tracking operational jobs + * @param config the service configuration containing execution parameters + * @param context the routing context for the HTTP request/response + * @param job the operational job to be executed + * @param checkConflict whether to check for conflicting operational jobs before submission + */ + protected void handleOperationalJob( + OperationalJobManager jobManager, ServiceConfiguration config, RoutingContext context, OperationalJob job, boolean checkConflict) + { + try + { + jobManager.trySubmitJob(job, checkConflict); + } + catch (OperationalJobConflictException oje) + { + String reason = oje.getMessage(); + logger.error("Conflicting job encountered. reason={}", reason); + context.response().setStatusCode(HttpResponseStatus.CONFLICT.code()); + context.json(new OperationalJobResponse(job.jobId(), OperationalJobStatus.FAILED, job.name(), reason)); + return; + } + + // Get the result, waiting for the specified wait time for result + job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime()) + .onComplete(v -> OperationalJobUtils.sendStatusBasedResponse(context, job)); + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java index 63b5b66eb..3c4b341f5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeDecommissionHandler.java @@ -91,7 +91,7 @@ public void handleInternal(RoutingContext context, NodeDecommissionJob job = new NodeDecommissionJob(UUIDs.timeBased(), operations, isForce); try { - jobManager.trySubmitJob(job); + jobManager.trySubmitJob(job, true); } catch (OperationalJobConflictException oje) { diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java new
file mode 100644 index 000000000..0d1a1a4e7 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandler.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.data.NodeFlushRequestPayload; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.job.NodeFlushJob; +import 
org.apache.cassandra.sidecar.job.OperationalJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST API for asynchronously flushing memtables for a specified keyspace and tables + */ +public class NodeFlushHandler extends AbstractHandler implements AccessProtected +{ + private final OperationalJobManager jobManager; + private final ServiceConfiguration config; + + /** + * Constructs a handler with the provided dependencies + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + * @param jobManager the manager responsible for submitting and tracking operational jobs + * @param config the service configuration containing execution parameters + */ + @Inject + protected NodeFlushHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + ServiceConfiguration config, + CassandraInputValidator validator, + OperationalJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + this.config = config; + } + + @Override + public Set requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.FLUSH_NODE.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + NodeFlushRequestPayload payload) + { + String keyspace = keyspace(context, true).name(); + StorageOperations operations = metadataFetcher.delegate(host).storageOperations(); + NodeFlushJob job = new NodeFlushJob(UUIDs.timeBased(), operations, keyspace, payload.tableNames()); 
+ handleOperationalJob(jobManager, config, context, job, false); + } + + /** + * {@inheritDoc} + */ + @Override + protected NodeFlushRequestPayload extractParamsOrThrow(RoutingContext context) + { + String bodyString = context.body().asString(); + if (bodyString == null || bodyString.isBlank() || bodyString.equals("null")) //json encoder writes null as "null" + { + return new NodeFlushRequestPayload(Collections.emptyList()); + } + + NodeFlushRequestPayload payload; + + try + { + payload = Json.decodeValue(bodyString, NodeFlushRequestPayload.class); + } + catch (DecodeException decodeException) + { + logger.warn("Bad request for node flush. Received invalid JSON payload."); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Invalid request payload", + decodeException); + } + + return new NodeFlushRequestPayload(validateTableNames(payload.tableNames())); + } + + private List validateTableNames(List tableNames) + { + return tableNames.stream() + .filter(tableName -> tableName != null && !tableName.isBlank()) + .map(tableName -> validator.validateTableName(tableName).name()) + .collect(Collectors.toList()); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/NodeFlushJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeFlushJob.java new file mode 100644 index 000000000..fac4bce6f --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/NodeFlushJob.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.server.StorageOperations; + +/** + * Implementation of {@link OperationalJob} to perform node flush operation. + */ +public class NodeFlushJob extends OperationalJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NodeFlushJob.class); + private static final String OPERATION = "flush"; + private final String keyspace; + private final List tableNames; + protected StorageOperations storageOperations; + + public NodeFlushJob(UUID jobId, StorageOperations storageOps, String keyspace, List tableNames) + { + super(jobId); + this.storageOperations = storageOps; + this.keyspace = keyspace; + this.tableNames = tableNames; + } + + @Override + public boolean isRunningOnCassandra() + { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + protected void executeInternal() throws IOException + { + LOGGER.info("Executing flush operation for keyspace={}, tables={}. jobId={}", keyspace, tableNames, this.jobId()); + + String[] tableArray = tableNames != null ? tableNames.toArray(new String[0]) : new String[0]; + storageOperations.flush(keyspace, tableArray); + + LOGGER.info("Flush operation completed for keyspace={}, tables={}. 
jobId={}", keyspace, tableNames, this.jobId()); + } + + /** + * {@inheritDoc} + */ + @Override + public String name() + { + return OPERATION; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java index c4b6b9a34..04fa5d0c5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java @@ -189,7 +189,7 @@ public Future asyncResult(TaskExecutorPool executorPool, DurationSpec wait /** * OperationalJob body. The implementation is executed in a blocking manner. */ - protected abstract void executeInternal(); + protected abstract void executeInternal() throws Exception; /** * Execute the job behavior as specified in the internal execution {@link #executeInternal()}, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java index 3c37ddaa1..75671056d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java @@ -79,12 +79,16 @@ public OperationalJob getJobIfExists(UUID jobId) * tracked and not running. The job is triggered on a separate internal thread-pool. * The job execution failure behavior is tracked within the {@link OperationalJob}. 
* - * @param job OperationalJob instance to submit + * @param job OperationalJob instance to submit + * @param checkConflict whether to check for job conflicts before submission * @throws OperationalJobConflictException when the same operational job is already running on Cassandra */ - public void trySubmitJob(OperationalJob job) throws OperationalJobConflictException + public void trySubmitJob(OperationalJob job, boolean checkConflict) throws OperationalJobConflictException { - checkConflict(job); + if (checkConflict) + { + checkConflict(job); + } // New job is submitted for all cases when we do not have a corresponding downstream job jobTracker.computeIfAbsent(job.jobId(), jobId -> { diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java index 9b1278a88..e71b5bd4d 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/CassandraOperationsModule.java @@ -21,6 +21,7 @@ import com.google.inject.AbstractModule; import com.google.inject.multibindings.ProvidesIntoMap; import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; import jakarta.ws.rs.PUT; import jakarta.ws.rs.Path; import org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema; @@ -45,6 +46,7 @@ import org.apache.cassandra.sidecar.handlers.ListOperationalJobsHandler; import org.apache.cassandra.sidecar.handlers.NativeUpdateHandler; import org.apache.cassandra.sidecar.handlers.NodeDecommissionHandler; +import org.apache.cassandra.sidecar.handlers.NodeFlushHandler; import org.apache.cassandra.sidecar.handlers.OperationalJobHandler; import org.apache.cassandra.sidecar.handlers.RingHandler; import org.apache.cassandra.sidecar.handlers.SchemaHandler; @@ -156,6 +158,29 @@ VertxRoute cassandraNodeDecommissionRoute(RouteBuilder.Factory factory, return 
factory.buildRouteWithHandler(nodeDecommissionHandler); } + @POST + @Path(ApiEndpointsV1.NODE_FLUSH_ROUTE) + @Operation(summary = "Flush node memtables", + description = "Flushes memtables for the specified keyspace and optional table names") + @APIResponse(description = "Node flush operation completed successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) + @APIResponse(description = "Node flush operation initiated successfully", + responseCode = "202", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = OperationalJobResponse.class))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.CassandraNodeFlushRouteKey.class) + VertxRoute cassandraNodeFlushRoute(RouteBuilder.Factory factory, + NodeFlushHandler nodeFlushHandler) + { + return factory.builderForRoute() + .setBodyHandler(true) + .handler(nodeFlushHandler) + .build(); + } + @GET @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) @Operation(summary = "Get stream statistics", diff --git a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java index e3bec846f..629a75ac6 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java @@ -83,6 +83,11 @@ interface CassandraNodeDecommissionRouteKey extends RouteClassKey HttpMethod HTTP_METHOD = HttpMethod.PUT; String ROUTE_URI = ApiEndpointsV1.NODE_DECOMMISSION_ROUTE; } + interface CassandraNodeFlushRouteKey extends RouteClassKey + { + HttpMethod HTTP_METHOD = HttpMethod.POST; + String ROUTE_URI = ApiEndpointsV1.NODE_FLUSH_ROUTE; + } interface CassandraNodeSettingsRouteKey extends RouteClassKey { HttpMethod HTTP_METHOD = HttpMethod.GET; diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java new file mode 100644 index 000000000..45c98c22c --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/NodeFlushHandlerTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.WebClient; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.modules.SidecarModules; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.AdditionalAnswers; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.RUNNING; +import static org.apache.cassandra.sidecar.common.data.OperationalJobStatus.SUCCEEDED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link NodeFlushHandler} + */ +@ExtendWith(VertxExtension.class) +public class NodeFlushHandlerTest +{ + static final Logger LOGGER = LoggerFactory.getLogger(NodeFlushHandlerTest.class); + public static final String LOCAL_HOST = "127.0.0.1"; + public static final String TEST_ROUTE = "/api/v1/cassandra/keyspaces/testkeyspace/flush"; + Vertx vertx; + Server server; + StorageOperations mockStorageOperations = mock(StorageOperations.class); + + @BeforeEach + void before() throws InterruptedException + { + // Reset mock before each test + org.mockito.Mockito.reset(mockStorageOperations); + + Injector injector; + Module testOverride = Modules.override(new TestModule()) + .with(new NodeFlushHandlerTest.NodeFlushTestModule()); + injector = Guice.createInjector(Modules.override(SidecarModules.all()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void testFlushLongRunning(VertxTestContext context) throws IOException + { + doAnswer(AdditionalAnswers.answersWithDelay(6000, invocation -> null)) + .when(mockStorageOperations).flush("testkeyspace", "table1", "table2"); + + WebClient client = 
WebClient.create(vertx); + + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Arrays.asList("table1", "table2")); + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(ACCEPTED.code()); + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(flushResponse).isNotNull(); + assertThat(flushResponse.status()).isEqualTo(RUNNING); + assertThat(flushResponse.operation()).isEqualTo("flush"); + context.completeNow(); + })); + } + + @Test + void testFlushCompleted(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Collections.singletonList("table1")); + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + LOGGER.info("Flush Response: {}", response.bodyAsString()); + + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(flushResponse).isNotNull(); + assertThat(flushResponse.status()).isEqualTo(SUCCEEDED); + assertThat(flushResponse.operation()).isEqualTo("flush"); + + try + { + verify(mockStorageOperations).flush("testkeyspace", "table1"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + context.completeNow(); + })); + } + + @Test + void testFlushEmptyTableList(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Collections.emptyList()); + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse flushResponse = 
response.bodyAsJson(OperationalJobResponse.class); + assertThat(flushResponse).isNotNull(); + assertThat(flushResponse.status()).isEqualTo(SUCCEEDED); + assertThat(flushResponse.operation()).isEqualTo("flush"); + + try + { + verify(mockStorageOperations).flush("testkeyspace"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + context.completeNow(); + })); + } + + @Test + void testFlushNoPayload(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + OperationalJobResponse flushResponse = response.bodyAsJson(OperationalJobResponse.class); + assertThat(flushResponse).isNotNull(); + assertThat(flushResponse.status()).isEqualTo(SUCCEEDED); + assertThat(flushResponse.operation()).isEqualTo("flush"); + + try + { + verify(mockStorageOperations).flush("testkeyspace"); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + context.completeNow(); + })); + } + + @Test + void testFlushFailed(VertxTestContext context) throws IOException + { + doThrow(new RuntimeException("Flush failed")).when(mockStorageOperations).flush(anyString(), any(String[].class)); + + WebClient client = WebClient.create(vertx); + + JsonObject requestBody = new JsonObject(); + requestBody.put("tableNames", Collections.singletonList("table1")); + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendJsonObject(requestBody, context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + context.completeNow(); + })); + } + + @Test + void testFlushInvalidPayload(VertxTestContext context) + { + WebClient client = WebClient.create(vertx); + + String invalidJson = "{ invalid json }"; + + client.post(server.actualPort(), LOCAL_HOST, TEST_ROUTE) + .sendBuffer(io.vertx.core.buffer.Buffer.buffer(invalidJson), context.succeeding(response -> { + 
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code()); + context.completeNow(); + })); + } + + /** + * Test guice module for Node Flush handler tests + */ + class NodeFlushTestModule extends AbstractModule + { + @Provides + @Singleton + public InstancesMetadata instanceMetadata() + { + final int instanceId = 100; + final String host = LOCAL_HOST; + final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); + when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.stagingDir()).thenReturn(""); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + + when(delegate.storageOperations()).thenReturn(mockStorageOperations); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + + return mockInstancesMetadata; + } + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/NodeFlushJobTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeFlushJobTest.java new file mode 100644 index 000000000..b67b16b99 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/NodeFlushJobTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.Test; + +import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.exceptions.OperationalJobException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link NodeFlushJob} + */ +class NodeFlushJobTest +{ + @Test + void testConstructor() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + + assertThat(job.jobId()).isEqualTo(jobId); + assertThat(job.name()).isEqualTo("flush"); + assertThat(job.isRunningOnCassandra()).isFalse(); + } + + @Test + void testConstructorWithNullTableNames() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, 
keyspace, null); + + assertThat(job.jobId()).isEqualTo(jobId); + assertThat(job.name()).isEqualTo("flush"); + assertThat(job.isRunningOnCassandra()).isFalse(); + } + + @Test + void testExecuteInternalWithMultipleTables() throws IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + Promise promise = Promise.promise(); + + job.execute(promise); + + assertThat(promise.future().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + verify(storageOps).flush(keyspace, "table1", "table2"); + } + + @Test + void testExecuteInternalWithSingleTable() throws IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Collections.singletonList("table1"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + Promise promise = Promise.promise(); + + job.execute(promise); + + assertThat(promise.future().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + verify(storageOps).flush(keyspace, "table1"); + } + + @Test + void testExecuteInternalWithEmptyTableList() throws IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Collections.emptyList(); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + Promise promise = Promise.promise(); + + job.execute(promise); + + assertThat(promise.future().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + verify(storageOps).flush(keyspace); + } + + @Test + void testExecuteInternalWithNullTableList() throws 
IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, null); + Promise promise = Promise.promise(); + + job.execute(promise); + + assertThat(promise.future().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + verify(storageOps).flush(keyspace); + } + + @Test + void testExecuteInternalFailure() throws IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + String errorMessage = "Flush operation failed"; + + doThrow(new IOException(errorMessage)).when(storageOps).flush(keyspace, "table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + Promise promise = Promise.promise(); + + job.execute(promise); + + assertThat(promise.future().failed()).isTrue(); + assertThat(promise.future().cause()) + .isInstanceOf(OperationalJobException.class) + .hasCauseInstanceOf(IOException.class); + assertThat(promise.future().cause().getCause().getMessage()).isEqualTo(errorMessage); + assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED); + verify(storageOps).flush(keyspace, "table1", "table2"); + } + + @Test + void testExecuteInternalRuntimeException() throws IOException + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + String errorMessage = "Runtime error during flush"; + + doThrow(new RuntimeException(errorMessage)).when(storageOps).flush(keyspace, "table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + Promise promise = Promise.promise(); + + job.execute(promise); + + 
assertThat(promise.future().failed()).isTrue(); + assertThat(promise.future().cause()) + .isInstanceOf(OperationalJobException.class) + .hasCauseInstanceOf(RuntimeException.class); + assertThat(promise.future().cause().getCause().getMessage()).isEqualTo(errorMessage); + assertThat(job.status()).isEqualTo(OperationalJobStatus.FAILED); + verify(storageOps).flush(keyspace, "table1", "table2"); + } + + @Test + void testJobIdValidation() + { + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + // Test with non-time-based UUID should fail + UUID randomUuid = UUID.randomUUID(); + assertThatThrownBy(() -> new NodeFlushJob(randomUuid, storageOps, keyspace, tableNames)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("OperationalJob accepts only time-based UUID"); + } + + @Test + void testJobStatusLifecycle() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + + // Initial status should be CREATED + assertThat(job.status()).isEqualTo(OperationalJobStatus.CREATED); + assertThat(job.isExecuting()).isFalse(); + + Promise promise = Promise.promise(); + job.execute(promise); + + // After successful execution, status should be SUCCEEDED + assertThat(promise.future().succeeded()).isTrue(); + assertThat(job.status()).isEqualTo(OperationalJobStatus.SUCCEEDED); + } + + @Test + void testCreationTime() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + long beforeCreation = System.currentTimeMillis(); + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + long 
afterCreation = System.currentTimeMillis(); + + long creationTime = job.creationTime(); + assertThat(creationTime).isBetween(beforeCreation - 1000, afterCreation + 1000); // Allow 1s tolerance + } + + @Test + void testIsStale() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + + long currentTime = System.currentTimeMillis(); + long ttl = 5000; // 5 seconds + + // Job should not be stale immediately + assertThat(job.isStale(currentTime, ttl)).isFalse(); + + // Job should be stale if reference time is beyond TTL + assertThat(job.isStale(currentTime + ttl + 1000, ttl)).isTrue(); + } + + @Test + void testAsyncResult() + { + UUID jobId = UUIDs.timeBased(); + StorageOperations storageOps = mock(StorageOperations.class); + String keyspace = "testkeyspace"; + List tableNames = Arrays.asList("table1", "table2"); + + NodeFlushJob job = new NodeFlushJob(jobId, storageOps, keyspace, tableNames); + + // Initially, async result should not be complete + assertThat(job.asyncResult().isComplete()).isFalse(); + + Promise promise = Promise.promise(); + job.execute(promise); + + // After execution, async result should be complete + assertThat(job.asyncResult().isComplete()).isTrue(); + assertThat(job.asyncResult().succeeded()).isTrue(); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java index f7b6cfe96..2c3f69f81 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/job/OperationalJobManagerTest.java @@ -77,7 +77,7 @@ void testWithNoDownstreamJob() OperationalJobManager manager = new OperationalJobManager(tracker, 
executorPool); OperationalJob testJob = OperationalJobTest.createOperationalJob(SUCCEEDED); - manager.trySubmitJob(testJob); + manager.trySubmitJob(testJob, true); testJob.execute(Promise.promise()); assertThat(testJob.asyncResult().isComplete()).isTrue(); assertThat(testJob.status()).isEqualTo(SUCCEEDED); @@ -94,7 +94,7 @@ void testWithRunningDownstreamJob() when(mockPools.internal()).thenReturn(mockExecPool); when(mockExecPool.runBlocking(any())).thenReturn(null); OperationalJobManager manager = new OperationalJobManager(tracker, executorPool); - assertThatThrownBy(() -> manager.trySubmitJob(runningJob)) + assertThatThrownBy(() -> manager.trySubmitJob(runningJob, true)) .isExactlyInstanceOf(OperationalJobConflictException.class) .hasMessage("The same operational job is already running on Cassandra. operationName='Operation X'"); } @@ -109,7 +109,7 @@ void testWithLongRunningJob() OperationalJob testJob = OperationalJobTest.createOperationalJob(jobId, SecondBoundConfiguration.parse("10s")); - manager.trySubmitJob(testJob); + manager.trySubmitJob(testJob, true); // execute the job async. vertx.executeBlocking(testJob::execute); // by the time of checking, the job should still be running. It runs for 10 seconds. @@ -141,7 +141,7 @@ protected void executeInternal() throws OperationalJobException } }; - manager.trySubmitJob(failingJob); + manager.trySubmitJob(failingJob, true); failingJob.execute(Promise.promise()); assertThat(failingJob.asyncResult().isComplete()).isTrue(); assertThat(failingJob.asyncResult().failed()).isTrue();