[sled-agent-config-reconciler] Flesh out main reconciliation task #8162

Merged · 3 commits · May 16, 2025

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions sled-agent/config-reconciler/Cargo.toml
@@ -16,6 +16,7 @@ chrono.workspace = true
debug-ignore.workspace = true
derive_more.workspace = true
dropshot.workspace = true
either.workspace = true
futures.workspace = true
glob.workspace = true
id-map.workspace = true
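
The new `either` dependency is most commonly pulled in so one function can return two differently-typed iterators behind a single `impl Iterator`. A minimal sketch of that pattern, assuming this is why it is used here (the diff does not show the actual use site):

```rust
use either::Either;

// Return one of two iterator types from the same function; `Either`
// implements `Iterator` when both of its variants do.
fn zpool_paths(use_ramdisk: bool) -> impl Iterator<Item = &'static str> {
    if use_ramdisk {
        Either::Left(std::iter::once("/tmp/ramdisk"))
    } else {
        Either::Right(["/pool/ext/a", "/pool/ext/b"].into_iter())
    }
}
```
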
88 changes: 87 additions & 1 deletion sled-agent/config-reconciler/src/dataset_serialization_task.rs
@@ -24,6 +24,8 @@ use illumos_utils::zfs::DestroyDatasetError;
use illumos_utils::zfs::Mountpoint;
use illumos_utils::zfs::WhichDatasets;
use illumos_utils::zfs::Zfs;
use illumos_utils::zpool::PathInPool;
use illumos_utils::zpool::ZpoolOrRamdisk;
use nexus_sled_agent_shared::inventory::InventoryDataset;
use omicron_common::disk::DatasetConfig;
use omicron_common::disk::DatasetKind;
@@ -32,6 +34,8 @@ use omicron_common::disk::SharedDatasetConfig;
use omicron_common::zpool_name::ZpoolName;
use omicron_uuid_kinds::DatasetUuid;
use sled_storage::config::MountConfig;
use sled_storage::dataset::U2_DEBUG_DATASET;
use sled_storage::dataset::ZONE_DATASET;
use sled_storage::manager::NestedDatasetConfig;
use sled_storage::manager::NestedDatasetListOptions;
use sled_storage::manager::NestedDatasetLocation;
@@ -83,6 +87,34 @@ pub enum DatasetEnsureError {
TestError(&'static str),
}

impl DatasetEnsureError {
fn is_retryable(&self) -> bool {
match self {
// These errors might be retryable; there are probably cases where
// they won't be, but we need more context than we have available
// from just the error to know for sure. For now, assume they are
// retryable - that may mean we churn on something doomed, but
// that's better than failing to retry something we should have
// retried.
DatasetEnsureError::ZpoolNotFound(_)
| DatasetEnsureError::EnsureFailed { .. } => true,

// Errors that we know aren't retryable: recovering from these
// requires config changes, so there's no need to retry until that
// happens.
DatasetEnsureError::TransientZoneRootNoConfig(_)
| DatasetEnsureError::UuidMismatch { .. } => false,

DatasetEnsureError::TransientZoneRootFailure { err, .. } => {
err.is_retryable()
}

#[cfg(test)]
DatasetEnsureError::TestError(_) => false,
}
}
}
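
The comments above encode the retry policy: failures whose cause can't be determined from the error alone are treated as retryable, while config-level failures are not. A hedged sketch of how a caller might drive retries off this classification, built on `DatasetEnsureResult::has_retryable_error` (defined just below); `ensure_all` and the delay are hypothetical stand-ins, not part of this diff:

```rust
use std::time::Duration;

// Hypothetical driver loop: keep re-running ensure while at least one
// failure is classified as retryable.
async fn ensure_until_settled() {
    loop {
        let result: DatasetEnsureResult = ensure_all().await;
        // Non-retryable failures need a new sled config; looping on them
        // would just churn, so stop and wait for a config change instead.
        if !result.has_retryable_error() {
            break;
        }
        // Retryable failures (e.g., a zpool that hasn't appeared yet) are
        // worth another pass after a short delay.
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}
```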

#[derive(Debug, thiserror::Error)]
pub enum NestedDatasetMountError {
#[error("could not mount dataset {}", .name)]
@@ -129,6 +161,60 @@ pub enum NestedDatasetListError {
#[derive(Debug, Clone, Default)]
pub(crate) struct DatasetEnsureResult(IdMap<SingleDatasetEnsureResult>);

impl DatasetEnsureResult {
pub(crate) fn has_retryable_error(&self) -> bool {
self.0.iter().any(|result| match &result.state {
DatasetState::Ensured => false,
DatasetState::FailedToEnsure(err) => err.is_retryable(),
})
}

pub(crate) fn all_mounted_debug_datasets<'a>(
&'a self,
mount_config: &'a MountConfig,
) -> impl Iterator<Item = PathInPool> + 'a {
self.all_mounted_datasets(mount_config, DatasetKind::Debug)
}

pub(crate) fn all_mounted_zone_root_datasets<'a>(
&'a self,
mount_config: &'a MountConfig,
) -> impl Iterator<Item = PathInPool> + 'a {
self.all_mounted_datasets(mount_config, DatasetKind::TransientZoneRoot)
}

fn all_mounted_datasets<'a>(
&'a self,
mount_config: &'a MountConfig,
kind: DatasetKind,
) -> impl Iterator<Item = PathInPool> + 'a {
// We're a helper called by the pub methods on this type, so we only
// have to handle the `kind`s they call us with.
let mountpoint = match &kind {
DatasetKind::Debug => U2_DEBUG_DATASET,
DatasetKind::TransientZoneRoot => ZONE_DATASET,
_ => unreachable!(
"private function called with unexpected kind {kind:?}"
),
};
self.0
.iter()
.filter(|result| match &result.state {
DatasetState::Ensured => true,
DatasetState::FailedToEnsure(_) => false,
})
.filter(move |result| *result.config.name.kind() == kind)
.map(|result| {
let pool = *result.config.name.pool();
PathInPool {
pool: ZpoolOrRamdisk::Zpool(pool),
path: pool
.dataset_mountpoint(&mount_config.root, mountpoint),
}
})
}
}
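
These accessors yield only datasets that both ensured successfully and match the expected kind, as `PathInPool` values pairing each zpool with its mountpoint. A small usage sketch; `results` and `mount_config` are assumed to be available in the caller:

```rust
// Sketch: collect the mounted debug datasets to hand to a consumer such
// as dump/core-file management.
fn mounted_debug_paths(
    results: &DatasetEnsureResult,
    mount_config: &MountConfig,
) -> Vec<PathInPool> {
    results.all_mounted_debug_datasets(mount_config).collect()
}
```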

#[derive(Debug, Clone)]
struct SingleDatasetEnsureResult {
config: DatasetConfig,
@@ -149,7 +235,7 @@ enum DatasetState {
FailedToEnsure(Arc<DatasetEnsureError>),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct DatasetTaskHandle(mpsc::Sender<DatasetTaskRequest>);
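
Adding `Clone` here is cheap: the handle is just an `mpsc::Sender`, and cloned senders all feed the same worker task. A standalone sketch of that pattern, with simplified types rather than this crate's actual request enum:

```rust
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
struct Handle(mpsc::Sender<String>);

// One worker task, many cheaply-cloned handles feeding it.
fn spawn_worker() -> Handle {
    let (tx, mut rx) = mpsc::channel::<String>(8);
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            println!("handling request: {req}");
        }
    });
    Handle(tx)
}
```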

impl DatasetTaskHandle {
10 changes: 8 additions & 2 deletions sled-agent/config-reconciler/src/handle.rs
@@ -50,6 +50,7 @@ use crate::dump_setup_task;
use crate::internal_disks::InternalDisksReceiver;
use crate::ledger::LedgerTaskHandle;
use crate::raw_disks;
use crate::raw_disks::RawDisksReceiver;
use crate::raw_disks::RawDisksSender;
use crate::reconciler_task;
use crate::reconciler_task::CurrentlyManagedZpools;
@@ -71,6 +72,7 @@ pub struct ConfigReconcilerSpawnToken {
reconciler_result_tx: watch::Sender<ReconcilerResult>,
currently_managed_zpools_tx: watch::Sender<Arc<CurrentlyManagedZpools>>,
external_disks_tx: watch::Sender<HashSet<Disk>>,
raw_disks_rx: RawDisksReceiver,
ledger_task_log: Logger,
reconciler_task_log: Logger,
}
@@ -110,7 +112,7 @@ impl ConfigReconcilerHandle {
let internal_disks_rx =
InternalDisksReceiver::spawn_internal_disks_task(
Arc::clone(&mount_config),
raw_disks_rx,
raw_disks_rx.clone(),
base_log,
);

@@ -125,7 +127,7 @@
);

let (reconciler_result_tx, reconciler_result_rx) =
watch::channel(ReconcilerResult::default());
watch::channel(ReconcilerResult::new(Arc::clone(&mount_config)));
let (currently_managed_zpools_tx, currently_managed_zpools_rx) =
watch::channel(Arc::default());
let currently_managed_zpools_rx =
@@ -156,6 +158,7 @@ impl ConfigReconcilerHandle {
reconciler_result_tx,
currently_managed_zpools_tx,
external_disks_tx,
raw_disks_rx,
ledger_task_log: base_log
.new(slog::o!("component" => "SledConfigLedgerTask")),
reconciler_task_log: base_log
@@ -188,6 +191,7 @@ impl ConfigReconcilerHandle {
reconciler_result_tx,
currently_managed_zpools_tx,
external_disks_tx,
raw_disks_rx,
ledger_task_log,
reconciler_task_log,
} = token;
@@ -213,12 +217,14 @@

reconciler_task::spawn(
Arc::clone(self.internal_disks_rx.mount_config()),
self.dataset_task.clone(),
key_requester,
time_sync_config,
current_config_rx,
reconciler_result_tx,
currently_managed_zpools_tx,
external_disks_tx,
raw_disks_rx,
sled_agent_facilities,
reconciler_task_log,
);
35 changes: 19 additions & 16 deletions sled-agent/config-reconciler/src/internal_disks.rs
@@ -48,6 +48,7 @@ use tokio::sync::watch::error::RecvError;
use crate::disks_common::MaybeUpdatedDisk;
use crate::disks_common::update_properties_from_raw_disk;
use crate::raw_disks::RawDiskWithId;
use crate::raw_disks::RawDisksReceiver;

/// A thin wrapper around a [`watch::Receiver`] that presents a similar API.
#[derive(Debug, Clone)]
@@ -143,7 +144,7 @@ impl InternalDisksReceiver {

pub(crate) fn spawn_internal_disks_task(
mount_config: Arc<MountConfig>,
raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
raw_disks_rx: RawDisksReceiver,
base_log: &Logger,
) -> Self {
Self::spawn_with_disk_adopter(
@@ -160,7 +161,7 @@

fn spawn_with_disk_adopter<T: DiskAdopter>(
mount_config: Arc<MountConfig>,
raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
raw_disks_rx: RawDisksReceiver,
base_log: &Logger,
disk_adopter: T,
) -> Self {
@@ -537,7 +538,7 @@ impl IdMappable for InternalDisk {

struct InternalDisksTask {
// Input channel for changes to the raw disks sled-agent sees.
raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
raw_disks_rx: RawDisksReceiver,

// The set of disks we've successfully started managing.
disks_tx: watch::Sender<IdMap<InternalDisk>>,
@@ -934,11 +935,11 @@ mod tests {
async fn only_m2_disks_are_adopted() {
let logctx = dev::test_setup_log("only_m2_disks_are_adopted");

let (raw_disks_tx, raw_disks_rx) = watch::channel(IdMap::new());
let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::default());
let disk_adopter = Arc::new(TestDiskAdopter::default());
let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
Arc::new(any_mount_config()),
raw_disks_rx,
RawDisksReceiver(raw_disks_rx),
&logctx.log,
Arc::clone(&disk_adopter),
);
@@ -948,6 +949,7 @@

// Add four disks: two M.2 and two U.2.
raw_disks_tx.send_modify(|disks| {
let disks = Arc::make_mut(disks);
for disk in [
new_raw_test_disk(DiskVariant::M2, "m2-0"),
new_raw_test_disk(DiskVariant::U2, "u2-0"),
@@ -994,12 +996,13 @@

// Setup: one disk.
let mut raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2");
let (raw_disks_tx, raw_disks_rx) =
watch::channel([raw_disk.clone().into()].into_iter().collect());
let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new(
[raw_disk.clone().into()].into_iter().collect(),
));
let disk_adopter = Arc::new(TestDiskAdopter::default());
let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
Arc::new(any_mount_config()),
raw_disks_rx,
RawDisksReceiver(raw_disks_rx),
&logctx.log,
Arc::clone(&disk_adopter),
);
@@ -1021,7 +1024,7 @@
);
*raw_disk.firmware_mut() = new_firmware;
raw_disks_tx.send_modify(|disks| {
disks.insert(raw_disk.clone().into());
Arc::make_mut(disks).insert(raw_disk.clone().into());
});
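
The `Arc::make_mut` calls in these tests reflect the new channel payload: `RawDisksReceiver` wraps a `watch::Receiver<Arc<IdMap<RawDiskWithId>>>`, so receivers hold cheap `Arc` clones of the map and writers copy-on-write only when they actually mutate it. The pattern in isolation, with the payload type simplified:

```rust
use std::sync::Arc;
use tokio::sync::watch;

// Copy-on-write updates through a watch channel: `Arc::make_mut` clones
// the inner value only if other `Arc` references are still alive.
fn push_value(tx: &watch::Sender<Arc<Vec<u32>>>, value: u32) {
    tx.send_modify(|shared| {
        Arc::make_mut(shared).push(value);
    });
}
```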

// Wait for the change to be noticed.
@@ -1051,17 +1054,17 @@
// Setup: two disks.
let raw_disk1 = new_raw_test_disk(DiskVariant::M2, "m2-1");
let raw_disk2 = new_raw_test_disk(DiskVariant::M2, "m2-2");
let (raw_disks_tx, raw_disks_rx) = watch::channel(
let (raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new(
[&raw_disk1, &raw_disk2]
.into_iter()
.cloned()
.map(From::from)
.collect(),
);
));
let disk_adopter = Arc::new(TestDiskAdopter::default());
let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
Arc::new(any_mount_config()),
raw_disks_rx,
RawDisksReceiver(raw_disks_rx),
&logctx.log,
Arc::clone(&disk_adopter),
);
@@ -1075,7 +1078,7 @@

// Remove test disk 1.
raw_disks_tx.send_modify(|raw_disks| {
raw_disks.remove(raw_disk1.identity());
Arc::make_mut(raw_disks).remove(raw_disk1.identity());
});

// Wait for the removal to be propagated.
Expand All @@ -1100,9 +1103,9 @@ mod tests {

// Setup: one disk, and configure the disk adopter to fail.
let raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2");
let (_raw_disks_tx, raw_disks_rx) = watch::channel(
let (_raw_disks_tx, raw_disks_rx) = watch::channel(Arc::new(
[&raw_disk].into_iter().cloned().map(From::from).collect(),
);
));
let disk_adopter = Arc::new(TestDiskAdopter::default());
disk_adopter.inner.lock().unwrap().should_fail_requests.insert(
raw_disk.identity().clone(),
@@ -1113,7 +1116,7 @@

let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
Arc::new(any_mount_config()),
raw_disks_rx,
RawDisksReceiver(raw_disks_rx),
&logctx.log,
Arc::clone(&disk_adopter),
);