Skip to content

Commit

Permalink
chore(bors): merge pull request #695
Browse files Browse the repository at this point in the history
695: Fixup correct nvme controller and make child add v1 idempotent r=tiagolobocastro a=tiagolobocastro

    feat(csi/node/timeout): add nvme-io-engine timeout and parse humantime
    
    Adds new parameter "--nvme-io-timeout".
    This is used to set the timeout per nvme block device.
    TODO: Check if this is enough to avoid setting the global timeout..
    Also let's parse the "--nvme-core-io-timeout" as humantime as well..
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    fix(nexus/add-child/v1): make add child v1 idempotent
    
    When v1 nexus add child was added, it was not made idempotent.
    Even though this is not an issue per se, as the child eventually gets
    GCd and re-added it can cause strange logging..
    TODO: should we have different behaviour depending on the state?
    Example if faulted should we remove/readd?
    Bonus: Fixes old test which stopped working a long time ago when
    pstor was enabled for the data-plane by not enabling it for that
    particular test only..
    
    Signed-off-by: Tiago Castro <[email protected]>

---

    fix(csi-node/nvmf/fixup): fixup correct nvme controller
    
    When we replace an existing path, the new path has a different controller number. And so
    the controller number and device number now mismatch, meaning we can not safely deref
    /sys/class/nvme/nvme{major}
    Instead, we can simply deref
    /sys/class/block/nvme{major}c*n1/queue
    The major ensures we use the original device number, and the glob ensures we modify the
    timeout for all controllers.
    
    Signed-off-by: Tiago Castro <[email protected]>


Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Dec 6, 2023
2 parents baa101c + d6f3380 commit cbc833a
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 76 deletions.
37 changes: 27 additions & 10 deletions control-plane/agents/src/bin/core/controller/io_engine/v1/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,39 @@ impl crate::controller::io_engine::NexusChildApi<Nexus, Nexus, ()> for super::Rp
err
)]
async fn add_child(&self, request: &AddNexusChild) -> Result<Nexus, SvcError> {
let rpc_nexus = self
let result = self
.nexus()
.add_child_nexus(request.to_rpc())
.await
.context(GrpcRequestError {
resource: ResourceKind::Child,
request: "add_child_nexus",
})?;
match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
});
match result {
Ok(rpc_nexus) => match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
},
Err(error) if error.tonic_code() == tonic::Code::AlreadyExists => {
let nexus = self.fetch_nexus(&request.nexus).await?;
if let Some(child) = nexus.child(request.uri.as_str()) {
// todo: Should we do anything here depending on the state?
tracing::warn!(
?child,
nexus=%request.nexus,
"Child is already part of the nexus"
);
Ok(nexus)
} else {
Err(error)
}
}
Err(error) => Err(error),
}
}

