diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 0301d3d47d..9a6e2a16f8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -381,6 +381,21 @@ public class ConfigOptions {
// ------------------------------------------------------------------------
// ConfigOptions for Coordinator Server
// ------------------------------------------------------------------------
+ /**
+ * The maximum number of bucket rebalance tasks to activate in one rebalance round.
+ *
+ *
The default value {@code 0} disables round limiting and preserves the existing behavior of
+ * registering the whole rebalance plan at once.
+ */
+ public static final ConfigOption COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND =
+ key("coordinator.rebalance.max-buckets-per-round")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "The maximum number of bucket rebalance tasks activated in one rebalance round. "
+ + "A positive value splits large rebalance plans into multiple rounds to reduce coordinator and cluster pressure. "
+ + "The default value 0 disables round limiting.");
+
/**
* The config parameter defining the network address to connect to for communication with the
* coordinator server.
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 537e2df63f..1e793bf04a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -261,7 +261,11 @@ public CoordinatorEventProcessor(
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
this.rebalanceManager =
new RebalanceManager(
- this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
+ this,
+ zooKeeperClient,
+ coordinatorEventManager,
+ SystemClock.getInstance(),
+ conf);
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
@@ -1390,25 +1394,9 @@ private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) {
rebalanceManager.getRebalanceId()));
}
- RebalanceTask rebalanceTask;
long startTime = System.currentTimeMillis();
- try {
- // 1. generate rebalance plan.
- rebalanceTask =
- rebalanceManager.generateRebalanceTask(rebalanceEvent.getGoalsByPriority());
-
- // 2. execute rebalance plan.
- Map executePlan = rebalanceTask.getExecutePlan();
- zooKeeperClient.registerRebalanceTask(rebalanceTask);
- rebalanceManager.registerRebalance(
- rebalanceTask.getRebalanceId(), executePlan, RebalanceStatus.NOT_STARTED);
- } catch (Exception e) {
- throw new RebalanceFailureException(
- String.format(
- "Failed to generate plan and execute rebalance. The root cause: %s",
- e.getMessage()),
- e);
- }
+ RebalanceTask rebalanceTask =
+ rebalanceManager.generateAndRegisterRebalance(rebalanceEvent.getGoalsByPriority());
LOG.info(
"Generate Rebalance plan rebalance id {} with {} ms.",
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
index 9f2f350bdb..7ed4fd752b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -23,7 +23,10 @@
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
import org.apache.fluss.cluster.rebalance.ServerTag;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.NoRebalanceInProgressException;
+import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
@@ -37,6 +40,8 @@
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.RebalanceExecution;
+import org.apache.fluss.server.zk.data.RebalanceRound;
import org.apache.fluss.server.zk.data.RebalanceTask;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;
@@ -48,7 +53,10 @@
import javax.annotation.Nullable;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -89,6 +97,7 @@ public class RebalanceManager {
private final EventManager eventManager;
private final Clock clock;
private final ScheduledExecutorService timeoutChecker;
+ private final int rebalanceMaxBucketsPerRound;
/** A queue of in progress table bucket to rebalance. */
private final Queue inProgressRebalanceTasksQueue = new ArrayDeque<>();
@@ -105,6 +114,7 @@ public class RebalanceManager {
private volatile long registerTime;
private volatile @Nullable RebalanceStatus rebalanceStatus;
private volatile @Nullable String currentRebalanceId;
+ private volatile @Nullable RebalanceExecution currentRebalanceExecution;
private volatile boolean isClosed = false;
/**
@@ -126,13 +136,22 @@ public RebalanceManager(
ZooKeeperClient zkClient,
EventManager eventManager,
Clock clock) {
+ this(eventProcessor, zkClient, eventManager, clock, new Configuration());
+ }
+
+ public RebalanceManager(
+ CoordinatorEventProcessor eventProcessor,
+ ZooKeeperClient zkClient,
+ EventManager eventManager,
+ Clock clock,
+ Configuration conf) {
this(
eventProcessor,
zkClient,
eventManager,
clock,
- Executors.newScheduledThreadPool(
- 1, new ExecutorThreadFactory("rebalance-timeout")));
+ Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("rebalance-timeout")),
+ conf);
}
@VisibleForTesting
@@ -142,11 +161,24 @@ public RebalanceManager(
EventManager eventManager,
Clock clock,
ScheduledExecutorService timeoutChecker) {
+ this(eventProcessor, zkClient, eventManager, clock, timeoutChecker, new Configuration());
+ }
+
+ @VisibleForTesting
+ RebalanceManager(
+ CoordinatorEventProcessor eventProcessor,
+ ZooKeeperClient zkClient,
+ EventManager eventManager,
+ Clock clock,
+ ScheduledExecutorService timeoutChecker,
+ Configuration conf) {
this.eventProcessor = eventProcessor;
this.zkClient = zkClient;
this.eventManager = eventManager;
this.clock = clock == null ? SystemClock.getInstance() : clock;
this.timeoutChecker = timeoutChecker;
+ this.rebalanceMaxBucketsPerRound =
+ Math.max(0, conf.getInt(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND));
this.goalOptimizer = new GoalOptimizer();
}
@@ -174,16 +206,22 @@ public void start() {
private void initialize() {
try {
- zkClient.getRebalanceTask()
- .ifPresent(
- rebalancePlan ->
- registerRebalance(
- rebalancePlan.getRebalanceId(),
- rebalancePlan.getExecutePlan(),
- rebalancePlan.getRebalanceStatus()));
+ Optional rebalanceExecution = zkClient.getRebalanceExecution();
+ if (rebalanceExecution.isPresent()) {
+ restoreRoundBasedRebalance(rebalanceExecution.get());
+ return;
+ }
+
+ Optional rebalanceTask = zkClient.getRebalanceTask();
+ if (rebalanceTask.isPresent()) {
+ RebalanceTask task = rebalanceTask.get();
+ currentRebalanceExecution = null;
+ registerRebalance(
+ task.getRebalanceId(), task.getExecutePlan(), task.getRebalanceStatus());
+ }
} catch (Exception e) {
LOG.error(
- "Failed to get rebalance plan from zookeeper, it will be treated as no"
+ "Failed to get rebalance plan from ZooKeeper, it will be treated as no "
+ "rebalance tasks.",
e);
}
@@ -193,6 +231,18 @@ public void registerRebalance(
String rebalanceId,
Map rebalancePlan,
RebalanceStatus newStatus) {
+ Map rebalanceResults = new HashMap<>();
+ for (Map.Entry entry : rebalancePlan.entrySet()) {
+ rebalanceResults.put(
+ entry.getKey(), RebalanceResultForBucket.of(entry.getValue(), newStatus));
+ }
+ registerRebalanceResults(rebalanceId, rebalanceResults, newStatus);
+ }
+
+ private void registerRebalanceResults(
+ String rebalanceId,
+ Map rebalanceResults,
+ RebalanceStatus newStatus) {
checkNotClosed();
registerTime = System.currentTimeMillis();
// first clear all exists tasks.
@@ -204,21 +254,24 @@ public void registerRebalance(
inflightTaskStartMs = -1;
currentRebalanceId = rebalanceId;
- if (rebalancePlan.isEmpty()) {
+ if (rebalanceResults.isEmpty()) {
completeRebalance();
return;
}
- rebalancePlan.forEach(
- ((tableBucket, planForBucket) -> {
- if (FINAL_STATUSES.contains(newStatus)) {
+ rebalanceResults.forEach(
+ ((tableBucket, resultForBucket) -> {
+ if (FINAL_STATUSES.contains(resultForBucket.status())) {
finishedRebalanceTasks.put(
- tableBucket, RebalanceResultForBucket.of(planForBucket, newStatus));
+ tableBucket,
+ RebalanceResultForBucket.of(
+ resultForBucket.plan(), resultForBucket.status()));
} else {
inProgressRebalanceTasksQueue.add(tableBucket);
inProgressRebalanceTasks.put(
tableBucket,
- RebalanceResultForBucket.of(planForBucket, NOT_STARTED));
+ RebalanceResultForBucket.of(
+ resultForBucket.plan(), resultForBucket.status()));
}
}));
@@ -240,6 +293,7 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF
finishedRebalanceTasks.put(
tableBucket,
RebalanceResultForBucket.of(resultForBucket.plan(), statusForBucket));
+ updateCurrentRoundBucketStatus(tableBucket, resultForBucket.plan(), statusForBucket);
// Clear gate (bucket) first, then data (startMs).
inflightTaskBucket = null;
inflightTaskStartMs = -1;
@@ -251,7 +305,11 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF
if (inProgressRebalanceTasksQueue.isEmpty()) {
// All rebalance tasks are completed.
- completeRebalance();
+ if (currentRebalanceExecution == null) {
+ completeRebalance();
+ } else {
+ completeRoundAndMaybeStartNext();
+ }
} else {
// Trigger one rebalance task to execute.
processNewRebalanceTask();
@@ -277,6 +335,11 @@ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusF
return null;
}
+ Optional roundBasedProgress = listRoundBasedRebalanceProgress();
+ if (roundBasedProgress.isPresent()) {
+ return roundBasedProgress.get();
+ }
+
Map progressForBucketMap = new HashMap<>();
progressForBucketMap.putAll(inProgressRebalanceTasks);
progressForBucketMap.putAll(finishedRebalanceTasks);
@@ -317,9 +380,11 @@ public void cancelRebalance(@Nullable String rebalanceId) {
rebalanceTask.getExecutePlan()));
}
} catch (Exception e) {
- LOG.error("Error when delete rebalance plan from zookeeper.", e);
+ LOG.error("Error when canceling rebalance task in ZooKeeper.", e);
}
+ cancelRoundBasedRebalance();
+
rebalanceStatus = CANCELED;
inProgressRebalanceTasksQueue.clear();
inProgressRebalanceTasks.clear();
@@ -334,6 +399,11 @@ public void cancelRebalance(@Nullable String rebalanceId) {
public boolean hasInProgressRebalance() {
checkNotClosed();
+ RebalanceExecution rebalanceExecution = currentRebalanceExecution;
+ if (rebalanceExecution != null
+ && !FINAL_STATUSES.contains(rebalanceExecution.getRebalanceStatus())) {
+ return true;
+ }
return !inProgressRebalanceTasks.isEmpty() || !inProgressRebalanceTasksQueue.isEmpty();
}
@@ -366,6 +436,314 @@ public RebalanceTask generateRebalanceTask(List goalsByPriority) {
return buildRebalanceTask(rebalanceId, rebalancePlanForBuckets);
}
+ /**
+ * Generates and registers a rebalance task. Large plans are split into recoverable rounds when
+ * {@link ConfigOptions#COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND} is configured with a
+ * positive value.
+ */
+ public RebalanceTask generateAndRegisterRebalance(List goalsByPriority) {
+ checkNotClosed();
+ RebalanceTask rebalanceTask = generateRebalanceTask(goalsByPriority);
+ try {
+ return registerGeneratedRebalanceTask(rebalanceTask);
+ } catch (Exception e) {
+ throw new RebalanceFailureException(
+ String.format(
+ "Failed to generate plan and execute rebalance. The root cause: %s",
+ e.getMessage()),
+ e);
+ }
+ }
+
+ @VisibleForTesting
+ RebalanceTask registerGeneratedRebalanceTask(RebalanceTask rebalanceTask) throws Exception {
+ Map executePlan = rebalanceTask.getExecutePlan();
+
+ zkClient.deleteRebalanceTask();
+ if (!shouldSplitRebalancePlan(executePlan)) {
+ currentRebalanceExecution = null;
+ zkClient.registerRebalanceTask(rebalanceTask);
+ registerRebalance(
+ rebalanceTask.getRebalanceId(), executePlan, RebalanceStatus.NOT_STARTED);
+ return rebalanceTask;
+ }
+
+ List rebalanceRounds = splitRebalancePlan(executePlan);
+ RebalanceExecution rebalanceExecution =
+ new RebalanceExecution(
+ rebalanceTask.getRebalanceId(),
+ REBALANCING,
+ rebalanceMaxBucketsPerRound,
+ 0,
+ rebalanceRounds.size());
+
+ for (RebalanceRound rebalanceRound : rebalanceRounds) {
+ zkClient.registerRebalanceRound(rebalanceRound);
+ }
+ zkClient.registerRebalanceExecution(rebalanceExecution);
+
+ currentRebalanceExecution = rebalanceExecution;
+ RebalanceRound firstRound = rebalanceRounds.get(0);
+ RebalanceTask firstRoundTask =
+ new RebalanceTask(
+ rebalanceTask.getRebalanceId(),
+ RebalanceStatus.NOT_STARTED,
+ firstRound.getExecutePlan());
+ zkClient.registerRebalanceTask(firstRoundTask);
+ registerRebalanceResults(
+ rebalanceTask.getRebalanceId(),
+ firstRound.getProgressForBucketMap(),
+ RebalanceStatus.NOT_STARTED);
+ LOG.info(
+ "Split rebalance task {} into {} rounds by {}={}, current round buckets {}.",
+ rebalanceTask.getRebalanceId(),
+ rebalanceRounds.size(),
+ ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND.key(),
+ rebalanceMaxBucketsPerRound,
+ firstRound.getProgressForBucketMap().size());
+ return firstRoundTask;
+ }
+
+ private boolean shouldSplitRebalancePlan(Map executePlan) {
+ return rebalanceMaxBucketsPerRound > 0 && executePlan.size() > rebalanceMaxBucketsPerRound;
+ }
+
+ private List splitRebalancePlan(
+ Map executePlan) {
+ List> sortedEntries =
+ new ArrayList<>(executePlan.entrySet());
+ sortedEntries.sort(
+ Comparator.comparing(
+ (Map.Entry entry) ->
+ entry.getKey().getTableId())
+ .thenComparing(
+ entry ->
+ entry.getKey().getPartitionId() == null
+ ? Long.MIN_VALUE
+ : entry.getKey().getPartitionId())
+ .thenComparing(entry -> entry.getKey().getBucket()));
+
+ List rebalanceRounds = new ArrayList<>();
+ for (int i = 0; i < sortedEntries.size(); i += rebalanceMaxBucketsPerRound) {
+ int end = Math.min(i + rebalanceMaxBucketsPerRound, sortedEntries.size());
+ Map roundPlan = new LinkedHashMap<>();
+ for (int j = i; j < end; j++) {
+ Map.Entry entry = sortedEntries.get(j);
+ roundPlan.put(entry.getKey(), entry.getValue());
+ }
+ rebalanceRounds.add(RebalanceRound.ofPlan(rebalanceRounds.size(), roundPlan));
+ }
+ return rebalanceRounds;
+ }
+
+ private void restoreRoundBasedRebalance(RebalanceExecution rebalanceExecution)
+ throws Exception {
+ List rebalanceRounds = zkClient.getRebalanceRounds();
+ currentRebalanceId = rebalanceExecution.getRebalanceId();
+ rebalanceStatus = rebalanceExecution.getRebalanceStatus();
+ currentRebalanceExecution = rebalanceExecution;
+
+ if (FINAL_STATUSES.contains(rebalanceExecution.getRebalanceStatus())) {
+ LOG.info(
+ "Restore final round-based rebalance {} with status {}.",
+ rebalanceExecution.getRebalanceId(),
+ rebalanceExecution.getRebalanceStatus());
+ return;
+ }
+
+ Optional roundToResume = findFirstUnfinishedRound(rebalanceRounds);
+ if (!roundToResume.isPresent()) {
+ RebalanceExecution completedExecution = rebalanceExecution.withStatus(COMPLETED);
+ zkClient.registerRebalanceExecution(completedExecution);
+ currentRebalanceExecution = completedExecution;
+ rebalanceStatus = COMPLETED;
+ return;
+ }
+
+ RebalanceRound rebalanceRound = roundToResume.get();
+ RebalanceExecution resumedExecution =
+ rebalanceExecution.withStatusAndCurrentRound(
+ REBALANCING, rebalanceRound.getRoundIndex());
+ zkClient.registerRebalanceExecution(resumedExecution);
+ currentRebalanceExecution = resumedExecution;
+ zkClient.registerRebalanceTask(
+ new RebalanceTask(
+ resumedExecution.getRebalanceId(),
+ RebalanceStatus.NOT_STARTED,
+ rebalanceRound.getExecutePlan()));
+ registerRebalanceResults(
+ resumedExecution.getRebalanceId(),
+ rebalanceRound.getProgressForBucketMap(),
+ RebalanceStatus.NOT_STARTED);
+ }
+
+ private Optional findFirstUnfinishedRound(List rounds) {
+ for (RebalanceRound round : rounds) {
+ if (round.hasUnfinishedTasks()) {
+ return Optional.of(round);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private void updateCurrentRoundBucketStatus(
+ TableBucket tableBucket,
+ RebalancePlanForBucket planForBucket,
+ RebalanceStatus rebalanceStatus) {
+ RebalanceExecution rebalanceExecution = currentRebalanceExecution;
+ if (rebalanceExecution == null) {
+ return;
+ }
+
+ try {
+ Optional rebalanceRound =
+ zkClient.getRebalanceRound(rebalanceExecution.getCurrentRound());
+ if (!rebalanceRound.isPresent()) {
+ LOG.warn(
+ "No rebalance round {} found when updating bucket {} status to {}.",
+ rebalanceExecution.getCurrentRound(),
+ tableBucket,
+ rebalanceStatus);
+ return;
+ }
+ zkClient.registerRebalanceRound(
+ rebalanceRound
+ .get()
+ .withBucketStatus(tableBucket, planForBucket, rebalanceStatus));
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to update rebalance round status for bucket {} to {}. "
+ + "The current round progress will be persisted again before advancing rounds.",
+ tableBucket,
+ rebalanceStatus,
+ e);
+ }
+ }
+
+ private void completeRoundAndMaybeStartNext() {
+ RebalanceExecution rebalanceExecution = checkNotNull(currentRebalanceExecution);
+ try {
+ persistCurrentRoundProgress(rebalanceExecution);
+ int nextRoundIndex = rebalanceExecution.getCurrentRound() + 1;
+ if (nextRoundIndex >= rebalanceExecution.getTotalRounds()) {
+ RebalanceExecution completedExecution = rebalanceExecution.withStatus(COMPLETED);
+ zkClient.registerRebalanceExecution(completedExecution);
+ currentRebalanceExecution = completedExecution;
+ Optional currentRound =
+ zkClient.getRebalanceRound(rebalanceExecution.getCurrentRound());
+ zkClient.registerRebalanceTask(
+ new RebalanceTask(
+ rebalanceExecution.getRebalanceId(),
+ COMPLETED,
+ currentRound
+ .map(RebalanceRound::getExecutePlan)
+ .orElseGet(HashMap::new)));
+ completeRebalanceInMemory();
+ return;
+ }
+
+ Optional nextRound = zkClient.getRebalanceRound(nextRoundIndex);
+ if (!nextRound.isPresent()) {
+ throw new RebalanceFailureException(
+ String.format(
+ "Rebalance round %s does not exist for rebalance id %s.",
+ nextRoundIndex, rebalanceExecution.getRebalanceId()));
+ }
+
+ RebalanceExecution nextExecution =
+ rebalanceExecution.withStatusAndCurrentRound(REBALANCING, nextRoundIndex);
+ zkClient.registerRebalanceExecution(nextExecution);
+ currentRebalanceExecution = nextExecution;
+ zkClient.registerRebalanceTask(
+ new RebalanceTask(
+ nextExecution.getRebalanceId(),
+ RebalanceStatus.NOT_STARTED,
+ nextRound.get().getExecutePlan()));
+ registerRebalanceResults(
+ nextExecution.getRebalanceId(),
+ nextRound.get().getProgressForBucketMap(),
+ RebalanceStatus.NOT_STARTED);
+ LOG.info(
+ "Start rebalance round {}/{} for rebalance id {} with {} buckets.",
+ nextRoundIndex + 1,
+ nextExecution.getTotalRounds(),
+ nextExecution.getRebalanceId(),
+ nextRound.get().getProgressForBucketMap().size());
+ } catch (Exception e) {
+ throw new RebalanceFailureException(
+ String.format(
+ "Failed to complete current rebalance round. The root cause: %s",
+ e.getMessage()),
+ e);
+ }
+ }
+
+ private void persistCurrentRoundProgress(RebalanceExecution rebalanceExecution)
+ throws Exception {
+ Map progressForBucketMap = new LinkedHashMap<>();
+ progressForBucketMap.putAll(finishedRebalanceTasks);
+ progressForBucketMap.putAll(inProgressRebalanceTasks);
+ zkClient.registerRebalanceRound(
+ new RebalanceRound(rebalanceExecution.getCurrentRound(), progressForBucketMap));
+ }
+
+ private Optional listRoundBasedRebalanceProgress() {
+ RebalanceExecution rebalanceExecution = currentRebalanceExecution;
+ if (rebalanceExecution == null) {
+ try {
+ Optional persistedExecution = zkClient.getRebalanceExecution();
+ if (persistedExecution.isPresent()) {
+ rebalanceExecution = persistedExecution.get();
+ }
+ } catch (Exception e) {
+ LOG.error("Error when loading round-based rebalance execution from zookeeper.", e);
+ return Optional.empty();
+ }
+ }
+
+ if (rebalanceExecution == null) {
+ return Optional.empty();
+ }
+
+ try {
+ Map progressForBucketMap = new HashMap<>();
+ for (RebalanceRound rebalanceRound : zkClient.getRebalanceRounds()) {
+ progressForBucketMap.putAll(rebalanceRound.getProgressForBucketMap());
+ }
+ return Optional.of(
+ new RebalanceProgress(
+ rebalanceExecution.getRebalanceId(),
+ rebalanceExecution.getRebalanceStatus(),
+ 0.0,
+ progressForBucketMap));
+ } catch (Exception e) {
+ LOG.error("Error when loading round-based rebalance progress from zookeeper.", e);
+ return Optional.empty();
+ }
+ }
+
+ private void cancelRoundBasedRebalance() {
+ RebalanceExecution rebalanceExecution = currentRebalanceExecution;
+ if (rebalanceExecution == null) {
+ return;
+ }
+
+ try {
+ for (RebalanceRound rebalanceRound : zkClient.getRebalanceRounds()) {
+ zkClient.registerRebalanceRound(rebalanceRound.withUnfinishedStatus(CANCELED));
+ }
+ RebalanceExecution canceledExecution = rebalanceExecution.withStatus(CANCELED);
+ zkClient.registerRebalanceExecution(canceledExecution);
+ currentRebalanceExecution = canceledExecution;
+ } catch (Exception e) {
+ throw new RebalanceFailureException(
+ String.format(
+ "Failed to cancel round-based rebalance. The root cause: %s",
+ e.getMessage()),
+ e);
+ }
+ }
+
public @Nullable RebalancePlanForBucket getRebalancePlanForBucket(TableBucket tableBucket) {
checkNotClosed();
RebalanceResultForBucket resultForBucket = inProgressRebalanceTasks.get(tableBucket);
@@ -407,9 +785,16 @@ private void completeRebalance() {
LOG.error("Error when update rebalance plan from zookeeper.", e);
}
+ completeRebalanceInMemory();
+ }
+
+ private void completeRebalanceInMemory() {
rebalanceStatus = COMPLETED;
inProgressRebalanceTasks.clear();
inProgressRebalanceTasksQueue.clear();
+ // Clear gate (bucket) first, then data (startMs).
+ inflightTaskBucket = null;
+ inflightTaskStartMs = -1;
// Here, it will not clear finishedRebalanceTasks, because it will be used by
// listRebalanceProgress. It will be cleared when next register.
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 533cca6ff4..dc3e9cadc3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -18,7 +18,6 @@
package org.apache.fluss.server.zk;
import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.FlussConfigUtils;
@@ -50,6 +49,8 @@
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.PartitionRegistration;
+import org.apache.fluss.server.zk.data.RebalanceExecution;
+import org.apache.fluss.server.zk.data.RebalanceRound;
import org.apache.fluss.server.zk.data.RebalanceTask;
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
import org.apache.fluss.server.zk.data.ResourceAcl;
@@ -76,6 +77,9 @@
import org.apache.fluss.server.zk.data.ZkData.PartitionsZNode;
import org.apache.fluss.server.zk.data.ZkData.ProducerIdZNode;
import org.apache.fluss.server.zk.data.ZkData.ProducersZNode;
+import org.apache.fluss.server.zk.data.ZkData.RebalanceExecutionZNode;
+import org.apache.fluss.server.zk.data.ZkData.RebalanceRoundZNode;
+import org.apache.fluss.server.zk.data.ZkData.RebalanceRoundsZNode;
import org.apache.fluss.server.zk.data.ZkData.RebalanceZNode;
import org.apache.fluss.server.zk.data.ZkData.ResourceAclNode;
import org.apache.fluss.server.zk.data.ZkData.SchemaZNode;
@@ -1623,16 +1627,7 @@ public void deleteServerTags() throws Exception {
}
public void registerRebalanceTask(RebalanceTask rebalanceTask) throws Exception {
- String path = RebalanceZNode.path();
- Stat stat = zkClient.checkExists().forPath(path);
- if (stat == null) {
- zkClient.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, RebalanceZNode.encode(rebalanceTask));
- } else {
- zkClient.setData().forPath(path, RebalanceZNode.encode(rebalanceTask));
- }
+ upsertPath(RebalanceZNode.path(), RebalanceZNode.encode(rebalanceTask));
}
public Optional getRebalanceTask() throws Exception {
@@ -1640,10 +1635,45 @@ public Optional getRebalanceTask() throws Exception {
return getOrEmpty(path).map(RebalanceZNode::decode);
}
- /** Deletes the rebalance task from ZooKeeper. Only for testing propose now */
- @VisibleForTesting
+ public void registerRebalanceExecution(RebalanceExecution rebalanceExecution) throws Exception {
+ upsertPath(
+ RebalanceExecutionZNode.path(), RebalanceExecutionZNode.encode(rebalanceExecution));
+ }
+
+ public Optional getRebalanceExecution() throws Exception {
+ return getOrEmpty(RebalanceExecutionZNode.path()).map(RebalanceExecutionZNode::decode);
+ }
+
+ public void registerRebalanceRound(RebalanceRound rebalanceRound) throws Exception {
+ upsertPath(
+ RebalanceRoundZNode.path(rebalanceRound.getRoundIndex()),
+ RebalanceRoundZNode.encode(rebalanceRound));
+ }
+
+ public Optional getRebalanceRound(int roundIndex) throws Exception {
+ return getOrEmpty(RebalanceRoundZNode.path(roundIndex)).map(RebalanceRoundZNode::decode);
+ }
+
+ public List getRebalanceRounds() throws Exception {
+ List roundIndexes = new ArrayList<>();
+ for (String child : getChildren(RebalanceRoundsZNode.path())) {
+ roundIndexes.add(Integer.parseInt(child));
+ }
+ Collections.sort(roundIndexes);
+
+ List rebalanceRounds = new ArrayList<>();
+ for (Integer roundIndex : roundIndexes) {
+ Optional rebalanceRound = getRebalanceRound(roundIndex);
+ if (rebalanceRound.isPresent()) {
+ rebalanceRounds.add(rebalanceRound.get());
+ }
+ }
+ return rebalanceRounds;
+ }
+
+ /** Deletes the rebalance task and all round-based rebalance metadata from ZooKeeper. */
public void deleteRebalanceTask() throws Exception {
- deletePath(RebalanceZNode.path());
+ deletePathWithChildren(RebalanceZNode.path());
}
// --------------------------------------------------------------------------------------------
@@ -1682,6 +1712,26 @@ public void deletePath(String path) throws Exception {
}
}
+ /** Delete a path with all children. */
+ public void deletePathWithChildren(String path) throws Exception {
+ try {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+ } catch (KeeperException.NoNodeException ignored) {
+ }
+ }
+
+ private void upsertPath(String path, byte[] data) throws Exception {
+ Stat stat = zkClient.checkExists().forPath(path);
+ if (stat == null) {
+ zkClient.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, data);
+ } else {
+ zkClient.setData().forPath(path, data);
+ }
+ }
+
public CuratorFramework getCuratorClient() {
return zkClient;
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecution.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecution.java
new file mode 100644
index 0000000000..1f53a42906
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecution.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+
+import java.util.Objects;
+
+/** Persistent metadata for a round-based rebalance execution. */
+public final class RebalanceExecution {
+
+ private final String rebalanceId;
+ private final RebalanceStatus rebalanceStatus;
+ private final int maxBucketsPerRound;
+ private final int currentRound;
+ private final int totalRounds;
+
+ public RebalanceExecution(
+ String rebalanceId,
+ RebalanceStatus rebalanceStatus,
+ int maxBucketsPerRound,
+ int currentRound,
+ int totalRounds) {
+ this.rebalanceId = rebalanceId;
+ this.rebalanceStatus = rebalanceStatus;
+ this.maxBucketsPerRound = maxBucketsPerRound;
+ this.currentRound = currentRound;
+ this.totalRounds = totalRounds;
+ }
+
+ public String getRebalanceId() {
+ return rebalanceId;
+ }
+
+ public RebalanceStatus getRebalanceStatus() {
+ return rebalanceStatus;
+ }
+
+ public int getMaxBucketsPerRound() {
+ return maxBucketsPerRound;
+ }
+
+ public int getCurrentRound() {
+ return currentRound;
+ }
+
+ public int getTotalRounds() {
+ return totalRounds;
+ }
+
+ public RebalanceExecution withStatus(RebalanceStatus newStatus) {
+ return new RebalanceExecution(
+ rebalanceId, newStatus, maxBucketsPerRound, currentRound, totalRounds);
+ }
+
+ public RebalanceExecution withCurrentRound(int newCurrentRound) {
+ return new RebalanceExecution(
+ rebalanceId, rebalanceStatus, maxBucketsPerRound, newCurrentRound, totalRounds);
+ }
+
+ public RebalanceExecution withStatusAndCurrentRound(
+ RebalanceStatus newStatus, int newCurrentRound) {
+ return new RebalanceExecution(
+ rebalanceId, newStatus, maxBucketsPerRound, newCurrentRound, totalRounds);
+ }
+
+ @Override
+ public String toString() {
+ return "RebalanceExecution{"
+ + "rebalanceId='"
+ + rebalanceId
+ + '\''
+ + ", rebalanceStatus="
+ + rebalanceStatus
+ + ", maxBucketsPerRound="
+ + maxBucketsPerRound
+ + ", currentRound="
+ + currentRound
+ + ", totalRounds="
+ + totalRounds
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RebalanceExecution that = (RebalanceExecution) o;
+ return maxBucketsPerRound == that.maxBucketsPerRound
+ && currentRound == that.currentRound
+ && totalRounds == that.totalRounds
+ && Objects.equals(rebalanceId, that.rebalanceId)
+ && rebalanceStatus == that.rebalanceStatus;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ rebalanceId, rebalanceStatus, maxBucketsPerRound, currentRound, totalRounds);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerde.java
new file mode 100644
index 0000000000..eda743c1e3
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerde.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for {@link RebalanceExecution}. */
+public class RebalanceExecutionJsonSerde
+ implements JsonSerializer, JsonDeserializer {
+
+ public static final RebalanceExecutionJsonSerde INSTANCE = new RebalanceExecutionJsonSerde();
+
+ private static final String VERSION_KEY = "version";
+ private static final String REBALANCE_ID = "rebalance_id";
+ private static final String REBALANCE_STATUS = "rebalance_status";
+ private static final String MAX_BUCKETS_PER_ROUND = "max_buckets_per_round";
+ private static final String CURRENT_ROUND = "current_round";
+ private static final String TOTAL_ROUNDS = "total_rounds";
+
+ private static final int VERSION = 1;
+
+ @Override
+ public void serialize(RebalanceExecution execution, JsonGenerator generator)
+ throws IOException {
+ generator.writeStartObject();
+ generator.writeNumberField(VERSION_KEY, VERSION);
+ generator.writeStringField(REBALANCE_ID, execution.getRebalanceId());
+ generator.writeNumberField(REBALANCE_STATUS, execution.getRebalanceStatus().getCode());
+ generator.writeNumberField(MAX_BUCKETS_PER_ROUND, execution.getMaxBucketsPerRound());
+ generator.writeNumberField(CURRENT_ROUND, execution.getCurrentRound());
+ generator.writeNumberField(TOTAL_ROUNDS, execution.getTotalRounds());
+ generator.writeEndObject();
+ }
+
+ @Override
+ public RebalanceExecution deserialize(JsonNode node) {
+ return new RebalanceExecution(
+ node.get(REBALANCE_ID).asText(),
+ RebalanceStatus.of(node.get(REBALANCE_STATUS).asInt()),
+ node.get(MAX_BUCKETS_PER_ROUND).asInt(),
+ node.get(CURRENT_ROUND).asInt(),
+ node.get(TOTAL_ROUNDS).asInt());
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRound.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRound.java
new file mode 100644
index 0000000000..9bc0194bd8
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRound.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+
+/** Persistent task state for one round of a rebalance execution. */
+public final class RebalanceRound {
+
+ private final int roundIndex;
+ private final Map progressForBucketMap;
+
+ public RebalanceRound(
+ int roundIndex, Map progressForBucketMap) {
+ this.roundIndex = roundIndex;
+ this.progressForBucketMap =
+ Collections.unmodifiableMap(new LinkedHashMap<>(progressForBucketMap));
+ }
+
+ public static RebalanceRound ofPlan(
+ int roundIndex, Map executePlan) {
+ Map progressForBucketMap = new LinkedHashMap<>();
+ for (Map.Entry entry : executePlan.entrySet()) {
+ progressForBucketMap.put(
+ entry.getKey(), RebalanceResultForBucket.of(entry.getValue(), NOT_STARTED));
+ }
+ return new RebalanceRound(roundIndex, progressForBucketMap);
+ }
+
+ public int getRoundIndex() {
+ return roundIndex;
+ }
+
+ public Map getProgressForBucketMap() {
+ return progressForBucketMap;
+ }
+
+ public Map getExecutePlan() {
+ Map executePlan = new LinkedHashMap<>();
+ for (Map.Entry entry :
+ progressForBucketMap.entrySet()) {
+ executePlan.put(entry.getKey(), entry.getValue().plan());
+ }
+ return executePlan;
+ }
+
+ public boolean hasUnfinishedTasks() {
+ for (RebalanceResultForBucket resultForBucket : progressForBucketMap.values()) {
+ if (!FINAL_STATUSES.contains(resultForBucket.status())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public RebalanceRound withBucketStatus(
+ TableBucket tableBucket,
+ RebalancePlanForBucket planForBucket,
+ RebalanceStatus rebalanceStatus) {
+ Map newProgressForBucketMap =
+ new LinkedHashMap<>(progressForBucketMap);
+ newProgressForBucketMap.put(
+ tableBucket, RebalanceResultForBucket.of(planForBucket, rebalanceStatus));
+ return new RebalanceRound(roundIndex, newProgressForBucketMap);
+ }
+
+ public RebalanceRound withUnfinishedStatus(RebalanceStatus rebalanceStatus) {
+ Map newProgressForBucketMap = new LinkedHashMap<>();
+ for (Map.Entry entry :
+ progressForBucketMap.entrySet()) {
+ RebalanceResultForBucket resultForBucket = entry.getValue();
+ if (FINAL_STATUSES.contains(resultForBucket.status())) {
+ newProgressForBucketMap.put(entry.getKey(), resultForBucket);
+ } else {
+ newProgressForBucketMap.put(
+ entry.getKey(),
+ RebalanceResultForBucket.of(resultForBucket.plan(), rebalanceStatus));
+ }
+ }
+ return new RebalanceRound(roundIndex, newProgressForBucketMap);
+ }
+
+ @Override
+ public String toString() {
+ return "RebalanceRound{"
+ + "roundIndex="
+ + roundIndex
+ + ", progressForBucketMap="
+ + progressForBucketMap
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RebalanceRound that = (RebalanceRound) o;
+ if (roundIndex != that.roundIndex
+ || progressForBucketMap.size() != that.progressForBucketMap.size()) {
+ return false;
+ }
+ for (Map.Entry entry :
+ progressForBucketMap.entrySet()) {
+ RebalanceResultForBucket thatResult = that.progressForBucketMap.get(entry.getKey());
+ if (thatResult == null || !equalsRebalanceResult(entry.getValue(), thatResult)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int entriesHash = 0;
+ for (Map.Entry entry :
+ progressForBucketMap.entrySet()) {
+ RebalanceResultForBucket resultForBucket = entry.getValue();
+ entriesHash +=
+ Objects.hash(entry.getKey(), resultForBucket.plan(), resultForBucket.status());
+ }
+ return 31 * Objects.hash(roundIndex) + entriesHash;
+ }
+
+ private static boolean equalsRebalanceResult(
+ RebalanceResultForBucket left, RebalanceResultForBucket right) {
+ return Objects.equals(left.plan(), right.plan()) && left.status() == right.status();
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerde.java
new file mode 100644
index 0000000000..31884565fc
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerde.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePartition;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Json serializer and deserializer for {@link RebalanceRound}. */
+public class RebalanceRoundJsonSerde
+ implements JsonSerializer, JsonDeserializer {
+
+ public static final RebalanceRoundJsonSerde INSTANCE = new RebalanceRoundJsonSerde();
+
+ private static final String VERSION_KEY = "version";
+ private static final String ROUND_INDEX = "round_index";
+ private static final String REBALANCE_PLAN = "rebalance_plan";
+
+ private static final String TABLE_ID = "table_id";
+ private static final String PARTITION_ID = "partition_id";
+
+ private static final String BUCKETS = "buckets";
+ private static final String BUCKET_ID = "bucket_id";
+ private static final String ORIGINAL_LEADER = "original_leader";
+ private static final String NEW_LEADER = "new_leader";
+ private static final String ORIGIN_REPLICAS = "origin_replicas";
+ private static final String NEW_REPLICAS = "new_replicas";
+ private static final String REBALANCE_STATUS = "rebalance_status";
+
+ private static final int VERSION = 1;
+
+ @Override
+ public void serialize(RebalanceRound rebalanceRound, JsonGenerator generator)
+ throws IOException {
+ generator.writeStartObject();
+ generator.writeNumberField(VERSION_KEY, VERSION);
+ generator.writeNumberField(ROUND_INDEX, rebalanceRound.getRoundIndex());
+
+ Map> resultsForBuckets = new LinkedHashMap<>();
+ Map> resultsForBucketsOfPartitionedTable =
+ new LinkedHashMap<>();
+ for (Map.Entry entry :
+ rebalanceRound.getProgressForBucketMap().entrySet()) {
+ TableBucket tableBucket = entry.getKey();
+ if (tableBucket.getPartitionId() == null) {
+ resultsForBuckets
+ .computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
+ .add(entry.getValue());
+ } else {
+ resultsForBucketsOfPartitionedTable
+ .computeIfAbsent(
+ new TablePartition(
+ tableBucket.getTableId(), tableBucket.getPartitionId()),
+ k -> new ArrayList<>())
+ .add(entry.getValue());
+ }
+ }
+
+ generator.writeArrayFieldStart(REBALANCE_PLAN);
+ for (Map.Entry> entry : resultsForBuckets.entrySet()) {
+ generator.writeStartObject();
+ generator.writeNumberField(TABLE_ID, entry.getKey());
+ generator.writeArrayFieldStart(BUCKETS);
+ for (RebalanceResultForBucket resultForBucket : entry.getValue()) {
+ serializeRebalanceResultForBucket(generator, resultForBucket);
+ }
+ generator.writeEndArray();
+ generator.writeEndObject();
+ }
+ for (Map.Entry> entry :
+ resultsForBucketsOfPartitionedTable.entrySet()) {
+ generator.writeStartObject();
+ generator.writeNumberField(TABLE_ID, entry.getKey().getTableId());
+ generator.writeNumberField(PARTITION_ID, entry.getKey().getPartitionId());
+ generator.writeArrayFieldStart(BUCKETS);
+ for (RebalanceResultForBucket resultForBucket : entry.getValue()) {
+ serializeRebalanceResultForBucket(generator, resultForBucket);
+ }
+ generator.writeEndArray();
+ generator.writeEndObject();
+ }
+ generator.writeEndArray();
+ generator.writeEndObject();
+ }
+
+ @Override
+ public RebalanceRound deserialize(JsonNode node) {
+ int roundIndex = node.get(ROUND_INDEX).asInt();
+ Map progressForBucketMap = new LinkedHashMap<>();
+ for (JsonNode tablePartitionPlanNode : node.get(REBALANCE_PLAN)) {
+ long tableId = tablePartitionPlanNode.get(TABLE_ID).asLong();
+
+ Long partitionId = null;
+ if (tablePartitionPlanNode.has(PARTITION_ID)) {
+ partitionId = tablePartitionPlanNode.get(PARTITION_ID).asLong();
+ }
+
+ for (JsonNode bucketPlanNode : tablePartitionPlanNode.get(BUCKETS)) {
+ int bucketId = bucketPlanNode.get(BUCKET_ID).asInt();
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
+ RebalancePlanForBucket planForBucket =
+ new RebalancePlanForBucket(
+ tableBucket,
+ bucketPlanNode.get(ORIGINAL_LEADER).asInt(),
+ bucketPlanNode.get(NEW_LEADER).asInt(),
+ toIntegerList(bucketPlanNode.get(ORIGIN_REPLICAS)),
+ toIntegerList(bucketPlanNode.get(NEW_REPLICAS)));
+ RebalanceStatus rebalanceStatus =
+ RebalanceStatus.of(bucketPlanNode.get(REBALANCE_STATUS).asInt());
+ progressForBucketMap.put(
+ tableBucket, RebalanceResultForBucket.of(planForBucket, rebalanceStatus));
+ }
+ }
+ return new RebalanceRound(roundIndex, progressForBucketMap);
+ }
+
+ private void serializeRebalanceResultForBucket(
+ JsonGenerator generator, RebalanceResultForBucket resultForBucket) throws IOException {
+ RebalancePlanForBucket bucketPlan = resultForBucket.plan();
+ generator.writeStartObject();
+ generator.writeNumberField(BUCKET_ID, bucketPlan.getBucketId());
+ generator.writeNumberField(ORIGINAL_LEADER, bucketPlan.getOriginalLeader());
+ generator.writeNumberField(NEW_LEADER, bucketPlan.getNewLeader());
+ generator.writeArrayFieldStart(ORIGIN_REPLICAS);
+ for (Integer replica : bucketPlan.getOriginReplicas()) {
+ generator.writeNumber(replica);
+ }
+ generator.writeEndArray();
+ generator.writeArrayFieldStart(NEW_REPLICAS);
+ for (Integer replica : bucketPlan.getNewReplicas()) {
+ generator.writeNumber(replica);
+ }
+ generator.writeEndArray();
+ generator.writeNumberField(REBALANCE_STATUS, resultForBucket.status().getCode());
+ generator.writeEndObject();
+ }
+
+ private List toIntegerList(JsonNode node) {
+ List values = new ArrayList<>();
+ Iterator elements = node.elements();
+ while (elements.hasNext()) {
+ values.add(elements.next().asInt());
+ }
+ return values;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
index 941512e0f2..7ce009748c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
@@ -903,6 +903,57 @@ public static RebalanceTask decode(byte[] json) {
}
}
+ /**
+ * The znode for round-based rebalance execution metadata. The znode path is:
+ *
+ * /cluster/rebalance/execution
+ */
+ public static final class RebalanceExecutionZNode {
+ public static String path() {
+ return RebalanceZNode.path() + "/execution";
+ }
+
+ public static byte[] encode(RebalanceExecution rebalanceExecution) {
+ return JsonSerdeUtils.writeValueAsBytes(
+ rebalanceExecution, RebalanceExecutionJsonSerde.INSTANCE);
+ }
+
+ public static RebalanceExecution decode(byte[] json) {
+ return JsonSerdeUtils.readValue(json, RebalanceExecutionJsonSerde.INSTANCE);
+ }
+ }
+
+ /**
+ * The znode for all round-based rebalance round tasks. The znode path is:
+ *
+ *
/cluster/rebalance/rounds
+ */
+ public static final class RebalanceRoundsZNode {
+ public static String path() {
+ return RebalanceZNode.path() + "/rounds";
+ }
+ }
+
+ /**
+ * The znode for one round-based rebalance round task. The znode path is:
+ *
+ *
/cluster/rebalance/rounds/[roundIndex]
+ */
+ public static final class RebalanceRoundZNode {
+ public static String path(int roundIndex) {
+ return RebalanceRoundsZNode.path() + "/" + roundIndex;
+ }
+
+ public static byte[] encode(RebalanceRound rebalanceRound) {
+ return JsonSerdeUtils.writeValueAsBytes(
+ rebalanceRound, RebalanceRoundJsonSerde.INSTANCE);
+ }
+
+ public static RebalanceRound decode(byte[] json) {
+ return JsonSerdeUtils.readValue(json, RebalanceRoundJsonSerde.INSTANCE);
+ }
+ }
+
// ------------------------------------------------------------------------------------------
// ZNodes under "/producers/"
// ------------------------------------------------------------------------------------------
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
index 0360b6a4e8..12a08b02c4 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.fluss.server.coordinator.rebalance;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgress;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
import org.apache.fluss.config.ConfigOptions;
@@ -41,6 +42,8 @@
import org.apache.fluss.server.zk.ZkEpoch;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.RebalanceExecution;
+import org.apache.fluss.server.zk.data.RebalanceRound;
import org.apache.fluss.server.zk.data.RebalanceTask;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.clock.ManualClock;
@@ -57,15 +60,23 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
/** Test for {@link RebalanceManager}. */
public class RebalanceManagerTest {
@@ -162,6 +173,231 @@ void testRebalanceWithoutTask() throws Exception {
.hasValue(new RebalanceTask(rebalanceId, COMPLETED, new HashMap<>()));
}
+ @Test
+ void testRegisterGeneratedRebalanceTaskSplitsIntoRoundsAndCompletes() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 2);
+ RebalanceManager manager = createTestingRebalanceManager(conf);
+
+ Map plan = createRebalancePlan(5);
+ RebalanceTask firstRoundTask =
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("round-test", NOT_STARTED, plan));
+
+ assertThat(firstRoundTask.getExecutePlan()).hasSize(2);
+ assertThat(zookeeperClient.getRebalanceTask().get().getExecutePlan()).hasSize(2);
+ assertThat(zookeeperClient.getRebalanceRounds()).hasSize(3);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("round-test", REBALANCING, 2, 0, 3));
+ assertThat(manager.listRebalanceProgress(null).progressForBucketMap()).hasSize(5);
+
+ finishCurrentRound(manager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("round-test", REBALANCING, 2, 1, 3));
+ assertThat(zookeeperClient.getRebalanceTask().get().getExecutePlan()).hasSize(2);
+
+ finishCurrentRound(manager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("round-test", REBALANCING, 2, 2, 3));
+ assertThat(zookeeperClient.getRebalanceTask().get().getExecutePlan()).hasSize(1);
+
+ finishCurrentRound(manager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("round-test", COMPLETED, 2, 2, 3));
+ assertThat(zookeeperClient.getRebalanceTask().get().getRebalanceStatus())
+ .isEqualTo(COMPLETED);
+ assertThat(manager.hasInProgressRebalance()).isFalse();
+ RebalanceProgress progress = manager.listRebalanceProgress(null);
+ assertThat(progress.status()).isEqualTo(COMPLETED);
+ assertThat(progress.progressForBucketMap()).hasSize(5);
+ assertThat(progress.progressForBucketMap().values())
+ .allSatisfy(result -> assertThat(result.status()).isEqualTo(COMPLETED));
+
+ manager.close();
+ }
+
+ @Test
+ void testRoundBasedRebalanceContinuesWhenBucketProgressUpdateFails() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 2);
+ ZooKeeperClient flakyZkClient = spy(zookeeperClient);
+ RebalanceManager manager =
+ new RebalanceManager(
+ buildCoordinatorEventProcessor(conf),
+ flakyZkClient,
+ new RecordingEventManager(),
+ SystemClock.getInstance(),
+ new NoOpScheduledExecutor(),
+ conf);
+ manager.startup();
+
+ Map plan = createRebalancePlan(3);
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("round-update-failure-test", NOT_STARTED, plan));
+ AtomicBoolean failNextRoundUpdate = new AtomicBoolean(true);
+ doAnswer(
+ invocation -> {
+ RebalanceRound rebalanceRound = invocation.getArgument(0);
+ if (failNextRoundUpdate.getAndSet(false)
+ && rebalanceRound.getRoundIndex() == 0) {
+ throw new Exception("Injected round progress update failure.");
+ }
+ invocation.callRealMethod();
+ return null;
+ })
+ .when(flakyZkClient)
+ .registerRebalanceRound(any(RebalanceRound.class));
+
+ List firstRoundBuckets =
+ new ArrayList<>(flakyZkClient.getRebalanceTask().get().getExecutePlan().keySet());
+ for (TableBucket tableBucket : firstRoundBuckets) {
+ manager.finishRebalanceTask(tableBucket, COMPLETED);
+ }
+
+ assertThat(flakyZkClient.getRebalanceExecution())
+ .hasValue(
+ new RebalanceExecution("round-update-failure-test", REBALANCING, 2, 1, 2));
+ assertThat(flakyZkClient.getRebalanceTask().get().getExecutePlan()).hasSize(1);
+ assertThat(flakyZkClient.getRebalanceRound(0)).isPresent();
+ assertThat(flakyZkClient.getRebalanceRound(0).get().getProgressForBucketMap().values())
+ .allSatisfy(result -> assertThat(result.status()).isEqualTo(COMPLETED));
+
+ manager.close();
+ }
+
+ @Test
+ void testRegisterGeneratedRebalanceTaskKeepsLegacyTaskWhenRoundLimitDisabled()
+ throws Exception {
+ RebalanceManager manager = createTestingRebalanceManager(new Configuration());
+
+ Map plan = createRebalancePlan(3);
+ RebalanceTask rebalanceTask =
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("legacy-test", NOT_STARTED, plan));
+
+ assertThat(rebalanceTask.getExecutePlan()).hasSize(3);
+ assertThat(zookeeperClient.getRebalanceTask()).isPresent();
+ RebalanceTask persistedTask = zookeeperClient.getRebalanceTask().get();
+ assertThat(persistedTask.getRebalanceId()).isEqualTo("legacy-test");
+ assertThat(persistedTask.getRebalanceStatus()).isEqualTo(NOT_STARTED);
+ assertThat(persistedTask.getExecutePlan()).isEqualTo(plan);
+ assertThat(zookeeperClient.getRebalanceExecution()).isEmpty();
+ assertThat(manager.listRebalanceProgress(null).progressForBucketMap()).hasSize(3);
+
+ manager.close();
+ }
+
+ @Test
+ void testRegisterGeneratedRebalanceTaskKeepsLegacyTaskWhenPlanFitsRoundLimit()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 3);
+ RebalanceManager manager = createTestingRebalanceManager(conf);
+
+ Map plan = createRebalancePlan(3);
+ RebalanceTask rebalanceTask =
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("small-plan-test", NOT_STARTED, plan));
+
+ assertThat(rebalanceTask.getExecutePlan()).hasSize(3);
+ assertThat(zookeeperClient.getRebalanceTask()).isPresent();
+ RebalanceTask persistedTask = zookeeperClient.getRebalanceTask().get();
+ assertThat(persistedTask.getRebalanceId()).isEqualTo("small-plan-test");
+ assertThat(persistedTask.getRebalanceStatus()).isEqualTo(NOT_STARTED);
+ assertThat(persistedTask.getExecutePlan()).isEqualTo(plan);
+ assertThat(zookeeperClient.getRebalanceExecution()).isEmpty();
+ assertThat(manager.listRebalanceProgress(null).progressForBucketMap()).hasSize(3);
+
+ manager.close();
+ }
+
+ @Test
+ void testRoundBasedRebalanceHandlesPartitionedBuckets() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 2);
+ RebalanceManager manager = createTestingRebalanceManager(conf);
+
+ Map plan = createPartitionedRebalancePlan(3);
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("partitioned-round-test", NOT_STARTED, plan));
+
+ assertThat(zookeeperClient.getRebalanceRounds()).hasSize(2);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("partitioned-round-test", REBALANCING, 2, 0, 2));
+ assertThat(manager.listRebalanceProgress(null).progressForBucketMap())
+ .containsOnlyKeys(plan.keySet());
+
+ finishCurrentRound(manager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceTask().get().getExecutePlan())
+ .containsOnlyKeys(new TableBucket(1L, 10L, 2));
+
+ finishCurrentRound(manager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("partitioned-round-test", COMPLETED, 2, 1, 2));
+ assertThat(manager.listRebalanceProgress(null).progressForBucketMap().values())
+ .allSatisfy(result -> assertThat(result.status()).isEqualTo(COMPLETED));
+
+ manager.close();
+ }
+
+ @Test
+ void testRoundBasedRebalanceRestoresFromZooKeeper() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 2);
+ RebalanceManager manager = createTestingRebalanceManager(conf);
+
+ Map plan = createRebalancePlan(3);
+ manager.registerGeneratedRebalanceTask(
+ new RebalanceTask("restore-test", NOT_STARTED, plan));
+ finishCurrentRound(manager, COMPLETED);
+ manager.close();
+
+ RebalanceManager restoredManager = createTestingRebalanceManager(conf);
+ assertThat(restoredManager.hasInProgressRebalance()).isTrue();
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("restore-test", REBALANCING, 2, 1, 2));
+ assertThat(restoredManager.listRebalanceProgress(null).progressForBucketMap()).hasSize(3);
+
+ finishCurrentRound(restoredManager, COMPLETED);
+ assertThat(zookeeperClient.getRebalanceExecution())
+ .hasValue(new RebalanceExecution("restore-test", COMPLETED, 2, 1, 2));
+ assertThat(restoredManager.listRebalanceProgress(null).progressForBucketMap()).hasSize(3);
+
+ restoredManager.close();
+ }
+
+ @Test
+ void testCancelRoundBasedRebalanceMarksAllUnfinishedBuckets() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND, 2);
+ RebalanceManager manager = createTestingRebalanceManager(conf);
+
+ Map plan = createRebalancePlan(3);
+ manager.registerGeneratedRebalanceTask(new RebalanceTask("cancel-test", NOT_STARTED, plan));
+ TableBucket firstBucket =
+ zookeeperClient
+ .getRebalanceTask()
+ .get()
+ .getExecutePlan()
+ .keySet()
+ .iterator()
+ .next();
+ manager.finishRebalanceTask(firstBucket, COMPLETED);
+ manager.cancelRebalance("cancel-test");
+
+ Optional rebalanceExecution = zookeeperClient.getRebalanceExecution();
+ assertThat(rebalanceExecution)
+ .hasValue(new RebalanceExecution("cancel-test", CANCELED, 2, 0, 2));
+ RebalanceProgress progress = manager.listRebalanceProgress(null);
+ assertThat(progress.status()).isEqualTo(CANCELED);
+ assertThat(progress.progressForBucketMap()).hasSize(3);
+ assertThat(progress.progressForBucketMap().get(firstBucket).status()).isEqualTo(COMPLETED);
+ assertThat(progress.progressForBucketMap().values())
+ .allSatisfy(result -> assertThat(result.status()).isIn(COMPLETED, CANCELED));
+
+ manager.close();
+ }
+
@Test
void testTimeoutEnqueuesEvent() throws Exception {
ManualClock clock = new ManualClock(0L);
@@ -315,6 +551,58 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor(Configuration c
SystemClock.getInstance());
}
+ private RebalanceManager createTestingRebalanceManager(Configuration conf) {
+ return createTestingRebalanceManager(conf, SystemClock.getInstance());
+ }
+
+ private RebalanceManager createTestingRebalanceManager(
+ Configuration conf, org.apache.fluss.utils.clock.Clock clock) {
+ RebalanceManager manager =
+ new RebalanceManager(
+ buildCoordinatorEventProcessor(conf),
+ zookeeperClient,
+ new RecordingEventManager(),
+ clock,
+ new NoOpScheduledExecutor(),
+ conf);
+ manager.startup();
+ return manager;
+ }
+
+ private Map createRebalancePlan(int bucketCount) {
+ Map plan = new LinkedHashMap<>();
+ for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+ TableBucket tableBucket = new TableBucket(1L, bucketId);
+ plan.put(
+ tableBucket,
+ new RebalancePlanForBucket(
+ tableBucket, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2)));
+ }
+ return plan;
+ }
+
+ private Map createPartitionedRebalancePlan(
+ int bucketCount) {
+ Map plan = new LinkedHashMap<>();
+ for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
+ TableBucket tableBucket = new TableBucket(1L, 10L, bucketId);
+ plan.put(
+ tableBucket,
+ new RebalancePlanForBucket(
+ tableBucket, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2)));
+ }
+ return plan;
+ }
+
+ private void finishCurrentRound(RebalanceManager manager, RebalanceStatus status)
+ throws Exception {
+ List currentRoundBuckets =
+ new ArrayList<>(zookeeperClient.getRebalanceTask().get().getExecutePlan().keySet());
+ for (TableBucket tableBucket : currentRoundBuckets) {
+ manager.finishRebalanceTask(tableBucket, status);
+ }
+ }
+
/** Records events put into the coordinator event queue. */
private static final class RecordingEventManager implements EventManager {
final List events = new ArrayList<>();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerdeTest.java
new file mode 100644
index 0000000000..8aac5010b8
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceExecutionJsonSerdeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.utils.json.JsonSerdeTestBase;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+
+/** Test for {@link RebalanceExecutionJsonSerde}. */
+class RebalanceExecutionJsonSerdeTest extends JsonSerdeTestBase {
+
+ RebalanceExecutionJsonSerdeTest() {
+ super(RebalanceExecutionJsonSerde.INSTANCE);
+ }
+
+ @Override
+ protected RebalanceExecution[] createObjects() {
+ return new RebalanceExecution[] {
+ new RebalanceExecution("rebalance-execution-1", REBALANCING, 100, 1, 3),
+ new RebalanceExecution("rebalance-execution-2", CANCELED, 50, 0, 2)
+ };
+ }
+
+ @Override
+ protected String[] expectedJsons() {
+ return new String[] {
+ "{\"version\":1,\"rebalance_id\":\"rebalance-execution-1\",\"rebalance_status\":1,\"max_buckets_per_round\":100,\"current_round\":1,\"total_rounds\":3}",
+ "{\"version\":1,\"rebalance_id\":\"rebalance-execution-2\",\"rebalance_status\":4,\"max_buckets_per_round\":50,\"current_round\":0,\"total_rounds\":2}"
+ };
+ }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerdeTest.java
new file mode 100644
index 0000000000..896bcaaf6a
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalanceRoundJsonSerdeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.json.JsonSerdeTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RebalanceRoundJsonSerde}. */
+class RebalanceRoundJsonSerdeTest extends JsonSerdeTestBase {
+
+ RebalanceRoundJsonSerdeTest() {
+ super(RebalanceRoundJsonSerde.INSTANCE);
+ }
+
+ @Override
+ protected RebalanceRound[] createObjects() {
+ Map progressForBucketMap = new LinkedHashMap<>();
+ putProgress(progressForBucketMap, new TableBucket(0L, 0), 0, 3, COMPLETED);
+ putProgress(progressForBucketMap, new TableBucket(0L, 1), 1, 1, REBALANCING);
+ putProgress(progressForBucketMap, new TableBucket(1L, 10L, 0), 0, 3, NOT_STARTED);
+ putProgress(progressForBucketMap, new TableBucket(1L, 10L, 1), 1, 1, COMPLETED);
+ return new RebalanceRound[] {new RebalanceRound(2, progressForBucketMap)};
+ }
+
+ @Override
+ protected String[] expectedJsons() {
+ return new String[] {
+ "{\"version\":1,\"round_index\":2,\"rebalance_plan\":"
+ + "[{\"table_id\":0,\"buckets\":["
+ + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3},"
+ + "{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3],\"rebalance_status\":1}]},"
+ + "{\"table_id\":1,\"partition_id\":10,\"buckets\":["
+ + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":0},"
+ + "{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3],\"rebalance_status\":3}]}]}"
+ };
+ }
+
+ @Test
+ void testHashCodeIgnoresProgressMapIterationOrder() {
+ Map firstProgressForBucketMap =
+ new LinkedHashMap<>();
+ Map secondProgressForBucketMap =
+ new LinkedHashMap<>();
+ TableBucket firstBucket = new TableBucket(0L, 0);
+ TableBucket secondBucket = new TableBucket(1L, 10L, 1);
+
+ putProgress(firstProgressForBucketMap, firstBucket, 0, 3, COMPLETED);
+ putProgress(firstProgressForBucketMap, secondBucket, 1, 1, REBALANCING);
+ putProgress(secondProgressForBucketMap, secondBucket, 1, 1, REBALANCING);
+ putProgress(secondProgressForBucketMap, firstBucket, 0, 3, COMPLETED);
+
+ RebalanceRound firstRound = new RebalanceRound(1, firstProgressForBucketMap);
+ RebalanceRound secondRound = new RebalanceRound(1, secondProgressForBucketMap);
+
+ assertThat(firstRound).isEqualTo(secondRound);
+ assertThat(firstRound).hasSameHashCodeAs(secondRound);
+ }
+
+ private void putProgress(
+ Map progressForBucketMap,
+ TableBucket tableBucket,
+ int originalLeader,
+ int newLeader,
+ RebalanceStatus rebalanceStatus) {
+ RebalancePlanForBucket planForBucket =
+ new RebalancePlanForBucket(
+ tableBucket,
+ originalLeader,
+ newLeader,
+ Arrays.asList(0, 1, 2),
+ Arrays.asList(newLeader, newLeader + 1, newLeader + 2));
+ progressForBucketMap.put(
+ tableBucket, RebalanceResultForBucket.of(planForBucket, rebalanceStatus));
+ }
+}
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 0a4223074d..4a48be3c16 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -61,6 +61,7 @@ during the Fluss cluster working.
| coordinator.io-pool.size | Integer | 10 | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. |
| coordinator.producer-offsets.ttl | Duration | 24h | The TTL (time-to-live) for producer offsets. Producer offsets older than this TTL will be automatically cleaned up by the coordinator server. Producer offsets are used for undo recovery when a Flink job fails over before completing its first checkpoint. The default value is 24 hours. |
| coordinator.producer-offsets.cleanup-interval | Duration | 1h | The interval for cleaning up expired producer offsets and orphan files in remote storage. The cleanup task runs periodically to remove expired offsets and any orphan files that may have been left behind due to incomplete operations. The default value is 1 hour. |
+| coordinator.rebalance.max-buckets-per-round | Integer | 0 | The maximum number of bucket rebalance tasks activated in one rebalance round. A positive value splits large rebalance plans into multiple rounds to reduce coordinator and cluster pressure. The default value 0 disables round limiting. |
| coordinator.lifecycle-throttler.inflight-timeout | Duration | 3min | The timeout for an in-flight drop event in the coordinator's TableLifecycleThrottler. If a drop event has been admitted but the corresponding completion callback has not arrived within this timeout, the throttler abandons tracking of that drop and continues admitting the next pending drop. |
| coordinator.lifecycle-throttler.timeout-check-interval | Duration | 1min | The periodic interval at which the coordinator's TableLifecycleThrottler scans in-flight drops for timeouts. |
diff --git a/website/docs/maintenance/operations/rebalance.md b/website/docs/maintenance/operations/rebalance.md
index 418b7c585f..c5b383cac7 100644
--- a/website/docs/maintenance/operations/rebalance.md
+++ b/website/docs/maintenance/operations/rebalance.md
@@ -77,6 +77,16 @@ Available rebalance goals:
Goals are processed in the order specified. When using `RACK_AWARE`, always place it first to ensure subsequent goals (like `REPLICA_DISTRIBUTION`) respect rack constraints. If `RACK_AWARE` is not the first goal, replica movements may violate rack awareness requirements.
:::
+### Limit Active Bucket Tasks
+
+Large clusters can generate rebalance plans with many bucket movements. To reduce coordinator and TabletServer pressure, configure the maximum number of bucket rebalance tasks that can be activated in one round:
+
+```yaml title="conf/server.yaml"
+coordinator.rebalance.max-buckets-per-round: 100
+```
+
+The default value is `0`, which disables round limiting and preserves the existing behavior of registering the whole rebalance plan at once. When this option is set to a positive value, Fluss persists the full rebalance progress and activates the next round only after all bucket tasks in the current round reach a final state.
+
### 3. Monitor Progress
Track the rebalance operation using the returned rebalance ID: