Clean up running containers on the Control-C signal
- Add the `signal` feature to `tokio` to interrupt and handle the
  Control-C signal in Butido.
- Add Control-C signal handling into the `Orchestrator`.
- Implement `Drop` on the `JobHandle` to ensure container cleanup.

Fixes science-computing#409

Signed-off-by: Nico Steinle <[email protected]>
ammernico committed Oct 14, 2024
1 parent f2c8507 commit 752d2fc
Showing 6 changed files with 110 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -68,7 +68,8 @@ shiplift = "0.7"
syntect = "5"
tar = "0.4"
terminal_size = "0.4"
-tokio = { version = "1", features = ["macros", "fs", "process", "io-util", "time"] }
+tokio = { version = "1", features = ["macros", "fs", "process", "io-util", "signal", "time"] }
+tokio-util = "0.7"
tokio-stream = "0.1"
toml = "0.8"
tracing = "0.1"
1 change: 1 addition & 0 deletions src/endpoint/configured.rs
@@ -397,6 +397,7 @@ impl From<shiplift::rep::Image> for Image {
}
}

+#[derive(Clone)]
pub struct EndpointHandle(Arc<Endpoint>);

impl EndpointHandle {
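Deriving `Clone` here is cheap: `EndpointHandle` is a newtype over `Arc<Endpoint>`, so a clone only bumps a reference count. A minimal sketch of the pattern, using a hypothetical `Endpoint` stand-in rather than Butido's real type:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real Endpoint type.
struct Endpoint {
    name: String,
}

// Cloning the handle clones the Arc, not the Endpoint, so the scheduler
// and the Drop-time cleanup can share a single endpoint.
#[derive(Clone)]
struct EndpointHandle(Arc<Endpoint>);

fn main() {
    let handle = EndpointHandle(Arc::new(Endpoint {
        name: "docker-01".to_string(),
    }));
    let shared = handle.clone();
    assert_eq!(Arc::strong_count(&shared.0), 2); // one Endpoint, two handles
    println!("both handles point at {}", shared.0.name);
}
```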
42 changes: 38 additions & 4 deletions src/endpoint/scheduler.rs
@@ -25,7 +25,7 @@ use itertools::Itertools;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::RwLock;
-use tracing::trace;
+use tracing::{debug, error, trace};
use uuid::Uuid;

use crate::db::models as dbmodels;
@@ -95,6 +95,7 @@ impl EndpointScheduler {
log_dir: self.log_dir.clone(),
bar,
endpoint,
+container_id: None,
max_endpoint_name_length: self.max_endpoint_name_length,
job,
staging_store: self.staging_store.clone(),
@@ -136,9 +137,11 @@ }
}
}

+#[derive(Clone)]
pub struct JobHandle {
log_dir: Option<PathBuf>,
endpoint: EndpointHandle,
+container_id: Option<String>,
max_endpoint_name_length: usize,
job: RunnableJob,
bar: ProgressBar,
@@ -155,7 +158,7 @@ impl std::fmt::Debug for JobHandle {
}

impl JobHandle {
-pub async fn run(self) -> Result<Result<Vec<ArtifactPath>>> {
+pub async fn run(mut self) -> Result<Result<Vec<ArtifactPath>>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let endpoint_uri = self.endpoint.uri().clone();
let endpoint_name = self.endpoint.name().clone();
@@ -181,6 +184,7 @@ impl JobHandle {
)
.await?;
let container_id = prepared_container.create_info().id.clone();
+self.container_id = Some(container_id.clone());
let running_container = prepared_container
.start()
.await
@@ -202,12 +206,12 @@ impl JobHandle {
package_name: &package.name,
package_version: &package.version,
log_dir: self.log_dir.as_ref(),
-job: self.job,
+job: self.job.clone(),
log_receiver,
bar: self.bar.clone(),
}
.join();
-drop(self.bar);
+drop(self.bar.clone());

let (run_container, logres) = tokio::join!(running_container, logres);
let log =
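The switch from `job: self.job` to `job: self.job.clone()` (and from `drop(self.bar)` to `drop(self.bar.clone())`) is forced by the new `Drop` impl below: once a type implements `Drop`, its fields can no longer be moved out (error E0509), so they must be cloned instead. A tiny illustration with hypothetical types:

```rust
#[derive(Clone)]
struct Logs(Vec<String>);

struct Handle {
    logs: Logs,
}

// Implementing Drop pins the fields of `Handle` in place.
impl Drop for Handle {
    fn drop(&mut self) {
        println!("cleanup runs here");
    }
}

fn consume(h: Handle) -> Logs {
    // h.logs     // error[E0509]: cannot move out of a type that implements Drop
    h.logs.clone() // the field has to be cloned instead
}

fn main() {
    let h = Handle {
        logs: Logs(vec!["build log line".to_string()]),
    };
    let logs = consume(h); // prints "cleanup runs here" when `h` drops
    println!("kept {} log line(s)", logs.0.len());
}
```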
@@ -370,6 +374,36 @@ impl JobHandle {
}
}

+impl Drop for JobHandle {
+    fn drop(&mut self) {
+        debug!("Cleaning up JobHandle");
+        if self.container_id.is_some() {
+            debug!("Container was already started");
+            let docker = self.endpoint.docker().clone();
+            let container_id = self.container_id.take().unwrap();
+
+            tokio::spawn(async move {
+                let container = docker.containers().get(&container_id);
+                let container_info = container.inspect().await.unwrap();
+
+                if container_info.state.running {
+                    debug!("Container is still running, cleaning up...");
+                    match container.kill(None).await {
+                        Ok(_) => debug!("Stopped container with id: {}", container_id),
+                        Err(e) => {
+                            error!("Failed to stop container with id: {}\n{}", container_id, e)
+                        }
+                    }
+                } else {
+                    debug!("Container has already finished");
+                }
+            });
+        } else {
+            debug!("No container created");
+        }
+    }
+}
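A `Drop` impl must be synchronous, so the cleanup above hands the actual Docker calls to a spawned task. Below is a reduced sketch of that pattern; `JobGuard` and `remote_kill` are hypothetical stand-ins for `JobHandle` and the shiplift container calls. Note that `tokio::spawn` panics outside a runtime, and the cleanup is best-effort, since the runtime may shut down before the task finishes:

```rust
struct JobGuard {
    container_id: Option<String>,
}

impl Drop for JobGuard {
    fn drop(&mut self) {
        // take() moves the id out of &mut self so the spawned task can own it.
        if let Some(id) = self.container_id.take() {
            tokio::spawn(async move {
                remote_kill(&id).await; // may never run if the runtime exits first
            });
        }
    }
}

// Hypothetical stand-in for container.kill(None) via shiplift.
async fn remote_kill(id: &str) {
    println!("killing container {id}");
}

#[tokio::main]
async fn main() {
    let guard = JobGuard {
        container_id: Some("abc123".to_string()),
    };
    drop(guard); // spawns the cleanup task onto the running runtime
    tokio::task::yield_now().await; // give it a chance to run before shutdown
}
```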

struct LogReceiver<'a> {
endpoint_name: &'a str,
max_endpoint_name_length: &'a usize,
2 changes: 1 addition & 1 deletion src/job/runnable.rs
@@ -28,7 +28,7 @@ use crate::util::docker::ImageName;
use crate::util::EnvironmentVariableName;

/// A job configuration that can be run. All inputs are clear here.
-#[derive(Debug, Getters)]
+#[derive(Clone, Debug, Getters)]
pub struct RunnableJob {
#[getset(get = "pub")]
uuid: Uuid,
108 changes: 67 additions & 41 deletions src/orchestrator/orchestrator.rs
@@ -13,6 +13,7 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::path::PathBuf;
+use std::process::ExitCode;
use std::sync::Arc;
use std::sync::Mutex;

@@ -32,8 +33,9 @@ use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
+use tokio_util::sync::CancellationToken;
use tracing::Instrument;
-use tracing::{debug, error, trace};
+use tracing::{debug, error, info, trace};
use typed_builder::TypedBuilder;
use uuid::Uuid;

@@ -265,12 +267,26 @@

impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
-let (results, errors) = self.run_tree().await?;
+let token = CancellationToken::new();
+let cloned_token = token.clone();
+
+tokio::spawn(async move {
+    tokio::signal::ctrl_c().await.unwrap();
+    info!("Received the Ctrl-C signal, stopping...");
+    token.cancel();
+    ExitCode::from(1)
+});
+
+let (results, errors) = self.run_tree(cloned_token).await?;

output.extend(results);
Ok(errors)
}

-async fn run_tree(self) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
+async fn run_tree(
+    self,
+    token: CancellationToken,
+) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
let prepare_span = tracing::debug_span!("run tree preparation");

// There is no async code until we drop this guard, so this is fine
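Outside Butido, the listener-plus-token wiring from `run` above reduces to a few lines. A self-contained sketch, assuming `tokio` with the `signal`, `time`, `macros`, and `rt-multi-thread` features plus the `tokio-util` and `anyhow` crates:

```rust
use tokio_util::sync::CancellationToken;

async fn do_work() {
    // stand-in for running the job tree
    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let token = CancellationToken::new();
    let cloned_token = token.clone();

    // Background task: wait for Ctrl-C once, then flip the token.
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl-C handler");
        token.cancel();
    });

    // The worker only watches the token; it never touches signal handling.
    tokio::select! {
        _ = cloned_token.cancelled() => anyhow::bail!("Received Control-C signal"),
        _ = do_work() => Ok(()),
    }
}
```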
@@ -452,45 +468,55 @@ impl<'a> Orchestrator<'a> {
// The JobTask::run implementation handles the rest, we just have to wait for all futures
// to succeed.
let run_span = tracing::debug_span!("run");
-let running_jobs = jobs
-    .into_iter()
-    .map(|prep| {
-        trace!(parent: &run_span, job_uuid = %prep.1.jobdef.job.uuid(), "Creating JobTask");
-        // the sender is set or we need to use the root sender
-        let sender = prep
-            .3
-            .into_inner()
-            .unwrap_or_else(|| vec![root_sender.clone()]);
-        JobTask::new(prep.0, prep.1, sender)
-    })
-    .inspect(
-        |task| trace!(parent: &run_span, job_uuid = %task.jobdef.job.uuid(), "Running job"),
-    )
-    .map(|task| {
-        task.run()
-            .instrument(tracing::debug_span!(parent: &run_span, "JobTask::run"))
-    })
-    .collect::<futures::stream::FuturesUnordered<_>>();
-debug!("Built {} jobs", running_jobs.len());
-
-running_jobs
-    .collect::<Result<()>>()
-    .instrument(run_span.clone())
-    .await?;
-trace!(parent: &run_span, "All jobs finished");
-drop(run_span);
-
-match root_receiver.recv().await {
-    None => Err(anyhow!("No result received...")),
-    Some(Ok(results)) => {
-        let results = results
-            .into_iter()
-            .flat_map(|tpl| tpl.1.into_iter())
-            .map(ProducedArtifact::unpack)
-            .collect();
-        Ok((results, HashMap::with_capacity(0)))
-    }
-    Some(Err(errors)) => Ok((vec![], errors)),
-}
+tokio::select! {
+    _ = token.cancelled() => {
+        anyhow::bail!("Received Control-C signal");
+    }
+    r = async {
+        let running_jobs = jobs
+            .into_iter()
+            .map(|prep| {
+                trace!(parent: &run_span, job_uuid = %prep.1.jobdef.job.uuid(), "Creating JobTask");
+                // the sender is set or we need to use the root sender
+                let sender = prep
+                    .3
+                    .into_inner()
+                    .unwrap_or_else(|| vec![root_sender.clone()]);
+                JobTask::new(prep.0, prep.1, sender)
+            })
+            .inspect(
+                |task| trace!(parent: &run_span, job_uuid = %task.jobdef.job.uuid(), "Running job"),
+            )
+            .map(|task| {
+                task.run()
+                    .instrument(tracing::debug_span!(parent: &run_span, "JobTask::run"))
+            })
+            .collect::<futures::stream::FuturesUnordered<_>>();
+        debug!("Built {} jobs", running_jobs.len());
+
+        running_jobs
+            .collect::<Result<()>>()
+            .instrument(run_span.clone())
+            .await?;
+        trace!(parent: &run_span, "All jobs finished");
+        drop(run_span);
+
+        match root_receiver.recv().await {
+            None => Err(anyhow!("No result received...")),
+            Some(Ok(results)) => {
+                let results = results
+                    .into_iter()
+                    .flat_map(|tpl| tpl.1.into_iter())
+                    .map(ProducedArtifact::unpack)
+                    .collect();
+                Ok((results, HashMap::with_capacity(0)))
+            }
+            Some(Err(errors)) => Ok((vec![], errors)),
+        }
+    } => {
+        r
+    }
+}
}
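This `tokio::select!` is also what makes the `JobHandle` cleanup fire on Ctrl-C: when the `token.cancelled()` arm wins, the whole `async` arm, including every pending job future, is dropped, and the `Drop` impls run. A small demonstration of that semantics, with a hypothetical `Job` guard standing in for `JobHandle`:

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

struct Job(&'static str);

impl Drop for Job {
    fn drop(&mut self) {
        // In Butido, this is where the container cleanup task is spawned.
        println!("dropped {}; cleanup runs here", self.0);
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let t = token.clone();

    // Simulate Ctrl-C arriving shortly after the job starts.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        t.cancel();
    });

    tokio::select! {
        _ = token.cancelled() => println!("cancelled; dropping the job arm"),
        _ = async {
            let _job = Job("job-1");
            tokio::time::sleep(Duration::from_secs(60)).await; // the "job"
        } => {}
    }
    // The losing arm is dropped when select! finishes, so Job's Drop runs
    // even though the job never completed.
}
```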