Expand Down
26 changes: 19 additions & 7 deletions control-plane/agents/src/bin/core/tests/nexus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ async fn nexus_child_transaction() {
.with_agents(vec!["core"])
.with_req_timeouts(grpc_timeout, grpc_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_reconcile_period(Duration::from_secs(100), Duration::from_secs(100))
.build()
.await
.unwrap();
Expand Down Expand Up @@ -489,12 +490,12 @@ async fn nexus_child_transaction() {
// unpause io_engine
cluster.composer().thaw(io_engine.as_str()).await.unwrap();

// now it should be shared successfully
let uri = nexus_client
// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Share uri: {uri:?}");
println!("Child: {child:?}");

cluster.composer().pause(io_engine.as_str()).await.unwrap();

Expand All @@ -520,13 +521,23 @@ async fn nexus_child_transaction() {
.len(),
1
);

let mut io_engine = cluster.grpc_handle(&cluster.node(0)).await.unwrap();
io_engine
.add_child(add_child.nexus.as_str(), add_child.uri.as_str(), true)
.await
.unwrap();

// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Child: {child:?}");
}

/// Tests child add and remove operations when the store is temporarily down
/// TODO: these tests don't work anymore because the io_engine also writes child healthy states
/// to etcd so we can't simply pause etcd anymore..
/// Tests child add and remove operations when the store is temporarily down.
#[tokio::test]
#[ignore]
async fn nexus_child_transaction_store() {
let store_timeout = Duration::from_millis(250);
let reconcile_period = Duration::from_millis(250);
Expand All @@ -539,6 +550,7 @@ async fn nexus_child_transaction_store() {
.with_reconcile_period(reconcile_period, reconcile_period)
.with_store_timeout(store_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_options(|b| b.with_io_engine_no_pstor(true))
.build()
.await
.unwrap();
Expand Down
34 changes: 33 additions & 1 deletion control-plane/csi-driver/src/bin/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub fn nvme_keep_alive_tmo() -> String {
pub fn nvme_ctrl_loss_tmo() -> String {
Parameters::NvmeCtrlLossTmo.as_ref().to_kebab_case()
}
/// Command line arg name for `Parameters::NvmeIoTimeout`.
pub fn nvme_io_tmo() -> String {
Parameters::NvmeIoTimeout.as_ref().to_kebab_case()
}

/// Global configuration parameters.
#[derive(Debug, Default)]
Expand All @@ -42,17 +46,21 @@ pub(crate) struct NvmeConfig {
/// Default value for `ctrl_loss_tmo` when not specified via the volume parameters (sc).
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
/// Default value for `io_tmo` when not specified via the volume parameters (sc).
io_tmo: Option<humantime::Duration>,
}
impl NvmeConfig {
fn new(
nr_io_queues: Option<u32>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
io_tmo: Option<humantime::Duration>,
) -> Self {
Self {
nr_io_queues,
ctrl_loss_tmo,
keep_alive_tmo,
io_tmo,
}
}
/// Number of IO Queues.
Expand All @@ -68,6 +76,10 @@ impl NvmeConfig {
pub(crate) fn keep_alive_tmo(&self) -> Option<u32> {
self.keep_alive_tmo
}
/// The io timeout.
pub(crate) fn io_tmo(&self) -> Option<humantime::Duration> {
self.io_tmo
}
}

/// Get a mutex guard over the `Config`.
Expand Down Expand Up @@ -112,7 +124,22 @@ impl TryFrom<NvmeArgValues> for NvmeConfig {
error
)
})?;
Ok(Self::new(nvme_nr_ioq, ctrl_loss_tmo, keep_alive_tmo))
let nvme_io_tmo = Parameters::nvme_io_timeout(
src.0.get(Parameters::NvmeIoTimeout.as_ref()),
)
.map_err(|error| {
anyhow::anyhow!(
"Invalid value for {}, error = {}",
Parameters::NvmeIoTimeout.as_ref(),
error
)
})?;
Ok(Self::new(
nvme_nr_ioq,
ctrl_loss_tmo,
keep_alive_tmo,
nvme_io_tmo,
))
}
}
/// Nvme Arguments taken from the CSI volume calls (storage class parameters).
Expand Down Expand Up @@ -155,6 +182,11 @@ impl TryFrom<&ArgMatches> for NvmeArgValues {
map.0
.insert(Parameters::NvmeKeepAliveTmo.to_string(), value.to_string());
}

if let Some(value) = matches.get_one::<String>(&nvme_io_tmo()) {
map.0
.insert(Parameters::NvmeIoTimeout.to_string(), value.to_string());
}
Ok(map)
}
}
Expand Down
112 changes: 61 additions & 51 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tracing::debug;
use udev::{Device, Enumerator};
use url::Url;
use uuid::Uuid;
Expand All @@ -35,7 +34,7 @@ pub(super) struct NvmfAttach {
port: u16,
uuid: Uuid,
nqn: String,
io_timeout: Option<u32>,
io_tmo: Option<u32>,
nr_io_queues: Option<u32>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
Expand All @@ -50,6 +49,7 @@ impl NvmfAttach {
uuid: Uuid,
nqn: String,
nr_io_queues: Option<u32>,
io_tmo: Option<humantime::Duration>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
hostnqn: Option<String>,
Expand All @@ -59,7 +59,7 @@ impl NvmfAttach {
port,
uuid,
nqn,
io_timeout: None,
io_tmo: io_tmo.map(|io_tmo| io_tmo.as_secs().try_into().unwrap_or(u32::MAX)),
nr_io_queues,
ctrl_loss_tmo,
keep_alive_tmo,
Expand Down Expand Up @@ -104,6 +104,7 @@ impl TryFrom<&Url> for NvmfAttach {
let nr_io_queues = config().nvme().nr_io_queues();
let ctrl_loss_tmo = config().nvme().ctrl_loss_tmo();
let keep_alive_tmo = config().nvme().keep_alive_tmo();
let io_tmo = config().nvme().io_tmo();

let hash_query: HashMap<_, _> = url.query_pairs().collect();
let hostnqn = hash_query.get("hostnqn").map(ToString::to_string);
Expand All @@ -114,6 +115,7 @@ impl TryFrom<&Url> for NvmfAttach {
uuid,
segments[0].to_string(),
nr_io_queues,
io_tmo,
ctrl_loss_tmo,
keep_alive_tmo,
hostnqn,
Expand All @@ -130,9 +132,6 @@ impl Attach for NvmfAttach {
let publish_context = PublishParams::try_from(context)
.map_err(|error| DeviceError::new(&error.to_string()))?;

if let Some(val) = publish_context.io_timeout() {
self.io_timeout = Some(*val);
}
if let Some(val) = publish_context.ctrl_loss_tmo() {
self.ctrl_loss_tmo = Some(*val);
}
Expand All @@ -159,7 +158,7 @@ impl Attach for NvmfAttach {
Err(NvmeError::SubsystemNotFound { .. }) => {
// The default reconnect delay in linux kernel is set to 10s. Use the
// same default value unless the timeout is less or equal to 10.
let reconnect_delay = match self.io_timeout {
let reconnect_delay = match self.io_tmo {
Some(io_timeout) => {
if io_timeout <= 10 {
Some(1)
Expand Down Expand Up @@ -200,47 +199,59 @@ impl Attach for NvmfAttach {
}

async fn fixup(&self) -> Result<(), DeviceError> {
if let Some(io_timeout) = self.io_timeout {
let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/nvme/nvme{major}/nvme*n1/queue");
let path = glob(&pattern)
.unwrap()
.next()
.ok_or_else(|| {
DeviceError::new(&format!(
"failed to look up sysfs device directory \"{pattern}\"",
))
})?
.map_err(|_| {
DeviceError::new(&format!(
"IO error when reading device directory \"{pattern}\""
))
})?;
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
debug!(
"Setting IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout
);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout)?;
let Some(io_timeout) = self.io_tmo else {
return Ok(());
};

let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/block/nvme{major}c*n1/queue");
let glob = glob(&pattern).unwrap();
let result = glob
.into_iter()
.map(|glob_result| {
match glob_result {
Ok(path) => {
let path_str = path.display();
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
tracing::debug!("Setting IO timeout on \"{path_str}\" to {io_timeout}s",);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout).map_err(
|error| {
tracing::error!(%error, path=%path_str, "Failed to set io_timeout to {io_timeout}s");
error.into()
},
)
}
Err(error) => {
// This should never happen as we should always have permissions to list.
tracing::error!(%error, "Unable to collect sysfs for /dev/nvme{major}");
Err(DeviceError::new(error.to_string().as_str()))
}
}
})
.collect::<Result<Vec<()>, DeviceError>>();
match result {
Ok(r) if r.is_empty() => Err(DeviceError::new(&format!(
"look up of sysfs device directory \"{pattern}\" found 0 entries",
))),
Ok(_) => Ok(()),
Err(error) => Err(error),
}
Ok(())
}
}

Expand Down Expand Up @@ -284,10 +295,9 @@ pub(crate) fn check_nvme_tcp_module() -> Result<(), std::io::Error> {
/// (note, this is a system-wide parameter)
pub(crate) fn set_nvmecore_iotimeout(io_timeout_secs: u32) -> Result<(), std::io::Error> {
let path = Path::new("/sys/module/nvme_core/parameters");
debug!(
"Setting nvme_core IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout_secs
tracing::debug!(
"Setting nvme_core IO timeout on \"{path}\" to {io_timeout_secs}s",
path = path.to_string_lossy(),
);
sysfs::write_value(path, "io_timeout", io_timeout_secs)?;
Ok(())
Expand Down
Loading

0 comments on commit cbc833a

Please sign in to comment.