chore(bors): merge pull request #641
641: fix(nexus/replica): check with replica owner before destroying it r=tiagolobocastro a=tiagolobocastro

When removing a nexus child, don't attempt to destroy the corresponding replica; let the other reconcilers handle this.
Adds a test for this using a fake nexus volume.
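
In essence, the child-removal path now consults the replica's owner before destroying anything: if the owning volume still claims the replica, the child is simply detached from the nexus and the replica is left for the volume and garbage-collection reconcilers to clean up later. Below is a minimal, self-contained sketch of that decision; all names here (`Volume`, `ReplicaOwners`, `remove_child`) are illustrative stand-ins, not the actual control-plane types.

```rust
use std::collections::HashMap;

/// Illustrative replica identifier (stand-in for the real UUID type).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ReplicaUuid(String);

/// Who still claims the replica (simplified to a single volume owner).
#[derive(Debug, Default)]
struct ReplicaOwners {
    owned_by_volume: bool,
}

/// Minimal stand-in for a volume spec and the replicas it knows about.
#[derive(Debug)]
struct Volume {
    replicas: HashMap<ReplicaUuid, ReplicaOwners>,
}

/// Remove a nexus child backed by `replica`, checking with the owner first:
/// a replica still owned by the volume is kept; only an unowned replica may
/// be destroyed (or left to the garbage-collection reconciler).
fn remove_child(volume: &Volume, replica: &ReplicaUuid) -> &'static str {
    match volume.replicas.get(replica) {
        Some(owners) if owners.owned_by_volume => {
            // The volume still owns this replica: detach the child only.
            "child removed; replica kept (still owned by the volume)"
        }
        _ => {
            // No owner claims the replica, so destroying it is safe.
            "child removed; replica destroyed (unowned)"
        }
    }
}

fn main() {
    let mut replicas = HashMap::new();
    replicas.insert(
        ReplicaUuid("r1".into()),
        ReplicaOwners {
            owned_by_volume: true,
        },
    );
    let volume = Volume { replicas };

    println!("{}", remove_child(&volume, &ReplicaUuid("r1".into())));
    println!("{}", remove_child(&volume, &ReplicaUuid("r2".into())));
}
```

In the diff below, this shows up as the child-removal helpers taking the owning volume's operation guard (`remove_vol_child_by_uri(volume, ...)`) instead of a trailing boolean on `remove_child_by_uri`.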

Co-authored-by: Tiago Castro <[email protected]>
mayastor-bors and tiagolobocastro committed Jul 17, 2023
2 parents a3fcbbc + 035148e commit 6d08273
Showing 12 changed files with 285 additions and 168 deletions.
@@ -1,13 +1,16 @@
use crate::controller::{
reconciler::{poller::ReconcilerWorker, GarbageCollect, PollContext, TaskPoller},
resources::{
operations::ResourceLifecycle,
operations::{ResourceLifecycle, ResourceOwnerUpdate},
operations_helper::{OperationSequenceGuard, SpecOperationsHelper},
OperationGuardArc, TraceSpan,
},
task_poller::{PollEvent, PollResult, PollTimer, PollTriggerEvent, PollerState},
};
use stor_port::types::v0::{store::nexus::NexusSpec, transport::DestroyNexus};
use stor_port::types::v0::{
store::nexus::NexusSpec,
transport::{DestroyNexus, NexusOwners},
};
use tracing::Instrument;

/// Nexus Garbage Collector reconciler
@@ -98,8 +101,11 @@ async fn destroy_orphaned_nexus(

if let Some(owner) = &nexus.as_ref().owner {
if context.specs().volume_rsc(owner).is_none() {
let owner = NexusOwners::All;
nexus.warn_span(|| tracing::warn!("Attempting to disown orphaned nexus"));
nexus.disown(context.registry()).await?;
nexus
.remove_owners(context.registry(), &owner, true)
.await?;
nexus.info_span(|| tracing::info!("Successfully disowned orphaned nexus"));
}
}
@@ -31,6 +31,7 @@ use stor_port::{
store::{
nexus::{NexusSpec, ReplicaUri},
nexus_child::NexusChild,
volume::VolumeSpec,
},
transport::{
Child, ChildUri, CreateNexus, Nexus, NexusChildActionContext, NexusShareProtocol,
@@ -44,7 +45,6 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};

use tokio::sync::RwLock;
use tracing::Instrument;

@@ -125,9 +125,9 @@ async fn nexus_reconciler(

if reconcile {
match squash_results(vec![
handle_faulted_children(nexus, context).await,
handle_faulted_children(nexus, &mut None, context).await,
unknown_children_remover(nexus, context).await,
missing_children_remover(nexus, context).await,
missing_children_remover(nexus, &mut None, context).await,
fixup_nexus_protocol(nexus, context).await,
enospc_children_onliner(nexus, context).await,
]) {
@@ -144,14 +144,15 @@
#[tracing::instrument(skip(nexus, context), level = "trace", fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, request.reconcile = true))]
pub(super) async fn handle_faulted_children(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
context: &PollContext,
) -> PollResult {
let nexus_uuid = nexus.uuid();
let nexus_state = context.registry().nexus(nexus_uuid).await?;
let child_count = nexus_state.children.len();
if nexus_state.status == NexusStatus::Degraded && child_count > 1 {
for child in nexus_state.children.iter().filter(|c| c.state.faulted()) {
let _ = handle_faulted_child(nexus, child, &nexus_state, context).await;
let _ = handle_faulted_child(nexus, volume, child, &nexus_state, context).await;
}
}
Ok(PollerState::Idle)
@@ -162,6 +163,7 @@ pub(super) async fn handle_faulted_children(
/// or it opts for a full rebuilding by faulting the child instead.
async fn handle_faulted_child(
nexus_spec: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
child: &Child,
nexus: &Nexus,
context: &PollContext,
@@ -171,16 +173,16 @@

let Some(child_uuid) = nexus_spec.as_ref().replica_uri(&child.uri).map(|r| r.uuid()) else {
tracing::warn!(%child.uri, "Unknown Child found, a full rebuild is required");
return faulted_children_remover(nexus_spec, child, context).await;
return faulted_children_remover(nexus_spec, volume, child, context).await;
};
let Some(faulted_at) = child.faulted_at else {
tracing::warn!(%child.uri, child.uuid=%child_uuid, "Child faulted without a timestamp set, a full rebuild required");
return faulted_children_remover(nexus_spec, child, context).await;
return faulted_children_remover(nexus_spec, volume, child, context).await;
};

if !can_partial_rebuild {
tracing::warn!(%child.uri, child.uuid=%child_uuid, ?child.state_reason, "No IO log available, this child cannot be partially rebuilt");
faulted_children_remover(nexus_spec, child, context).await?;
faulted_children_remover(nexus_spec, volume, child, context).await?;
} else if child_replica_is_online(child_uuid, context).await {
let child_uuid = child_uuid.clone();
tracing::info!(%child.uri, child.uuid=%child_uuid, ?child.state_reason, "Child's replica is back online within the partial rebuild window");
@@ -192,20 +194,21 @@ async fn handle_faulted_child(
%error,
"Failed to online child, a full rebuild is required"
);
faulted_children_remover(nexus_spec, child, context).await?;
faulted_children_remover(nexus_spec, volume, child, context).await?;
}
} else if let Some(elapsed) =
wait_duration_elapsed(&child.uri, child_uuid, faulted_at, wait_duration)
{
tracing::warn!(%child.uri, child.uuid=%child_uuid, ?child.state_reason, ?elapsed, "Partial rebuild window elapsed, a full rebuild is required");
faulted_children_remover(nexus_spec, child, context).await?;
faulted_children_remover(nexus_spec, volume, child, context).await?;
}
Ok(())
}

/// Removes child from nexus children list to initiate Full rebuild of child.
async fn faulted_children_remover(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
child: &Child,
context: &PollContext,
) -> Result<(), SvcError> {
@@ -216,9 +219,8 @@ async fn faulted_children_remover(
tracing::warn!(%child.uri, %child.state, %child.state_reason, ?faulted_at, "Attempting to remove faulted child")
});
nexus
.remove_child_by_uri(context.registry(), &nexus_state, &child.uri, true)
.remove_vol_child_by_uri(volume, context.registry(), &nexus_state, &child.uri)
.await?;

nexus.warn_span(|| {
tracing::warn!(%child.uri, %child.state, %child.state_reason, ?faulted_at, "Successfully removed faulted child")
});
@@ -301,7 +303,7 @@ pub(super) async fn unknown_children_remover(
tracing::warn!("Attempting to remove unknown child '{}'", child.uri)
});
if let Err(error) = nexus
.remove_child_by_uri(context.registry(), &nexus_state, &child.uri, false)
.remove_child_by_uri(context.registry(), &nexus_state, &child.uri)
.await
{
nexus.error(&format!(
@@ -330,6 +332,7 @@ pub(super) async fn unknown_children_remover(
#[tracing::instrument(skip(nexus, context), level = "trace", fields(nexus.uuid = %nexus.uuid(), request.reconcile = true))]
pub(super) async fn missing_children_remover(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
context: &PollContext,
) -> PollResult {
let nexus_uuid = nexus.uuid();
@@ -346,7 +349,7 @@ pub(super) async fn missing_children_remover(
));

if let Err(error) = nexus
.remove_child_by_uri(context.registry(), &nexus_state, &child.uri(), true)
.remove_vol_child_by_uri(volume, context.registry(), &nexus_state, &child.uri())
.await
{
nexus.error_span(|| {
@@ -17,7 +17,7 @@ use stor_port::types::v0::{
store::{
nexus::NexusSpec, nexus_persistence::NexusInfo, replica::ReplicaSpec, volume::VolumeSpec,
},
transport::{DestroyVolume, ReplicaOwners, VolumeStatus},
transport::{DestroyVolume, NexusOwners, ReplicaOwners, VolumeStatus},
};
use tracing::Instrument;

@@ -160,7 +160,8 @@ async fn disown_unused_nexuses(

nexus.warn_span(|| tracing::warn!("Attempting to disown unused nexus"));
// the nexus garbage collector will destroy the disowned nexuses
match nexus.disown(context.registry()).await {
let owner = NexusOwners::Volume(volume.uuid().clone());
match nexus.remove_owners(context.registry(), &owner, false).await {
Ok(_) => {
nexus.info_span(|| tracing::info!("Successfully disowned unused nexus"));
}
@@ -44,12 +44,11 @@ async fn hot_spare_reconcile(
volume_spec: &mut ResourceMutex<VolumeSpec>,
context: &PollContext,
) -> PollResult {
let uuid = volume_spec.uuid();
let volume_state = context.registry().volume_state(uuid).await?;
let mut volume = match volume_spec.operation_guard() {
Ok(guard) => guard,
Err(_) => return PollResult::Ok(PollerState::Busy),
};
let volume_state = context.registry().volume_state(volume.uuid()).await?;

if !volume.as_ref().policy.self_heal {
return PollResult::Ok(PollerState::Idle);
@@ -84,7 +83,7 @@ async fn hot_spare_nexus_reconcile(
}

// generic nexus reconciliation (does not matter that it belongs to a volume)
results.push(generic_nexus_reconciler(&mut nexus, context).await);
results.push(generic_nexus_reconciler(volume, &mut nexus, context).await);

// fixup the volume replica count: creates new replicas when we're behind
// removes extra replicas but only if they're UNUSED (by a nexus)
@@ -100,26 +99,28 @@

#[tracing::instrument(skip(context, nexus), fields(nexus.uuid = %nexus.uuid(), nexus.node = %nexus.as_ref().node, volume.uuid = tracing::field::Empty, request.reconcile = true))]
async fn generic_nexus_reconciler(
volume: &mut OperationGuardArc<VolumeSpec>,
nexus: &mut OperationGuardArc<NexusSpec>,
context: &PollContext,
) -> PollResult {
if let Some(ref volume_uuid) = nexus.as_ref().owner {
tracing::Span::current().record("volume.uuid", volume_uuid.as_str());
}
let mut results = vec![];
results.push(handle_faulted_children(nexus, context).await);
results.push(handle_faulted_children(nexus, &mut Some(volume), context).await);
results.push(unknown_children_remover(nexus, context).await);
results.push(missing_children_remover(nexus, context).await);
results.push(missing_children_remover(nexus, &mut Some(volume), context).await);
squash_results(results)
}

/// Checks if nexus is Degraded and any child is Faulted. If yes, Depending on rebuild policy for
/// child it performs rebuild operation. We exclude NoSpace Degrade.
async fn handle_faulted_children(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
context: &PollContext,
) -> PollResult {
nexus::handle_faulted_children(nexus, context).await
nexus::handle_faulted_children(nexus, volume, context).await
}

/// Given a degraded volume
@@ -139,9 +140,10 @@ async fn unknown_children_remover(
/// And the replicas should eventually be destroyed
async fn missing_children_remover(
nexus: &mut OperationGuardArc<NexusSpec>,
volume: &mut Option<&mut OperationGuardArc<VolumeSpec>>,
context: &PollContext,
) -> PollResult {
nexus::missing_children_remover(nexus, context).await
nexus::missing_children_remover(nexus, volume, context).await
}

/// Given a degraded volume
control-plane/agents/src/bin/core/nexus/operations.rs (74 changes: 37 additions & 37 deletions)
@@ -4,13 +4,13 @@ use crate::{
registry::Registry,
resources::{
operations::{
ResourceLifecycle, ResourceOffspring, ResourceSharing, ResourceShutdownOperations,
ResourceSnapshotting,
ResourceLifecycle, ResourceOffspring, ResourceOwnerUpdate, ResourceSharing,
ResourceShutdownOperations,
},
operations_helper::{
GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard, SpecOperationsHelper,
},
OperationGuardArc, TraceSpan,
OperationGuardArc, TraceSpan, UpdateInnerValue,
},
scheduling::resources::HealthyChildItems,
wrapper::{GetterOps, NodeWrapper},
@@ -26,7 +26,7 @@ use stor_port::types::v0::{
transport::{
child::Child,
nexus::{CreateNexus, DestroyNexus, Nexus, ShareNexus, UnshareNexus},
AddNexusChild, FaultNexusChild, NodeStatus, RemoveNexusChild, ShutdownNexus,
AddNexusChild, FaultNexusChild, NexusOwners, NodeStatus, RemoveNexusChild, ShutdownNexus,
},
};

@@ -350,6 +350,39 @@ impl ResourceShutdownOperations for OperationGuardArc<NexusSpec> {
}
}

#[async_trait::async_trait]
impl ResourceOwnerUpdate for OperationGuardArc<NexusSpec> {
type Update = NexusOwners;

async fn remove_owners(
&mut self,
registry: &Registry,
request: &Self::Update,
update_on_commit: bool,
) -> Result<(), SvcError> {
// we don't really need the state, this is a configuration-only change.
let state = Default::default();

let spec_clone = self
.start_update(
registry,
&state,
NexusOperation::OwnerUpdate(request.clone()),
)
.await?;

match self.complete_update(registry, Ok(()), spec_clone).await {
Ok(_) => Ok(()),
Err(SvcError::Store { .. }) if update_on_commit => {
self.lock().disowned_by_owners(request);
self.update();
Ok(())
}
Err(error) => Err(error),
}
}
}

impl OperationGuardArc<NexusSpec> {
async fn create_nexus(
&self,
@@ -491,36 +524,3 @@ impl OperationGuardArc<NexusSpec> {
}
}
}

#[async_trait::async_trait]
impl ResourceSnapshotting for OperationGuardArc<NexusSpec> {
type Create = ();
type CreateOutput = ();
type Destroy = ();
type List = ();
type ListOutput = ();

async fn create_snap(
&mut self,
_registry: &Registry,
_request: &Self::Create,
) -> Result<Self::CreateOutput, SvcError> {
todo!()
}

async fn list_snaps(
&self,
_registry: &Registry,
_request: &Self::List,
) -> Result<Self::ListOutput, SvcError> {
todo!()
}

async fn destroy_snap(
&mut self,
_registry: &Registry,
_request: &Self::Destroy,
) -> Result<(), SvcError> {
todo!()
}
}