[Admin] Ensure balanced replica-set selection for partitions #3330

Merged
merged 1 commit into from Jun 3, 2025

crates/admin/src/cluster_controller/scheduler.rs (219 changes: 84 additions & 135 deletions)
@@ -32,7 +32,7 @@ use restate_types::metadata_store::keys::partition_processor_epoch_key;
use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, ProcessorCommand,
};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::nodes_config::{NodeConfig, NodesConfiguration};
use restate_types::partition_table::{PartitionPlacement, PartitionReplication, PartitionTable};
use restate_types::partitions::state::{PartitionReplicaSetStates, ReplicaSetState};
use restate_types::partitions::{PartitionConfiguration, worker_candidate_filter};
@@ -60,24 +60,6 @@ pub enum Error {
Shutdown(#[from] ShutdownError),
}

/// Placement hints for the [`Scheduler`]. The hints can specify which nodes should be chosen for
/// the partition processor placement and on which node the leader should run.
pub trait PartitionProcessorPlacementHints {
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId>;

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId>;
}

impl<T: PartitionProcessorPlacementHints> PartitionProcessorPlacementHints for &T {
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
(*self).preferred_nodes(partition_id)
}

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
(*self).preferred_leader(partition_id)
}
}

#[derive(Debug, Clone)]
struct PartitionState {
target_leader: Option<PlainNodeId>,
@@ -146,10 +128,6 @@ impl PartitionState {
)
}

fn contains_replica_current(&self, node_id: PlainNodeId) -> bool {
self.current.replica_set().contains(node_id)
}

fn generate_instructions(
&self,
partition_id: &PartitionId,
@@ -275,44 +253,33 @@ impl<T: TransportConnect> Scheduler<T> {

// todo a bulk get of all EpochMetadata if self.partitions.is_empty()

for partition_id in partition_table.iter_ids() {
let entry = self.partitions.entry(*partition_id);
for partition_id in partition_table.iter_ids().copied() {
let entry = self.partitions.entry(partition_id);

// make sure that we have a valid partition processor configuration
let mut occupied_entry = match entry {
Entry::Occupied(mut entry) if entry.get().current.is_valid() => {
if Self::requires_reconfiguration(entry.get(), nodes_config) {
let partition_replication = Self::partition_replication_to_replication_property(
nodes_config,
&partition_table,
);
if Self::requires_reconfiguration(
partition_id,
entry.get(),
&partition_replication,
nodes_config,
) {
trace!("Partition {} requires reconfiguration", partition_id);

let partition_replication =
Self::partition_replication_to_replication_property(
nodes_config,
&partition_table,
);

// select all valid worker candidates as the preferred nodes for the next
// configuration
let preferred_nodes = entry
.get()
.replicas()
.filter(|replica| {
// only keep alive nodes in the preferred nodes set to allow moving
// slowly to a more evenly spread replica set if nodes are currently
// dead.
observed_cluster_state.alive_generation(**replica).is_some()
})
.copied()
.collect();

if let Some(next) = Self::choose_partition_configuration(
*partition_id,
partition_id,
nodes_config,
partition_replication,
preferred_nodes,
NodeSet::new(),
) {
*entry.get_mut() = Self::reconfigure_partition_configuration(
self.metadata_writer.raw_metadata_store_client(),
*partition_id,
partition_id,
entry
.get()
.next
@@ -323,7 +290,7 @@
)
.await?;
Self::note_observed_membership_update(
*partition_id,
partition_id,
entry.get(),
&self.replica_set_states,
);
@@ -340,21 +307,21 @@

// no current configuration, or no valid one: pick a valid configuration
if let Some(current) = Self::choose_partition_configuration(
*partition_id,
partition_id,
nodes_config,
partition_replication.clone(),
NodeSet::default(),
) {
let occupied_entry = entry.insert_entry(
Self::store_initial_partition_configuration(
self.metadata_writer.raw_metadata_store_client(),
*partition_id,
partition_id,
current,
)
.await?,
);
Self::note_observed_membership_update(
*partition_id,
partition_id,
occupied_entry.get(),
&self.replica_set_states,
);
@@ -371,11 +338,11 @@
// next configuration has become active
if let Some(next) = &occupied_entry.get().next {
if next.replica_set().iter().any(|node_id| {
observed_cluster_state.is_partition_processor_active(partition_id, node_id)
observed_cluster_state.is_partition_processor_active(&partition_id, node_id)
}) {
let partition_configuration_update = Self::complete_reconfiguration(
self.metadata_writer.raw_metadata_store_client(),
*partition_id,
partition_id,
occupied_entry.get().current.version(),
next.version(),
)
@@ -385,7 +352,7 @@
partition_configuration_update.next,
) {
Self::note_observed_membership_update(
*partition_id,
partition_id,
occupied_entry.get(),
&self.replica_set_states,
);
@@ -394,7 +361,7 @@
}

// select the leader based on the observed cluster state
self.select_leader(partition_id, observed_cluster_state);
self.select_leader(&partition_id, observed_cluster_state);
}

// update the PartitionTable placement which is still needed for routing messages from the
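For context, the loop above performs reconfiguration in two steps: it first stores a proposed `next` configuration and only promotes it to `current` once one of the new replicas reports an active partition processor. Below is a minimal, self-contained sketch of that lifecycle, using simplified stand-in types rather than the actual `PartitionConfiguration`/`EpochMetadata` structures from the crate:

```rust
// Simplified stand-ins for the partition configuration lifecycle; not the restate_types API.
use std::collections::BTreeSet;

type NodeId = u32;
type ReplicaSet = BTreeSet<NodeId>;

#[derive(Debug)]
struct PartitionState {
    current: ReplicaSet,
    next: Option<ReplicaSet>,
}

impl PartitionState {
    /// Store a proposed replica set; it does not take effect yet.
    fn propose(&mut self, next: ReplicaSet) {
        self.next = Some(next);
    }

    /// Promote `next` to `current` once any of its members is observed as active.
    fn maybe_complete(&mut self, is_active: impl Fn(NodeId) -> bool) {
        let promote = self
            .next
            .as_ref()
            .is_some_and(|next| next.iter().any(|n| is_active(*n)));
        if promote {
            self.current = self.next.take().expect("next is present");
        }
    }
}

fn main() {
    let mut state = PartitionState {
        current: BTreeSet::from([1, 2, 3]),
        next: None,
    };
    // Node 3 died, so a rebalanced replica set is proposed.
    state.propose(BTreeSet::from([1, 2, 4]));
    // `current` stays in charge until a member of `next` reports in.
    state.maybe_complete(|_| false);
    assert_eq!(state.current, BTreeSet::from([1, 2, 3]));
    // Node 4 starts its partition processor; the new configuration takes over.
    state.maybe_complete(|n| n == 4);
    assert_eq!(state.current, BTreeSet::from([1, 2, 4]));
    assert!(state.next.is_none());
}
```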
@@ -517,7 +484,7 @@ impl<T: TransportConnect> Scheduler<T> {
.await
{
Ok(epoch_metadata) => {
debug!("Reconfigured partition {} to {:?}", partition_id, next);
debug!(%partition_id, "Reconfigured partition to {next:?}");
let (_, _, current, next) = epoch_metadata.into_inner();
Ok(PartitionState::new(current, next))
}
@@ -562,7 +529,11 @@ impl<T: TransportConnect> Scheduler<T> {
}
}).await {
Ok(epoch_metadata) => {
info!("Successfully transitioned from partition processor configuration {} to {}", current_version, next_version);
info!(
%partition_id,
replica_set = %epoch_metadata.current().replica_set(),
"Transitioned from partition configuration {current_version} to {next_version}"
);
let (_, _, current, next) = epoch_metadata.into_inner();
Ok(PartitionConfigurationUpdate {
current,
@@ -581,37 +552,52 @@ impl<T: TransportConnect> Scheduler<T> {
/// Checks whether the given partition requires reconfiguration. A partition requires
/// reconfiguration in the following cases:
///
/// * next contains a replica that is no longer a valid worker candidate.
/// * current contains a replica that is no longer a valid worker candidate, and there is no
/// ongoing reconfiguration happening already
/// * Partition replication has changed.
/// * A possible improvement/re-balance of the replica-set, including the case where a node has
/// been dead for some time.
///
/// Note: if we take whether a node is dead or not into account, we can do a great job, but we
/// need to reset our dead timers/instants when we switch from follower to leader. This is to
/// avoid a knee-jerk reaction if we are a new leader with an outdated view of the world.
///
/// In these cases, the method returns true, otherwise false.
fn requires_reconfiguration(
partition_id: PartitionId,
partition_state: &PartitionState,
default_replication: &ReplicationProperty,
nodes_config: &NodesConfiguration,
) -> bool {
let current_requires_reconfiguration =
partition_state.current.replica_set().iter().any(|replica| {
!nodes_config
.find_node_by_id(*replica)
.map(|node_config| worker_candidate_filter(*replica, node_config))
.unwrap_or_default()
});

let next_requires_reconfiguration = partition_state.next.as_ref().map(|next| {
next.replica_set().iter().any(|replica| {
!nodes_config
.find_node_by_id(*replica)
.map(|node_config| worker_candidate_filter(*replica, node_config))
.unwrap_or_default()
})
next.replication() != default_replication ||
// check if a different replica-set is imminent
Self::choose_partition_configuration(
partition_id,
nodes_config,
default_replication.clone(),
NodeSet::default(),
)
.map(|new_config|
!new_config.replica_set().is_equivalent(next.replica_set()))
.unwrap_or(false)
});

let ongoing_reconfiguration = next_requires_reconfiguration.is_some();
let next_requires_reconfiguration = next_requires_reconfiguration.unwrap_or(false);
if next_requires_reconfiguration.is_some_and(|c| c) {
return true;
}

(current_requires_reconfiguration && !ongoing_reconfiguration)
|| next_requires_reconfiguration
partition_state.current.replication() != default_replication
|| Self::choose_partition_configuration(
partition_id,
nodes_config,
default_replication.clone(),
NodeSet::default(),
)
.map(|new_config| {
!new_config
.replica_set()
.is_equivalent(partition_state.current.replica_set())
})
.unwrap_or(false)
}

fn choose_partition_configuration(
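Put briefly, the check above requests reconfiguration when the configured replication no longer matches the target, or when the selector would now produce a replica set that is not equivalent to the one in use. Here is a condensed sketch of that decision, assuming a deliberately simplified selector and plain set equality in place of the real `BalancedSpreadSelector` and `NodeSet` equivalence:

```rust
// Hypothetical, simplified model of the reconfiguration check; not the crate's actual API.
use std::collections::BTreeSet;

type NodeId = u32;
type ReplicaSet = BTreeSet<NodeId>;

/// Stand-in for the balanced spread selector: pick the `replication` lowest node ids among
/// alive worker candidates (the real selector also balances across domains and partitions).
fn select_balanced(alive_workers: &BTreeSet<NodeId>, replication: usize) -> Option<ReplicaSet> {
    if alive_workers.len() < replication {
        return None;
    }
    Some(alive_workers.iter().copied().take(replication).collect())
}

fn requires_reconfiguration(
    current: &ReplicaSet,
    current_replication: usize,
    target_replication: usize,
    alive_workers: &BTreeSet<NodeId>,
) -> bool {
    // Replication property changed: always reconfigure.
    if current_replication != target_replication {
        return true;
    }
    // "Equivalent" is modeled here as plain set equality; the real NodeSet comparison is richer.
    match select_balanced(alive_workers, target_replication) {
        Some(candidate) => candidate != *current,
        None => false, // no better option available; keep what we have
    }
}

fn main() {
    let alive = BTreeSet::from([1, 2, 4, 5]);
    // Node 3 is gone and nodes 4/5 joined: a more balanced set exists, so reconfigure.
    assert!(requires_reconfiguration(&BTreeSet::from([1, 2, 3]), 3, 3, &alive));
    // Replica set already matches what the selector would pick: nothing to do.
    assert!(!requires_reconfiguration(&BTreeSet::from([1, 2, 4]), 3, 3, &alive));
}
```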
@@ -622,36 +608,31 @@ impl<T: TransportConnect> Scheduler<T> {
) -> Option<PartitionConfiguration> {
let options =
SelectorOptions::new(u64::from(partition_id)).with_preferred_nodes(preferred_nodes);
let cluster_state = TaskCenter::with_current(|tc| tc.cluster_state().clone());
let filter = |node_id: PlainNodeId, node_config: &NodeConfig| {
cluster_state.is_alive(node_id.into()) && worker_candidate_filter(node_id, node_config)
};

BalancedSpreadSelector::select(
nodes_config,
&partition_replication,
worker_candidate_filter,
&options,
)
.map(|replica_set| {
PartitionConfiguration::new(partition_replication, replica_set, HashMap::default())
})
.inspect_err(|err| {
debug!(
"Failed to select replica set for partition {partition_id}: {}",
err
)
})
.ok()
BalancedSpreadSelector::select(nodes_config, &partition_replication, filter, &options)
.map(|replica_set| {
PartitionConfiguration::new(partition_replication, replica_set, HashMap::default())
})
.inspect_err(|err| {
debug!(
"Failed to select replica set for partition {partition_id}: {}",
err
)
})
.ok()
}

/// Selects a leader based on the current target leader, observed cluster state and preferred leader.
///
/// 1. Try to keep the observed leader
/// 2. Prefer worker nodes that are caught up
/// 1. Prefer worker nodes that are caught up
/// 2.1 Choose the current target leader
/// 2.2 Choose the preferred leader
/// 2.3 Pick any of the nodes in the current partition configuration
/// 3. Pick worker nodes that are alive
/// 3.1 Choose the current target leader
/// 3.2 Choose the preferred leader
/// 3.3 Pick any of the nodes in the current partition configuration
/// 2. Pick worker nodes that are alive
fn select_leader(
&mut self,
partition_id: &PartitionId,
@@ -661,28 +642,6 @@ impl<T: TransportConnect> Scheduler<T> {
return;
};

// 1. keep current observed leader
if let Some(leader) = observed_cluster_state
.partition_state(partition_id)
.and_then(|partition_state| {
partition_state
.partition_processors
.iter()
.find(|(node_id, state)| {
state.run_mode == RunMode::Leader
&& partition.contains_replica_current(**node_id)
})
.map(|(node_id, _)| *node_id)
})
{
assert!(
observed_cluster_state.alive_generation(leader).is_some(),
"only alive nodes should run the leader"
);
partition.target_leader = Some(leader);
return;
}

if let Some(observed_partition_state) = observed_cluster_state.partition_state(partition_id)
{
if let Some(leader) =
Expand Down Expand Up @@ -711,17 +670,7 @@ impl<T: TransportConnect> Scheduler<T> {
observed_cluster_state: &ObservedClusterState,
additional_criterion: impl Fn(PlainNodeId) -> bool,
) -> Option<PlainNodeId> {
// 1. keep the current target leader if it is still alive because we might have instructed
// it already
if partition.target_leader.is_some_and(|leader| {
observed_cluster_state.alive_generation(leader).is_some()
&& partition.contains_replica_current(leader)
&& additional_criterion(leader)
}) {
return partition.target_leader;
}

// 2. select any of the alive nodes in current
// select any of the alive nodes in current
if let Some(alive_replica) =
partition
.current
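For reference, the reworked `select_leader` strategy boils down to a two-tier choice: prefer an alive replica whose processor is caught up, otherwise accept any alive replica of the current configuration. A small illustrative sketch of that ordering follows; the status enum and helper are hypothetical simplifications, not the scheduler's actual types:

```rust
// Illustrative two-tier leader choice over the current replica set; simplified types only.
type NodeId = u32;

#[derive(Clone, Copy)]
enum ReplicaStatus {
    Dead,
    Alive { caught_up: bool },
}

/// Pick a leader: first any alive, caught-up node, otherwise any alive node at all.
fn pick_leader(replicas: &[(NodeId, ReplicaStatus)]) -> Option<NodeId> {
    let first_alive = |pred: fn(bool) -> bool| {
        replicas.iter().find_map(|(node, status)| match status {
            ReplicaStatus::Alive { caught_up } if pred(*caught_up) => Some(*node),
            _ => None,
        })
    };
    // Tier 1: caught-up workers. Tier 2: any alive worker.
    first_alive(|caught_up| caught_up).or_else(|| first_alive(|_| true))
}

fn main() {
    let replicas = [
        (1, ReplicaStatus::Dead),
        (2, ReplicaStatus::Alive { caught_up: false }),
        (3, ReplicaStatus::Alive { caught_up: true }),
    ];
    // Node 3 wins because it is both alive and caught up.
    assert_eq!(pick_leader(&replicas), Some(3));
    // Without any caught-up node, any alive node (here node 2) is acceptable.
    assert_eq!(pick_leader(&replicas[..2]), Some(2));
}
```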