Don't destroy replicas part of shutdown nexuses #862

Merged 2 commits on Sep 18, 2024
Changes from 1 commit
@@ -1,22 +1,15 @@
 use crate::controller::{
-    reconciler::{GarbageCollect, PollContext, TaskPoller},
+    reconciler::{poller::ReconcilerWorker, GarbageCollect, PollContext, TaskPoller},
     resources::{
-        operations::{ResourceLifecycle, ResourceResize},
+        operations::{ResourceLifecycle, ResourceOwnerUpdate, ResourceResize},
         operations_helper::{OperationSequenceGuard, SpecOperationsHelper},
-        OperationGuardArc, TraceSpan, TraceStrLog,
+        OperationGuardArc, ResourceUid, TraceSpan, TraceStrLog,
     },
     task_poller::{PollEvent, PollResult, PollTimer, PollTriggerEvent, PollerState},
 };
-
-use crate::controller::{
-    reconciler::poller::ReconcilerWorker,
-    resources::{operations::ResourceOwnerUpdate, ResourceMutex, ResourceUid},
-};
 use agents::errors::SvcError;
 use stor_port::types::v0::{
-    store::{
-        nexus::NexusSpec, nexus_persistence::NexusInfo, replica::ReplicaSpec, volume::VolumeSpec,
-    },
+    store::{nexus_persistence::NexusInfo, replica::ReplicaSpec, volume::VolumeSpec},
     transport::{DestroyVolume, NexusOwners, ReplicaOwners, ResizeReplica, VolumeStatus},
 };
 use tracing::Instrument;
@@ -345,62 +338,42 @@ async fn disown_non_reservable_replicas(
let mut results = vec![];

// Fetch all nexuses that are not properly shutdown
let shutdown_failed_nexuses: Vec<ResourceMutex<NexusSpec>> = context
let shutdown_failed_nexuses = context
.registry()
.specs()
.volume_failed_shutdown_nexuses(volume.uuid())
.await;

// Remove the local child from the shutdown pending nexuses as they are
// non reservable.
// Remove the local children from the volume as they are non-reservable.
for nexus in shutdown_failed_nexuses {
if target.uuid() == nexus.uuid() {
continue;
}

let children = nexus.lock().clone().children;
for child in children {
if let Some(replica_uri) = child.as_replica() {
if replica_uri.uri().is_local() {
if let Ok(mut replica) = context.specs().replica(replica_uri.uuid()).await {
if !replica.as_ref().dirty()
&& replica.as_ref().owners.owned_by(volume.uuid())
&& !replica
.as_ref()
.owners
.nexuses()
.iter()
.any(|p| p != nexus.uuid())
&& !is_replica_healthy(
context,
&mut nexus_info,
replica.as_ref(),
volume.as_ref(),
)
.await?
{
volume.warn_span(|| tracing::warn!(replica.uuid = %replica.as_ref().uuid, "Attempting to disown non reservable replica"));

// Since its a local replica of a shutting down nexus i.e. a failed
// shutdown nexus we don't want it
// to be a part of anything. Once this replica has no owner, the
// garbage collector will remove the nexus. Even if the clean up is
// delayed we since this replica is
// anyhow not a part of the volume, we should be safe.
match replica
.remove_owners(
context.registry(),
&ReplicaOwners::new_disown_all(),
true,
)
.await
{
Ok(_) => {
volume.info_span(|| tracing::info!(replica.uuid = %replica.as_ref().uuid, "Successfully disowned non reservable replica"));
}
Err(error) => {
volume.error_span(|| tracing::error!(replica.uuid = %replica.as_ref().uuid, "Failed to disown non reservable replica"));
results.push(Err(error));
}
}
}
}
let Some(replica_uri) = child.as_replica() else {
continue;
};
if !replica_uri.uri().is_local() {
continue;
}
let Ok(replica) = context.specs().replica(replica_uri.uuid()).await else {
continue;
};
if !replica.as_ref().dirty()
&& replica.as_ref().owners.owned_by(volume.uuid())
&& !replica
.as_ref()
.owners
.nexuses()
.iter()
.any(|p| p != nexus.uuid())
&& !is_replica_healthy(context, &mut nexus_info, replica.as_ref(), volume.as_ref())
.await?
{
if let Err(error) = disown_non_reservable_replica(volume, replica, context).await {
results.push(Err(error));
}
}
}
@@ -409,6 +382,36 @@ async fn disown_non_reservable_replicas(
GarbageCollector::squash_results(results)
}

async fn disown_non_reservable_replica(
volume: &OperationGuardArc<VolumeSpec>,
mut replica: OperationGuardArc<ReplicaSpec>,
context: &PollContext,
) -> PollResult {
volume.warn_span(|| tracing::warn!(replica.uuid = %replica.as_ref().uuid, "Attempting to disown non reservable replica"));

// Since it's a local replica of a shutting down nexus i.e. a failed
// shutdown nexus we don't want to be a part of anything until such nexus is destroyed.
// Even if the cleanup is delayed, since this replica is
// anyhow not a part of the new volume nexus, we should be safe.
match replica
.remove_owners(
context.registry(),
&ReplicaOwners::new(Some(volume.uuid().clone()), vec![]),
true,
)
.await
{
Ok(_) => {
volume.info_span(|| tracing::info!(replica.uuid = %replica.as_ref().uuid, "Successfully disowned non reservable replica"));
PollResult::Ok(PollerState::Idle)
}
Err(error) => {
volume.error_span(|| tracing::error!(replica.uuid = %replica.as_ref().uuid, "Failed to disown non reservable replica"));
PollResult::Err(error)
}
}
}

async fn is_replica_healthy(
context: &PollContext,
nexus_info: &mut Option<NexusInfo>,
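Side note on the behavioural change above: the old code disowned the replica from every owner via `ReplicaOwners::new_disown_all()`, orphaning it while the failed-shutdown nexus still referenced it, whereas the new `disown_non_reservable_replica` removes only the volume owner, so the replica is destroyed later, together with that nexus. A minimal sketch of that distinction, using a hypothetical `MockOwners` stand-in rather than the real `ReplicaOwners` API:

// Illustrative sketch only: `MockOwners` is a stand-in; its fields and methods
// are assumptions for this example, not the control plane's `ReplicaOwners`.
#[derive(Debug, Default, Clone)]
struct MockOwners {
    volume: Option<String>,
    nexuses: Vec<String>,
}

impl MockOwners {
    /// Drop only the given volume owner, keeping any nexus owners intact
    /// (what the new reconciler code requests).
    fn disown_volume(&mut self, volume: &str) {
        if self.volume.as_deref() == Some(volume) {
            self.volume = None;
        }
    }

    /// Drop every owner (what `new_disown_all` effectively requested before).
    fn disown_all(&mut self) {
        self.volume = None;
        self.nexuses.clear();
    }

    /// A replica with no owners at all becomes a garbage-collection candidate.
    fn orphaned(&self) -> bool {
        self.volume.is_none() && self.nexuses.is_empty()
    }
}

fn main() {
    let mut owners = MockOwners {
        volume: Some("volume-1".into()),
        nexuses: vec!["failed-shutdown-nexus".into()],
    };

    // New behaviour: the shutdown nexus keeps its reference, so the replica is
    // only removed once that nexus itself is destroyed.
    owners.disown_volume("volume-1");
    assert!(!owners.orphaned());

    // Old behaviour: the replica is orphaned immediately and may be destroyed
    // while the shutdown nexus still points at it.
    owners.disown_all();
    assert!(owners.orphaned());
}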
156 changes: 154 additions & 2 deletions control-plane/agents/src/bin/core/tests/volume/switchover.rs
@@ -13,8 +13,9 @@ use stor_port::{
         openapi::{apis::specs_api::tower::client::direct::Specs, models, models::SpecStatus},
         store::nexus::NexusSpec,
         transport::{
-            CreateVolume, DestroyShutdownTargets, DestroyVolume, Filter, GetSpecs, Nexus,
-            PublishVolume, RepublishVolume, VolumeShareProtocol,
+            CreateReplica, CreateVolume, DestroyReplica, DestroyShutdownTargets, DestroyVolume,
+            Filter, GetSpecs, Nexus, NodeStatus, PublishVolume, ReplicaId, RepublishVolume,
+            VolumeShareProtocol,
         },
     },
 };
@@ -542,3 +543,154 @@ async fn node_exhaustion() {
assert_eq!(error.kind, ReplyErrorKind::ResourceExhausted);
assert_eq!(error.resource, ResourceKind::Node);
}

#[tokio::test]
async fn shutdown_failed_replica_owner() {
let grpc_timeout = Duration::from_millis(512);

const POOL_SIZE_BYTES: u64 = 128 * 1024 * 1024;
let cluster = ClusterBuilder::builder()
.with_rest(true)
.with_agents(vec!["core"])
.with_io_engines(2)
.with_tmpfs_pool(POOL_SIZE_BYTES)
.with_cache_period("100ms")
.with_reconcile_period(Duration::from_millis(100), Duration::from_millis(100))
.with_node_deadline("2s")
.with_options(|o| {
o.with_io_engine_env("MAYASTOR_HB_INTERVAL_SEC", "1")
.with_isolated_io_engine(true)
})
.with_req_timeouts_min(true, grpc_timeout, grpc_timeout)
.build()
.await
.unwrap();

let vol_cli = cluster.grpc_client().volume();

let volume = vol_cli
.create(
&CreateVolume {
uuid: "1e3cf927-80c2-47a8-adf0-95c486bdd7b7".try_into().unwrap(),
size: 5242880,
replicas: 2,
..Default::default()
},
None,
)
.await
.unwrap();

let volume = vol_cli
.publish(
&PublishVolume {
uuid: volume.uuid().clone(),
share: None,
target_node: Some(cluster.node(0)),
publish_context: HashMap::new(),
frontend_nodes: vec![cluster.node(1).to_string()],
},
None,
)
.await
.unwrap();

cluster
.composer()
.pause(cluster.node(0).as_str())
.await
.unwrap();

cluster
.wait_node_status(cluster.node(0), NodeStatus::Unknown)
.await
.unwrap();

vol_cli
.republish(
&RepublishVolume {
uuid: volume.uuid().clone(),
target_node: Some(cluster.node(1)),
share: VolumeShareProtocol::Nvmf,
reuse_existing: true,
frontend_node: cluster.node(1),
reuse_existing_fallback: false,
},
None,
)
.await
.unwrap();

cluster
.composer()
.thaw(cluster.node(0).as_str())
.await
.unwrap();

cluster
.wait_node_status(cluster.node(0), NodeStatus::Online)
.await
.unwrap();

assert!(!wait_till_pool_locked(&cluster).await);

let nexus_cli = cluster.grpc_client().nexus();
let nexuses = nexus_cli
.get(Filter::Node(cluster.node(0)), None)
.await
.unwrap();
assert_eq!(nexuses.into_inner().len(), 1);

let request = DestroyShutdownTargets::new(volume.uuid().clone(), None);
vol_cli
.destroy_shutdown_target(&request, None)
.await
.unwrap();

let nexuses = nexus_cli
.get(Filter::Node(cluster.node(0)), None)
.await
.unwrap();
assert!(nexuses.into_inner().is_empty());
}

async fn poll_pool_locked(cluster: &Cluster, is_locked: &mut bool) {
if *is_locked {
return;
}

let replica_cli = cluster.grpc_client().replica();
let req = CreateReplica {
node: cluster.node(0),
uuid: ReplicaId::new(),
pool_id: cluster.pool(0, 0),
size: 4 * 1024 * 1024,
thin: true,
..Default::default()
};

let Ok(replica) = replica_cli.create(&req, None).await else {
*is_locked = true;
return;
};

let req = DestroyReplica::from(replica);
*is_locked = replica_cli.destroy(&req, None).await.is_err();
}

async fn wait_till_pool_locked(cluster: &Cluster) -> bool {
let timeout = Duration::from_secs(2);
let start = std::time::Instant::now();
let mut is_locked = false;
loop {
poll_pool_locked(cluster, &mut is_locked).await;
if is_locked {
return true;
}

if std::time::Instant::now() > (start + timeout) {
return false;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
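
For context on the two helpers above: `wait_till_pool_locked` retries `poll_pool_locked` until the probe reports the pool as locked (a small thin replica can no longer be created and destroyed on it) or a two-second budget expires. The same retry pattern, written as a generic sketch; `poll_until` is not a helper that exists in this repository and simply assumes the tokio runtime these tests already run on:

use std::time::{Duration, Instant};

/// Run an async probe repeatedly until it reports success or the timeout passes.
async fn poll_until<F, Fut>(timeout: Duration, period: Duration, mut probe: F) -> bool
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let deadline = Instant::now() + timeout;
    loop {
        if probe().await {
            return true;
        }
        if Instant::now() > deadline {
            return false;
        }
        tokio::time::sleep(period).await;
    }
}

#[tokio::main]
async fn main() {
    // Stand-in probe: the test above instead attempts to create and destroy a
    // small thin replica and treats a failure of either call as "pool locked".
    let locked = poll_until(Duration::from_millis(600), Duration::from_millis(100), || async {
        false
    })
    .await;
    assert!(!locked);
}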