Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@ public class ConfigOptions {
// ------------------------------------------------------------------------
// ConfigOptions for Coordinator Server
// ------------------------------------------------------------------------
/**
* The maximum number of bucket rebalance tasks to activate in one rebalance round.
*
* <p>The default value {@code 0} disables round limiting and preserves the existing behavior of
* registering the whole rebalance plan at once.
*/
public static final ConfigOption<Integer> COORDINATOR_REBALANCE_MAX_BUCKETS_PER_ROUND =
key("coordinator.rebalance.max-buckets-per-round")
.intType()
.defaultValue(0)
.withDescription(
"The maximum number of bucket rebalance tasks activated in one rebalance round. "
+ "A positive value splits large rebalance plans into multiple rounds to reduce coordinator and cluster pressure. "
+ "The default value 0 disables round limiting.");

/**
* The config parameter defining the network address to connect to for communication with the
* coordinator server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ public CoordinatorEventProcessor(
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
this.rebalanceManager =
new RebalanceManager(
this, zooKeeperClient, coordinatorEventManager, SystemClock.getInstance());
this,
zooKeeperClient,
coordinatorEventManager,
SystemClock.getInstance(),
conf);
this.ioExecutor = ioExecutor;
this.lakeTableHelper =
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
Expand Down Expand Up @@ -1390,25 +1394,9 @@ private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) {
rebalanceManager.getRebalanceId()));
}

RebalanceTask rebalanceTask;
long startTime = System.currentTimeMillis();
try {
// 1. generate rebalance plan.
rebalanceTask =
rebalanceManager.generateRebalanceTask(rebalanceEvent.getGoalsByPriority());

// 2. execute rebalance plan.
Map<TableBucket, RebalancePlanForBucket> executePlan = rebalanceTask.getExecutePlan();
zooKeeperClient.registerRebalanceTask(rebalanceTask);
rebalanceManager.registerRebalance(
rebalanceTask.getRebalanceId(), executePlan, RebalanceStatus.NOT_STARTED);
} catch (Exception e) {
throw new RebalanceFailureException(
String.format(
"Failed to generate plan and execute rebalance. The root cause: %s",
e.getMessage()),
e);
}
RebalanceTask rebalanceTask =
rebalanceManager.generateAndRegisterRebalance(rebalanceEvent.getGoalsByPriority());

LOG.info(
"Generate Rebalance plan rebalance id {} with {} ms.",
Expand Down
Loading