diff --git a/Cargo.lock b/Cargo.lock index e5b76b6..815268a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -243,12 +243,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" - [[package]] name = "atomic-waker" version = "1.1.2" @@ -813,7 +807,6 @@ dependencies = [ "clap", "futures", "hex", - "human-repr", "reqwest", "serde", "serde_json", @@ -855,7 +848,6 @@ dependencies = [ "gix", "globset", "hex", - "human-repr", "joinery", "json-canon", "lazy_format", @@ -884,7 +876,6 @@ dependencies = [ "strum 0.26.3", "superconsole", "tar", - "termwiz 0.22.0", "thiserror", "tick-encoding", "tokio", @@ -1033,12 +1024,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "cfg_aliases" version = "0.2.1" @@ -1793,31 +1778,12 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "euclid" -version = "0.22.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f0eb73b934648cd7a4a61f1b15391cd95dab0b4da6e2e66c2a072c144b4a20" -dependencies = [ - "num-traits", -] - [[package]] name = "event-listener" version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "fancy-regex" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2" -dependencies = [ - "bit-set", - "regex", -] - [[package]] name = "faster-hex" version = "0.9.0" @@ -3159,12 +3125,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "human-repr" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58b778a5761513caf593693f8951c97a5b610841e754788400f32102eefdff1" - [[package]] name = "human_format" version = "1.1.0" @@ -3625,16 +3585,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "mac_address" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8836fae9d0d4be2c8b4efcdd79e828a2faa058a90d005abf42f91cac5493a08e" -dependencies = [ - "nix 0.28.0", - "winapi", -] - [[package]] name = "matchers" version = "0.1.0" @@ -3824,19 +3774,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "nix" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" -dependencies = [ - "bitflags 2.6.0", - "cfg-if 1.0.0", - "cfg_aliases 0.1.1", - "libc", - "memoffset 0.9.0", -] - [[package]] name = "nix" version = "0.29.0" @@ -3845,7 +3782,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags 2.6.0", "cfg-if 1.0.0", - "cfg_aliases 0.2.1", + "cfg_aliases", "libc", ] @@ -4085,15 +4022,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "ordered-float" -version = "4.2.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" -dependencies = [ - "num-traits", -] - [[package]] name = "outref" version = "0.1.0" @@ -5844,7 +5772,7 @@ dependencies = [ "crossbeam-epoch", "crossterm", "itertools 0.10.5", - "termwiz 0.18.0", + "termwiz", "thiserror", "unicode-segmentation", ] @@ -6342,19 +6270,6 @@ dependencies = [ "phf_codegen", ] -[[package]] -name = "terminfo" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "666cd3a6681775d22b200409aad3b089c5b99fb11ecdd8a204d9d62f8148498f" -dependencies = [ - "dirs 4.0.0", - "fnv", - "nom 7.1.3", - "phf 0.11.2", - "phf_codegen", -] - [[package]] name = "termios" version = "0.3.3" @@ -6385,7 +6300,7 @@ dependencies = [ "nix 0.24.3", "num-derive", "num-traits", - "ordered-float 3.9.2", + "ordered-float", "pest", "pest_derive", "phf 0.10.1", @@ -6394,59 +6309,15 @@ dependencies = [ "sha2 0.9.9", "signal-hook 0.1.17", "siphasher", - "terminfo 0.7.5", + "terminfo", "termios", "thiserror", "ucd-trie", "unicode-segmentation", "vtparse", "wezterm-bidi", - "wezterm-color-types 0.2.0", - "wezterm-dynamic 0.1.0", - "winapi", -] - -[[package]] -name = "termwiz" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a75313e21da5d4406ea31402035b3b97aa74c04356bdfafa5d1043ab4e551d1" -dependencies = [ - "anyhow", - "base64 0.21.7", - "bitflags 2.6.0", - "fancy-regex", - "filedescriptor", - "finl_unicode", - "fixedbitset", - "hex", - "lazy_static", - "libc", - "log", - "memmem", - "nix 0.26.4", - "num-derive", - "num-traits", - "ordered-float 4.2.0", - "pest", - "pest_derive", - "phf 0.11.2", - "semver 0.11.0", - "sha2 0.10.8", - "signal-hook 0.3.17", - "siphasher", - "tempfile", - "terminfo 0.8.0", - "termios", - "thiserror", - "ucd-trie", - "unicode-segmentation", - "vtparse", - "wezterm-bidi", - "wezterm-blob-leases", - "wezterm-color-types 0.3.0", - "wezterm-dynamic 0.2.0", - "wezterm-input-types", + "wezterm-color-types", + "wezterm-dynamic", "winapi", ] @@ -6997,10 +6868,6 @@ name = "uuid" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" -dependencies = [ - "atomic", - "getrandom", -] [[package]] name = "v8" @@ -7233,21 +7100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1560382cf39b0fa92473eae4d5b3772f88c63202cbf5a72c35db72ba99e66c36" dependencies = [ "log", - "wezterm-dynamic 0.1.0", -] - -[[package]] -name = "wezterm-blob-leases" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5a5e0adf7eed68976410def849a4bdab6f6e9f6163f152de9cb89deea9e60b" -dependencies = [ - "getrandom", - "mac_address", - "once_cell", - "sha2 0.10.8", - "thiserror", - "uuid", + "wezterm-dynamic", ] [[package]] @@ -7259,19 +7112,7 @@ dependencies = [ "csscolorparser", "deltae", "lazy_static", - "wezterm-dynamic 0.1.0", -] - -[[package]] -name = "wezterm-color-types" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7de81ef35c9010270d63772bebef2f2d6d1f2d20a983d27505ac850b8c4b4296" -dependencies = [ - "csscolorparser", - "deltae", - "lazy_static", - "wezterm-dynamic 0.2.0", + "wezterm-dynamic", ] [[package]] @@ -7281,20 +7122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a75e78c0cc60a76de5d93f9dad05651105351e151b6446ab305514945d7588aa" dependencies = [ "log", - "ordered-float 3.9.2", - "strsim", - "thiserror", - "wezterm-dynamic-derive", -] - -[[package]] -name = "wezterm-dynamic" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfb128bacfa86734e07681fb6068e34c144698e84ee022d6e009145d1abb77b5" -dependencies = [ - "log", - "ordered-float 4.2.0", + "ordered-float", "strsim", "thiserror", "wezterm-dynamic-derive", @@ -7311,18 +7139,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "wezterm-input-types" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7012add459f951456ec9d6c7e6fc340b1ce15d6fc9629f8c42853412c029e57e" -dependencies = [ - "bitflags 1.3.2", - "euclid", - "lazy_static", - "wezterm-dynamic 0.2.0", -] - [[package]] name = "which" version = "4.4.2" diff --git a/crates/brioche-core/Cargo.toml b/crates/brioche-core/Cargo.toml index 2741e03..644a4f4 100644 --- a/crates/brioche-core/Cargo.toml +++ b/crates/brioche-core/Cargo.toml @@ -24,7 +24,6 @@ directories = "5.0.1" futures = "0.3.29" globset = "0.4.14" hex = "0.4.3" -human-repr = "1.1.0" joinery = "3.1.0" json-canon = "0.1.3" lazy_format = "2.0.3" @@ -48,7 +47,6 @@ sha2 = "0.10.8" sqlx = { version = "0.7.3", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "json"] } strum = { version = "0.26.3", features = ["derive"] } superconsole = "0.2.0" -termwiz = "0.22.0" thiserror = "1.0.51" tick-encoding = "0.1.2" tokio = { version = "1.35.0", features = ["full", "tracing"] } diff --git a/crates/brioche-core/src/bake/process.rs b/crates/brioche-core/src/bake/process.rs index dce3a6e..5084eca 100644 --- a/crates/brioche-core/src/bake/process.rs +++ b/crates/brioche-core/src/bake/process.rs @@ -15,6 +15,10 @@ use crate::{ CompleteProcessTemplateComponent, CompressionFormat, DirectoryError, DownloadRecipe, Meta, ProcessRecipe, ProcessTemplate, ProcessTemplateComponent, Recipe, Unarchive, WithMeta, }, + reporter::{ + job::{NewJob, ProcessPacket, ProcessStatus, UpdateJob}, + JobId, + }, sandbox::{ HostPathMode, SandboxExecutionConfig, SandboxPath, SandboxPathOptions, SandboxTemplate, SandboxTemplateComponent, @@ -288,6 +292,12 @@ pub async fn bake_process( let _permit = brioche.process_semaphore.acquire().await; tracing::debug!("acquired process semaphore permit"); + let created_at = std::time::Instant::now(); + let mut job_status = ProcessStatus::Preparing { created_at }; + let job_id = brioche.reporter.add_job(NewJob::Process { + status: job_status.clone(), + }); + let hash = Recipe::CompleteProcess(process.clone()).hash(); let temp_dir = brioche.home.join("process-temp"); @@ -502,9 +512,17 @@ pub async fn bake_process( }; let result = if brioche.self_exec_processes { - run_sandboxed_self_exec(brioche, sandbox_config, stdout_file, stderr_file).await + run_sandboxed_self_exec( + brioche, + sandbox_config, + job_id, + &mut job_status, + stdout_file, + stderr_file, + ) + .await } else { - run_sandboxed_inline(sandbox_config).await + run_sandboxed_inline(brioche, sandbox_config, job_id, &mut job_status).await }; match result { @@ -541,10 +559,31 @@ pub async fn bake_process( bake_dir.remove().await?; } + job_status.to_finalized(std::time::Instant::now())?; + brioche.reporter.update_job( + job_id, + UpdateJob::ProcessUpdateStatus { + status: job_status.clone(), + }, + ); + Ok(result.value) } -async fn run_sandboxed_inline(sandbox_config: SandboxExecutionConfig) -> anyhow::Result<()> { +async 
fn run_sandboxed_inline( + brioche: &Brioche, + sandbox_config: SandboxExecutionConfig, + job_id: JobId, + job_status: &mut ProcessStatus, +) -> anyhow::Result<()> { + job_status.to_running(std::time::Instant::now(), None)?; + brioche.reporter.update_job( + job_id, + UpdateJob::ProcessUpdateStatus { + status: job_status.clone(), + }, + ); + let status = tokio::task::spawn_blocking(|| crate::sandbox::run_sandbox(sandbox_config)).await??; @@ -553,12 +592,22 @@ async fn run_sandboxed_inline(sandbox_config: SandboxExecutionConfig) -> anyhow: "sandboxed process exited with non-zero status code" ); + job_status.to_ran(std::time::Instant::now())?; + brioche.reporter.update_job( + job_id, + UpdateJob::ProcessUpdateStatus { + status: job_status.clone(), + }, + ); + Ok(()) } async fn run_sandboxed_self_exec( brioche: &Brioche, sandbox_config: SandboxExecutionConfig, + job_id: JobId, + job_status: &mut ProcessStatus, write_stdout: impl tokio::io::AsyncWrite + Send + Sync + 'static, write_stderr: impl tokio::io::AsyncWrite + Send + Sync + 'static, ) -> anyhow::Result<()> { @@ -573,15 +622,17 @@ async fn run_sandboxed_self_exec( .stderr(std::process::Stdio::piped()) .spawn()?; - let start = std::time::Instant::now(); let child_id = child.id(); let mut stdout = child.stdout.take().expect("failed to get stdout"); let mut stderr = child.stderr.take().expect("failed to get stderr"); - let mut job_status = crate::reporter::ProcessStatus::Running { child_id, start }; - let job_id = brioche.reporter.add_job(crate::reporter::NewJob::Process { - status: job_status.clone(), - }); + job_status.to_running(std::time::Instant::now(), child_id)?; + brioche.reporter.update_job( + job_id, + UpdateJob::ProcessUpdateStatus { + status: job_status.clone(), + }, + ); tokio::task::spawn({ let brioche = brioche.clone(); @@ -595,12 +646,12 @@ async fn run_sandboxed_self_exec( bytes_read = stdout.read(&mut stdout_buffer) => { let buffer = &stdout_buffer[..bytes_read?]; write_stdout.write_all(buffer).await?; - crate::reporter::ProcessPacket::Stdout(buffer.to_vec()) + ProcessPacket::Stdout(buffer.to_vec()) } bytes_read = stderr.read(&mut stderr_buffer) => { let buffer = &stderr_buffer[..bytes_read?]; write_stderr.write_all(buffer).await?; - crate::reporter::ProcessPacket::Stdout(buffer.to_vec()) + ProcessPacket::Stderr(buffer.to_vec()) } }; @@ -610,11 +661,10 @@ async fn run_sandboxed_self_exec( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::Process { - packet: Some(packet).into(), - status: job_status.clone(), + UpdateJob::ProcessPushPacket { + packet: packet.into(), }, - ) + ); } anyhow::Ok(()) @@ -622,26 +672,24 @@ async fn run_sandboxed_self_exec( }); let output = child.wait_with_output().await; - let status = output.as_ref().ok().map(|output| output.status); - job_status = crate::reporter::ProcessStatus::Exited { - child_id, - status, - elapsed: start.elapsed(), - }; - brioche.reporter.update_job( - job_id, - crate::reporter::UpdateJob::Process { - packet: None.into(), - status: job_status, - }, - ); + brioche + .reporter + .update_job(job_id, UpdateJob::ProcessFlushPackets); let result = output?; if !result.status.success() { anyhow::bail!("process exited with status code {}", result.status); } + job_status.to_ran(std::time::Instant::now())?; + brioche.reporter.update_job( + job_id, + UpdateJob::ProcessUpdateStatus { + status: job_status.clone(), + }, + ); + Ok(()) } diff --git a/crates/brioche-core/src/bake/unarchive.rs b/crates/brioche-core/src/bake/unarchive.rs index b173729..198eab1 100644 --- 
a/crates/brioche-core/src/bake/unarchive.rs +++ b/crates/brioche-core/src/bake/unarchive.rs @@ -6,6 +6,7 @@ use bstr::BString; use crate::{ blob::BlobHash, recipe::{Artifact, Directory, File, Meta, Unarchive, WithMeta}, + reporter::job::{NewJob, UpdateJob}, Brioche, }; @@ -27,7 +28,9 @@ pub async fn bake_unarchive( tracing::debug!(%blob_hash, archive = ?unarchive.archive, compression = ?unarchive.compression, "starting unarchive"); - let job_id = brioche.reporter.add_job(crate::reporter::NewJob::Unarchive); + let job_id = brioche.reporter.add_job(NewJob::Unarchive { + started_at: std::time::Instant::now(), + }); let archive_path = { let mut permit = crate::blob::get_save_blob_permit().await?; @@ -62,7 +65,10 @@ pub async fn bake_unarchive( let progress_percent = (estimated_progress * 100.0).min(99.0) as u8; brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::Unarchive { progress_percent }, + UpdateJob::Unarchive { + progress_percent, + finished_at: None, + }, ); let entry = match archive_entry.header().entry_type() { @@ -175,8 +181,9 @@ pub async fn bake_unarchive( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::Unarchive { + UpdateJob::Unarchive { progress_percent: 100, + finished_at: Some(std::time::Instant::now()), }, ); diff --git a/crates/brioche-core/src/download.rs b/crates/brioche-core/src/download.rs index 9821294..452197f 100644 --- a/crates/brioche-core/src/download.rs +++ b/crates/brioche-core/src/download.rs @@ -2,7 +2,10 @@ use anyhow::Context as _; use futures::TryStreamExt as _; use tokio_util::compat::FuturesAsyncReadCompatExt as _; -use crate::Brioche; +use crate::{ + reporter::job::{NewJob, UpdateJob}, + Brioche, +}; #[tracing::instrument(skip(brioche, expected_hash))] pub async fn download( @@ -20,9 +23,10 @@ pub async fn download( tracing::debug!(%url, "starting download"); - let job_id = brioche - .reporter - .add_job(crate::reporter::NewJob::Download { url: url.clone() }); + let job_id = brioche.reporter.add_job(NewJob::Download { + url: url.clone(), + started_at: std::time::Instant::now(), + }); let response = brioche.download_client.get(url.clone()).send().await?; let response = response.error_for_status()?; @@ -52,8 +56,9 @@ pub async fn download( let progress_percent = progress_percent.round().min(99.0) as u8; brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::Download { + UpdateJob::Download { progress_percent: Some(progress_percent), + finished_at: None, }, ); } @@ -73,8 +78,9 @@ pub async fn download( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::Download { + UpdateJob::Download { progress_percent: Some(100), + finished_at: Some(std::time::Instant::now()), }, ); diff --git a/crates/brioche-core/src/registry.rs b/crates/brioche-core/src/registry.rs index 3467b85..bbb6ebf 100644 --- a/crates/brioche-core/src/registry.rs +++ b/crates/brioche-core/src/registry.rs @@ -12,6 +12,7 @@ use crate::{ blob::BlobHash, project::{Project, ProjectHash}, recipe::{Artifact, Recipe, RecipeHash}, + reporter::job::{NewJob, UpdateJob}, Brioche, }; @@ -358,12 +359,11 @@ pub async fn fetch_bake_references( return Ok(()); } - let job_id = brioche - .reporter - .add_job(crate::reporter::NewJob::RegistryFetch { - total_blobs: unknown_blobs.len(), - total_recipes: unknown_recipes.len(), - }); + let job_id = brioche.reporter.add_job(NewJob::RegistryFetch { + total_blobs: unknown_blobs.len(), + total_recipes: unknown_recipes.len(), + started_at: std::time::Instant::now(), + }); let fetch_blobs_fut = 
futures::stream::iter(unknown_blobs) .map(Ok) @@ -376,7 +376,7 @@ pub async fn fetch_bake_references( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::RegistryFetchAdd { + UpdateJob::RegistryFetchAdd { blobs_fetched: 1, recipes_fetched: 0, }, @@ -401,7 +401,7 @@ pub async fn fetch_bake_references( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::RegistryFetchAdd { + UpdateJob::RegistryFetchAdd { blobs_fetched: 0, recipes_fetched: 1, }, @@ -418,9 +418,12 @@ pub async fn fetch_bake_references( crate::recipe::save_recipes(&brioche, new_recipes).await?; - brioche - .reporter - .update_job(job_id, crate::reporter::UpdateJob::RegistryFetchFinish); + brioche.reporter.update_job( + job_id, + UpdateJob::RegistryFetchFinish { + finished_at: std::time::Instant::now(), + }, + ); Ok(()) } @@ -433,12 +436,11 @@ pub async fn fetch_recipes_deep( let mut pending_recipes = recipes; let mut checked_recipes = HashSet::new(); - let job_id = brioche - .reporter - .add_job(crate::reporter::NewJob::RegistryFetch { - total_blobs: 0, - total_recipes: 0, - }); + let job_id = brioche.reporter.add_job(NewJob::RegistryFetch { + total_blobs: 0, + total_recipes: 0, + started_at: std::time::Instant::now(), + }); let mut total_to_fetch = 0; @@ -457,7 +459,7 @@ pub async fn fetch_recipes_deep( total_to_fetch += unknown_recipes.len(); brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::RegistryFetchUpdate { + UpdateJob::RegistryFetchUpdate { complete_blobs: None, complete_recipes: None, total_blobs: None, @@ -485,7 +487,7 @@ pub async fn fetch_recipes_deep( brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::RegistryFetchAdd { + UpdateJob::RegistryFetchAdd { blobs_fetched: 0, recipes_fetched: 1, }, @@ -510,9 +512,12 @@ pub async fn fetch_recipes_deep( crate::recipe::save_recipes(brioche, new_recipes).await?; } - brioche - .reporter - .update_job(job_id, crate::reporter::UpdateJob::RegistryFetchFinish); + brioche.reporter.update_job( + job_id, + UpdateJob::RegistryFetchFinish { + finished_at: std::time::Instant::now(), + }, + ); Ok(()) } @@ -553,12 +558,11 @@ pub async fn fetch_blobs(brioche: Brioche, blobs: &HashSet) -> anyhow: return Ok(()); } - let job_id = brioche - .reporter - .add_job(crate::reporter::NewJob::RegistryFetch { - total_blobs: unknown_blobs.len(), - total_recipes: 0, - }); + let job_id = brioche.reporter.add_job(NewJob::RegistryFetch { + total_blobs: unknown_blobs.len(), + total_recipes: 0, + started_at: std::time::Instant::now(), + }); futures::stream::iter(unknown_blobs) .map(Ok) @@ -571,7 +575,7 @@ pub async fn fetch_blobs(brioche: Brioche, blobs: &HashSet) -> anyhow: brioche.reporter.update_job( job_id, - crate::reporter::UpdateJob::RegistryFetchAdd { + UpdateJob::RegistryFetchAdd { blobs_fetched: 1, recipes_fetched: 0, }, @@ -582,9 +586,12 @@ pub async fn fetch_blobs(brioche: Brioche, blobs: &HashSet) -> anyhow: }) .await?; - brioche - .reporter - .update_job(job_id, crate::reporter::UpdateJob::RegistryFetchFinish); + brioche.reporter.update_job( + job_id, + UpdateJob::RegistryFetchFinish { + finished_at: std::time::Instant::now(), + }, + ); Ok(()) } diff --git a/crates/brioche-core/src/reporter.rs b/crates/brioche-core/src/reporter.rs index 43ebd2e..dbbd61f 100644 --- a/crates/brioche-core/src/reporter.rs +++ b/crates/brioche-core/src/reporter.rs @@ -1,205 +1,13 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, AtomicUsize}, - Arc, RwLock, - }, -}; +use std::sync::{atomic::AtomicUsize, Arc}; -use bstr::ByteSlice; 
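
The hunks in this patch call into a new `reporter::job` module — `NewJob`, `UpdateJob`, `ProcessPacket`, and a `ProcessStatus` type with `to_running`/`to_ran`/`to_finalized` transitions — but the new `job.rs` file itself is not among the hunks shown. A minimal sketch of the status state machine those call sites imply; every field and method name below is inferred from the call sites, not copied from the actual `job.rs`:

```rust
use std::time::Instant;

// Sketch only: inferred from the call sites in bake/process.rs and
// reporter/console.rs; the real definition lives in reporter/job.rs.
#[derive(Debug, Clone, Copy)]
pub enum ProcessStatus {
    Preparing { created_at: Instant },
    Running { created_at: Instant, started_at: Instant, child_id: Option<u32> },
    Ran { started_at: Instant, finished_at: Instant, child_id: Option<u32> },
    Finalized { finished_at: Instant, finalized_at: Instant, child_id: Option<u32> },
}

impl ProcessStatus {
    /// Preparing -> Running: the sandboxed child has been spawned.
    pub fn to_running(&mut self, started_at: Instant, child_id: Option<u32>) -> anyhow::Result<()> {
        let Self::Preparing { created_at } = *self else {
            anyhow::bail!("tried to mark a non-preparing process as running");
        };
        *self = Self::Running { created_at, started_at, child_id };
        Ok(())
    }

    /// Running -> Ran: the child exited; its outputs still need finalizing.
    pub fn to_ran(&mut self, finished_at: Instant) -> anyhow::Result<()> {
        let Self::Running { started_at, child_id, .. } = *self else {
            anyhow::bail!("tried to mark a non-running process as ran");
        };
        *self = Self::Ran { started_at, finished_at, child_id };
        Ok(())
    }

    /// Ran -> Finalized: outputs saved and the bake dir cleaned up.
    pub fn to_finalized(&mut self, finalized_at: Instant) -> anyhow::Result<()> {
        let Self::Ran { finished_at, child_id, .. } = *self else {
            anyhow::bail!("tried to finalize a process that never finished running");
        };
        *self = Self::Finalized { finished_at, finalized_at, child_id };
        Ok(())
    }

    pub fn child_id(&self) -> Option<u32> {
        match *self {
            Self::Preparing { .. } => None,
            Self::Running { child_id, .. }
            | Self::Ran { child_id, .. }
            | Self::Finalized { child_id, .. } => child_id,
        }
    }
}
```

The `&mut self` transitions return `Result` so that call sites such as `job_status.to_running(std::time::Instant::now(), None)?` in `run_sandboxed_inline` can surface an invalid transition as an error instead of silently overwriting state.
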
-use debug_ignore::DebugIgnore; -use human_repr::HumanDuration as _; -use joinery::JoinableIterator as _; -use opentelemetry::trace::TracerProvider; use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _, Layer as _}; +pub mod console; +pub mod job; + const DEFAULT_TRACING_LEVEL: &str = "brioche=info"; const DEFAULT_DEBUG_TRACING_LEVEL: &str = "brioche=debug"; -#[derive(Debug, Clone, Copy)] -pub enum ConsoleReporterKind { - Auto, - SuperConsole, - Plain, -} - -pub fn start_console_reporter( - kind: ConsoleReporterKind, -) -> anyhow::Result<(Reporter, ReporterGuard)> { - let jobs = Arc::new(tokio::sync::RwLock::new(HashMap::new())); - let queued_lines = Arc::new(tokio::sync::RwLock::new(Vec::new())); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); - - let brioche_otel_enabled = matches!( - std::env::var("BRIOCHE_ENABLE_OTEL").as_deref(), - Ok("1") | Ok("true") - ); - - let start = std::time::Instant::now(); - let is_evaluating = Arc::new(AtomicBool::new(false)); - - let reporter = Reporter { - start, - num_jobs: Arc::new(AtomicUsize::new(0)), - is_evaluating: is_evaluating.clone(), - tx: tx.clone(), - }; - let guard = ReporterGuard { - tx, - shutdown_rx: Some(shutdown_rx), - shutdown_opentelemetry: brioche_otel_enabled, - }; - - std::thread::spawn({ - let queued_lines = queued_lines.clone(); - let jobs = jobs.clone(); - move || { - let superconsole = match kind { - ConsoleReporterKind::Auto => superconsole::SuperConsole::new(), - ConsoleReporterKind::SuperConsole => Some(superconsole::SuperConsole::forced_new( - superconsole::Dimensions { - width: 80, - height: 24, - }, - )), - ConsoleReporterKind::Plain => None, - }; - let mut console = match superconsole { - Some(console) => { - let root = JobsComponent { - start, - is_evaluating, - jobs, - terminal: tokio::sync::RwLock::new(termwiz::surface::Surface::new(80, 24)), - }; - ConsoleReporter::SuperConsole { - console, - root, - partial_lines: HashMap::new(), - } - } - None => ConsoleReporter::Plain { - partial_lines: HashMap::new(), - }, - }; - - let mut running = true; - while running { - while let Ok(event) = rx.try_recv() { - match event { - ReportEvent::Emit { lines } => { - console.emit(lines); - } - ReportEvent::AddJob { id, job } => { - console.add_job(id, job); - } - ReportEvent::UpdateJobState { id, update } => { - console.update_job(id, update); - } - ReportEvent::Shutdown => { - running = false; - } - } - } - let mut queued_lines = queued_lines.blocking_write(); - for lines in queued_lines.drain(..) 
{ - console.emit(lines); - } - - let _ = console.render(); - - std::thread::sleep(std::time::Duration::from_millis(100)); - } - - let _ = console.finalize(); - let _ = shutdown_tx.send(()); - } - }); - - let opentelemetry_layer = if brioche_otel_enabled { - opentelemetry::global::set_text_map_propagator( - opentelemetry_sdk::propagation::TraceContextPropagator::new(), - ); - let provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .http() - .with_http_client(reqwest::Client::new()), - ) - .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource( - opentelemetry_sdk::Resource::default().merge(&opentelemetry_sdk::Resource::new( - vec![ - opentelemetry::KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_NAME, - "brioche", - ), - opentelemetry::KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_VERSION, - env!("CARGO_PKG_VERSION"), - ), - ], - )), - )) - .install_simple()?; - - Some( - tracing_opentelemetry::layer() - .with_tracer(provider.tracer("tracing-opentelemetry")) - .with_filter(tracing_debug_filter()), - ) - } else { - None - }; - - let log_file_layer = match std::env::var_os("BRIOCHE_LOG_OUTPUT") { - Some(debug_output_path) => { - let debug_output = - std::fs::File::create(debug_output_path).expect("failed to open debug output path"); - Some( - tracing_subscriber::fmt::layer() - .json() - .with_writer(debug_output) - .with_timer(tracing_subscriber::fmt::time::uptime()) - .with_filter(tracing_debug_filter()), - ) - } - _ => None, - }; - - let tracing_console_layer = - std::env::var_os("BRIOCHE_CONSOLE").map(|_| console_subscriber::spawn()); - - // HACK: Add a filter to the subscriber to remove debug logs that we - // shouldn't see if no other layer needs them. This is a workaround for - // this issue: https://github.com/tokio-rs/tracing/issues/2448 - let root_filter = match ( - &log_file_layer, - &opentelemetry_layer, - &tracing_console_layer, - ) { - (None, None, None) => Some(tracing_output_filter()), - (_, _, Some(_)) => Some(tracing_root_filter()), - _ => None, - }; - - let reporter_layer = tracing_subscriber::fmt::layer() - .compact() - .with_writer(reporter.clone()) - .without_time() - .with_filter(tracing_output_filter()); - tracing_subscriber::registry() - .with(root_filter) - .with(tracing_console_layer) - .with(reporter_layer) - .with(log_file_layer) - .with(opentelemetry_layer) - .init(); - - Ok((reporter, guard)) -} - fn tracing_output_filter() -> tracing_subscriber::EnvFilter { tracing_subscriber::EnvFilter::builder() .with_default_directive(DEFAULT_TRACING_LEVEL.parse().expect("invalid filter")) @@ -219,193 +27,12 @@ fn tracing_root_filter() -> tracing_subscriber::EnvFilter { .add_directive("runtime=trace".parse().expect("invalid filter")) } -enum ConsoleReporter { - SuperConsole { - console: superconsole::SuperConsole, - root: JobsComponent, - partial_lines: HashMap>, - }, - Plain { - partial_lines: HashMap>, - }, -} - -impl ConsoleReporter { - fn emit(&mut self, lines: superconsole::Lines) { - match self { - ConsoleReporter::SuperConsole { console, .. } => { - console.emit(lines); - } - ConsoleReporter::Plain { partial_lines: _ } => { - for line in lines { - eprintln!("{}", line.to_unstyled()); - } - } - } - } - - fn add_job(&mut self, id: JobId, job: NewJob) { - match self { - ConsoleReporter::SuperConsole { root, .. 
} => { - let mut jobs = root.jobs.blocking_write(); - let new_job = Job::new(job); - jobs.insert(id, new_job); - } - ConsoleReporter::Plain { partial_lines: _ } => match job { - NewJob::Download { url } => { - eprintln!("Downloading {}", url); - } - NewJob::Unarchive => {} - NewJob::Process { status } => { - if let Some(child_id) = status.child_id() { - eprintln!("Started process {child_id}"); - } else { - eprintln!("Started process"); - } - } - NewJob::RegistryFetch { - total_blobs, - total_recipes, - } => { - eprintln!( - "Fetching {total_blobs} blob{} / {total_recipes} recipe{} from registry", - if total_blobs == 1 { "" } else { "s" }, - if total_recipes == 1 { "" } else { "s" }, - ); - } - }, - } - } - - fn update_job(&mut self, id: JobId, update: UpdateJob) { - match self { - ConsoleReporter::SuperConsole { - root, - partial_lines, - .. - } => { - if let UpdateJob::Process { - ref packet, - ref status, - } = update - { - let mut terminal = root.terminal.blocking_write(); - if let Some(packet) = &packet.0 { - let child_id = status - .child_id() - .map(|id| id.to_string()) - .unwrap_or_else(|| "?".to_string()); - let buffer = partial_lines.entry(id).or_default(); - buffer.extend_from_slice(packet.bytes()); - - if let Some((lines, remainder)) = buffer.rsplit_once_str(b"\n") { - // Write each output line to the terminal, preceded - // by the process ID. We also use "\r\n" since we're - // writing to a terminal-like output. - for line in lines.split(|&b| b == b'\n') { - terminal.add_change("\r\n"); - terminal.add_change(format!("[{child_id}] ")); - terminal.add_change(String::from_utf8_lossy(line)); - } - - *buffer = remainder.to_vec(); - } - } - }; - - let mut jobs = root.jobs.blocking_write(); - let Some(job) = jobs.get_mut(&id) else { - return; - }; - let _ = job.update(update); - } - ConsoleReporter::Plain { partial_lines } => match update { - UpdateJob::Download { progress_percent } => { - if progress_percent == Some(100) { - eprintln!("Finished download"); - } - } - UpdateJob::Unarchive { progress_percent } => { - if progress_percent == 100 { - eprintln!("Unarchive"); - } - } - UpdateJob::Process { mut packet, status } => { - let child_id = status - .child_id() - .map(|id| id.to_string()) - .unwrap_or_else(|| "?".to_string()); - - if let Some(packet) = packet.take() { - let buffer = partial_lines.entry(id).or_default(); - buffer.extend_from_slice(packet.bytes()); - if let Some((lines, remainder)) = buffer.rsplit_once_str(b"\n") { - let lines = bstr::BStr::new(lines); - for line in lines.lines() { - eprintln!("[{child_id}] {}", bstr::BStr::new(line)); - } - *buffer = remainder.to_vec(); - } - } - - match status { - ProcessStatus::Running { .. } => {} - ProcessStatus::Exited { status, .. } => { - if let Some(code) = status.as_ref().and_then(|status| status.code()) { - eprintln!("Process {child_id} exited with code {}", code); - } else { - eprintln!("Process {child_id} exited"); - } - } - } - } - UpdateJob::RegistryFetchAdd { .. } => {} - UpdateJob::RegistryFetchUpdate { .. } => {} - UpdateJob::RegistryFetchFinish => { - eprintln!("Finished fetching from registry"); - } - }, - } - } - - fn render(&mut self) -> anyhow::Result<()> { - match self { - ConsoleReporter::SuperConsole { - console, - root, - partial_lines: _, - } => { - console.render(root)?; - } - ConsoleReporter::Plain { .. 
} => {} - } - - Ok(()) - } - - fn finalize(self) -> anyhow::Result<()> { - match self { - ConsoleReporter::SuperConsole { - console, - root, - partial_lines: _, - } => { - console.finalize(&root)?; - } - ConsoleReporter::Plain { .. } => {} - } - - anyhow::Ok(()) - } -} - pub fn start_lsp_reporter(client: tower_lsp::Client) -> (Reporter, ReporterGuard) { let (tx, _) = tokio::sync::mpsc::unbounded_channel(); let reporter = Reporter { start: std::time::Instant::now(), num_jobs: Arc::new(AtomicUsize::new(0)), - is_evaluating: Arc::new(AtomicBool::new(false)), tx: tx.clone(), }; let guard = ReporterGuard { @@ -453,7 +80,6 @@ pub fn start_null_reporter() -> (Reporter, ReporterGuard) { let reporter = Reporter { start: std::time::Instant::now(), num_jobs: Arc::new(AtomicUsize::new(0)), - is_evaluating: Arc::new(AtomicBool::new(false)), tx: tx.clone(), }; let guard = ReporterGuard { @@ -489,7 +115,6 @@ pub fn start_test_reporter() -> (Reporter, ReporterGuard) { let reporter = Reporter { start: std::time::Instant::now(), num_jobs: Arc::new(AtomicUsize::new(0)), - is_evaluating: Arc::new(AtomicBool::new(false)), tx: tx.clone(), }; let guard = ReporterGuard { @@ -535,401 +160,6 @@ impl Drop for ReporterGuard { } } -#[derive(Debug)] -pub enum NewJob { - Download { - url: url::Url, - }, - Unarchive, - Process { - status: ProcessStatus, - }, - RegistryFetch { - total_blobs: usize, - total_recipes: usize, - }, -} - -#[derive(Debug)] -pub enum UpdateJob { - Download { - progress_percent: Option, - }, - Unarchive { - progress_percent: u8, - }, - Process { - packet: DebugIgnore>, - status: ProcessStatus, - }, - RegistryFetchAdd { - blobs_fetched: usize, - recipes_fetched: usize, - }, - RegistryFetchUpdate { - total_blobs: Option, - total_recipes: Option, - complete_blobs: Option, - complete_recipes: Option, - }, - RegistryFetchFinish, -} - -#[derive(Debug)] -pub enum Job { - Download { - url: url::Url, - progress_percent: Option, - }, - Unarchive { - progress_percent: u8, - }, - Process { - packet_queue: DebugIgnore>>>, - status: ProcessStatus, - }, - RegistryFetch { - complete_blobs: usize, - total_blobs: usize, - complete_recipes: usize, - total_recipes: usize, - }, -} - -impl Job { - fn new(new: NewJob) -> Self { - match new { - NewJob::Download { url } => Self::Download { - url, - progress_percent: Some(0), - }, - NewJob::Unarchive => Self::Unarchive { - progress_percent: 0, - }, - NewJob::Process { status } => Self::Process { - packet_queue: Default::default(), - status, - }, - NewJob::RegistryFetch { - total_blobs, - total_recipes, - } => Self::RegistryFetch { - complete_blobs: 0, - total_blobs, - complete_recipes: 0, - total_recipes, - }, - } - } - - fn update(&mut self, update: UpdateJob) -> anyhow::Result<()> { - match update { - UpdateJob::Download { - progress_percent: new_progress_percent, - } => { - let Self::Download { - progress_percent, .. - } = self - else { - anyhow::bail!("tried to update a non-download job with a download update"); - }; - *progress_percent = new_progress_percent; - } - UpdateJob::Unarchive { - progress_percent: new_progress_percent, - } => { - let Self::Unarchive { - progress_percent, .. 
- } = self - else { - anyhow::bail!("tried to update a non-unarchive job with an unarchive update"); - }; - *progress_percent = new_progress_percent; - } - UpdateJob::Process { - mut packet, - status: new_status, - } => { - let Self::Process { - packet_queue, - status, - } = self - else { - anyhow::bail!("tried to update a non-process job with a process update"); - }; - - if let Some(packet) = packet.take() { - let mut packet_queue = packet_queue.write().map_err(|_| { - anyhow::anyhow!("failed to lock process packet queue for writing") - })?; - packet_queue.push(packet); - } - *status = new_status; - } - UpdateJob::RegistryFetchAdd { - blobs_fetched, - recipes_fetched, - } => { - let Self::RegistryFetch { - complete_blobs, - complete_recipes, - .. - } = self - else { - anyhow::bail!( - "tried to update a non-registry-fetch job with a registry-fetch update" - ); - }; - - *complete_blobs += blobs_fetched; - *complete_recipes += recipes_fetched; - } - UpdateJob::RegistryFetchUpdate { - total_blobs: new_total_blobs, - total_recipes: new_total_recipes, - complete_blobs: new_complete_blobs, - complete_recipes: new_complete_recipes, - } => { - let Self::RegistryFetch { - total_blobs, - total_recipes, - complete_blobs, - complete_recipes, - } = self - else { - anyhow::bail!( - "tried to update a non-registry-fetch job with a registry-fetch update" - ); - }; - - if let Some(new_total_blobs) = new_total_blobs { - *total_blobs = new_total_blobs; - } - if let Some(new_total_recipes) = new_total_recipes { - *total_recipes = new_total_recipes; - } - if let Some(new_complete_blobs) = new_complete_blobs { - *complete_blobs = new_complete_blobs; - } - if let Some(new_complete_recipes) = new_complete_recipes { - *complete_recipes = new_complete_recipes; - } - } - UpdateJob::RegistryFetchFinish => { - let Self::RegistryFetch { - complete_blobs, - total_blobs, - complete_recipes, - total_recipes, - } = self - else { - anyhow::bail!( - "tried to update a non-registry-fetch job with a registry-fetch-finish update" - ); - }; - - *complete_blobs = *total_blobs; - *complete_recipes = *total_recipes; - } - } - - Ok(()) - } - - fn is_complete(&self) -> bool { - match self { - Job::Download { - progress_percent, .. - } => progress_percent.map(|p| p >= 100).unwrap_or(false), - Job::Unarchive { progress_percent } => *progress_percent >= 100, - Job::Process { - status, - packet_queue: _, - } => matches!(status, ProcessStatus::Exited { .. }), - Job::RegistryFetch { - complete_blobs, - total_blobs, - complete_recipes, - total_recipes, - } => total_blobs == complete_blobs && total_recipes == complete_recipes, - } - } - - // Returns a priority for the job type. 0 is the lowest priority. Higher - // priority jobs are displayed first. - fn job_type_priority(&self) -> u8 { - match self { - Job::Unarchive { .. } => 0, - Job::Download { .. } | Job::RegistryFetch { .. } => 1, - Job::Process { .. 
} => 2, - } - } -} - -impl superconsole::Component for Job { - fn draw_unchecked( - &self, - _dimensions: superconsole::Dimensions, - _mode: superconsole::DrawMode, - ) -> anyhow::Result { - let lines = match self { - Job::Download { - url, - progress_percent, - } => { - let message = match progress_percent { - Some(100) => { - format!("[100%] Downloaded {url}") - } - Some(progress_percent) => { - format!("[{progress_percent:>3}%] Downloading {url}") - } - None => { - format!("[???%] Downloading {url}") - } - }; - superconsole::Lines::from_iter([superconsole::Line::sanitized(&message)]) - } - Job::Unarchive { progress_percent } => { - let message = if *progress_percent == 100 { - "[100%] Unarchived".to_string() - } else { - format!("[{progress_percent:>3}%] Unarchiving") - }; - superconsole::Lines::from_iter([superconsole::Line::sanitized(&message)]) - } - Job::Process { - packet_queue: _, - status, - } => { - let child_id = status - .child_id() - .map(|id| id.to_string()) - .unwrap_or_else(|| "?".to_string()); - let elapsed = status.elapsed().human_duration(); - let message = match status { - ProcessStatus::Running { .. } => { - format!("Process {child_id} [{elapsed}]") - } - ProcessStatus::Exited { status, .. } => { - let status = status - .as_ref() - .and_then(|status| status.code()) - .map(|c| c.to_string()) - .unwrap_or_else(|| "?".to_string()); - format!("Process {child_id} [{elapsed} Exited {status}]") - } - }; - - superconsole::Lines::from_iter(std::iter::once(superconsole::Line::sanitized( - &message, - ))) - } - Job::RegistryFetch { - complete_blobs, - total_blobs, - complete_recipes, - total_recipes, - } => { - let blob_percent = if *total_blobs > 0 { - (*complete_blobs as f64 / *total_blobs as f64) * 100.0 - } else { - 100.0 - }; - let recipe_percent = if *total_recipes > 0 { - (*complete_recipes as f64 / *total_recipes as f64) * 100.0 - } else { - 100.0 - }; - let total_percent = (recipe_percent * 0.2 + blob_percent * 0.8) as u8; - let verb = if self.is_complete() { - "Fetched" - } else { - "Fetching" - }; - let fetched_blobs = if *total_blobs == 0 { - None - } else if self.is_complete() { - Some(format!( - "{complete_blobs} blob{s}", - s = if *complete_blobs == 1 { "" } else { "s" } - )) - } else { - Some(format!( - "{complete_blobs} / {total_blobs} blob{s}", - s = if *total_blobs == 1 { "" } else { "s" } - )) - }; - let fetched_recipes = if *total_recipes == 0 { - None - } else if self.is_complete() { - Some(format!( - "{complete_recipes} recipe{s}", - s = if *complete_recipes == 1 { "" } else { "s" } - )) - } else { - Some(format!( - "{complete_recipes} / {total_recipes} recipe{s}", - s = if *total_recipes == 1 { "" } else { "s" } - )) - }; - let fetching_message = [fetched_blobs, fetched_recipes] - .into_iter() - .flatten() - .join_with(" + "); - let message = - format!("[{total_percent:>3}%] {verb} {fetching_message} from registry",); - superconsole::Lines::from_iter([superconsole::Line::sanitized(&message)]) - } - }; - - Ok(lines) - } -} - -pub enum ProcessPacket { - Stdout(Vec), - Stderr(Vec), -} - -impl ProcessPacket { - pub fn bytes(&self) -> &[u8] { - match self { - Self::Stdout(bytes) => bytes, - Self::Stderr(bytes) => bytes, - } - } -} - -#[derive(Debug, Clone)] -pub enum ProcessStatus { - Running { - child_id: Option, - start: std::time::Instant, - }, - Exited { - child_id: Option, - status: Option, - elapsed: std::time::Duration, - }, -} - -impl ProcessStatus { - fn child_id(&self) -> Option { - match self { - ProcessStatus::Running { child_id, .. 
} => *child_id, - ProcessStatus::Exited { child_id, .. } => *child_id, - } - } - - fn elapsed(&self) -> std::time::Duration { - match self { - ProcessStatus::Running { start, .. } => start.elapsed(), - ProcessStatus::Exited { elapsed, .. } => *elapsed, - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct JobId(usize); @@ -937,7 +167,6 @@ pub struct JobId(usize); pub struct Reporter { start: std::time::Instant, num_jobs: Arc, - is_evaluating: Arc, tx: tokio::sync::mpsc::UnboundedSender, } @@ -946,12 +175,7 @@ impl Reporter { let _ = self.tx.send(ReportEvent::Emit { lines }); } - pub fn set_is_evaluating(&self, is_evaluating: bool) { - self.is_evaluating - .store(is_evaluating, std::sync::atomic::Ordering::SeqCst); - } - - pub fn add_job(&self, job: NewJob) -> JobId { + pub fn add_job(&self, job: job::NewJob) -> JobId { let id = self .num_jobs .fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -962,7 +186,7 @@ impl Reporter { id } - pub fn update_job(&self, id: JobId, update: UpdateJob) { + pub fn update_job(&self, id: JobId, update: job::UpdateJob) { let _ = self.tx.send(ReportEvent::UpdateJobState { id, update }); } @@ -1003,119 +227,10 @@ impl std::io::Write for ReporterWriter { } } -struct JobsComponent { - start: std::time::Instant, - is_evaluating: Arc, - jobs: Arc>>, - terminal: tokio::sync::RwLock, -} - -impl superconsole::Component for JobsComponent { - fn draw_unchecked( - &self, - dimensions: superconsole::Dimensions, - mode: superconsole::DrawMode, - ) -> anyhow::Result { - let jobs = self.jobs.blocking_read(); - let mut jobs: Vec<_> = jobs.iter().collect(); - let max_visible_jobs = std::cmp::max(dimensions.height.saturating_sub(15), 3); - - jobs.sort_by(cmp_job_entries); - let job_partition_point = jobs.partition_point(|&(_, job)| !job.is_complete()); - let (incomplete_jobs, complete_jobs) = jobs.split_at(job_partition_point); - - let num_jobs = jobs.len(); - let num_complete_jobs = complete_jobs.len(); - let is_evaluating = self.is_evaluating.load(std::sync::atomic::Ordering::SeqCst); - - let jobs = incomplete_jobs - .iter() - .chain(complete_jobs.iter().take(3)) - .take(max_visible_jobs); - - let jobs_lines = jobs - .map(|(_, job)| { - job.draw( - superconsole::Dimensions { - width: dimensions.width, - height: 1, - }, - mode, - ) - }) - .collect::, _>>()?; - - let num_terminal_lines = dimensions - .height - .saturating_sub(jobs_lines.len()) - .saturating_sub(3); - let mut terminal = self.terminal.blocking_write(); - - terminal.resize(dimensions.width, std::cmp::max(num_terminal_lines, 1)); - - let terminal_lines = terminal.screen_lines(); - let terminal_lines = terminal_lines - .iter() - .skip_while(|line| line.is_whitespace()) - .map(|line| superconsole::Line::sanitized(&line.as_str())) - .take(num_terminal_lines); - - let elapsed = self.start.elapsed().human_duration(); - let summary_line = match mode { - superconsole::DrawMode::Normal => { - let summary_line = format!( - "[{elapsed}] {num_complete_jobs} / {num_jobs}{or_more} job{s} complete", - s = if num_jobs == 1 { "" } else { "s" }, - or_more = if is_evaluating { "+" } else { "" }, - ); - Some(superconsole::Line::from_iter([summary_line - .try_into() - .unwrap()])) - } - superconsole::DrawMode::Final => { - // Don't show the summary line on the final draw. 
The final - // summary will be written outside the reporter, since we also - // want to show the summary when not using SuperConsole - None - } - }; - - let lines = terminal_lines - .chain(jobs_lines.into_iter().flatten()) - .chain(summary_line) - .collect(); - Ok(lines) - } -} - -fn cmp_job_entries( - (a_id, a_job): &(&JobId, &Job), - (b_id, b_job): &(&JobId, &Job), -) -> std::cmp::Ordering { - let a_is_complete = a_job.is_complete(); - let b_is_complete = b_job.is_complete(); - - // Show incomplete jobs first - a_is_complete.cmp(&b_is_complete).then_with(|| { - if a_is_complete { - // If both jobs are complete, then show the highest priority jobs - // first, then show the newest job first - a_job - .job_type_priority() - .cmp(&b_job.job_type_priority()) - .reverse() - .then_with(|| a_id.cmp(b_id).reverse()) - } else { - // If neither jobs is complete, then show the oldest job first - a_id.cmp(b_id) - } - }) -} - enum ReportEvent { Emit { lines: superconsole::Lines }, - AddJob { id: JobId, job: NewJob }, - UpdateJobState { id: JobId, update: UpdateJob }, + AddJob { id: JobId, job: job::NewJob }, + UpdateJobState { id: JobId, update: job::UpdateJob }, Shutdown, } diff --git a/crates/brioche-core/src/reporter/console.rs b/crates/brioche-core/src/reporter/console.rs new file mode 100644 index 0000000..ed00bca --- /dev/null +++ b/crates/brioche-core/src/reporter/console.rs @@ -0,0 +1,1323 @@ +use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap}, + sync::{atomic::AtomicUsize, Arc}, +}; + +use bstr::{BString, ByteSlice}; +use joinery::JoinableIterator as _; +use opentelemetry::trace::TracerProvider as _; +use superconsole::style::Stylize; +use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _, Layer as _}; + +use crate::utils::DisplayDuration; + +use super::{ + job::{Job, NewJob, ProcessStatus, ProcessStream, UpdateJob}, + tracing_debug_filter, tracing_output_filter, tracing_root_filter, JobId, ReportEvent, Reporter, + ReporterGuard, +}; + +#[derive(Debug, Clone, Copy)] +pub enum ConsoleReporterKind { + Auto, + SuperConsole, + Plain, +} + +// Render at 30 fps +const RENDER_RATE: std::time::Duration = std::time::Duration::from_nanos(1_000_000_000 / 30); + +// Minimum amount of time to sleep before rendering next frame, even if +// the next frame will be late +const MIN_RENDER_WAIT: std::time::Duration = std::time::Duration::from_millis(1); + +pub fn start_console_reporter( + kind: ConsoleReporterKind, +) -> anyhow::Result<(Reporter, ReporterGuard)> { + let queued_lines = Arc::new(tokio::sync::RwLock::new(Vec::new())); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + + let brioche_otel_enabled = matches!( + std::env::var("BRIOCHE_ENABLE_OTEL").as_deref(), + Ok("1") | Ok("true") + ); + + let start = std::time::Instant::now(); + + let reporter = Reporter { + start, + num_jobs: Arc::new(AtomicUsize::new(0)), + tx: tx.clone(), + }; + let guard = ReporterGuard { + tx, + shutdown_rx: Some(shutdown_rx), + shutdown_opentelemetry: brioche_otel_enabled, + }; + + std::thread::spawn({ + let queued_lines = queued_lines.clone(); + move || { + let superconsole = match kind { + ConsoleReporterKind::Auto => superconsole::SuperConsole::new(), + ConsoleReporterKind::SuperConsole => Some(superconsole::SuperConsole::forced_new( + superconsole::Dimensions { + width: 80, + height: 24, + }, + )), + ConsoleReporterKind::Plain => None, + }; + + let jobs = HashMap::new(); + let job_outputs = 
JobOutputContents::new(1024 * 1024); + let mut console = match superconsole { + Some(console) => { + let root = JobsComponent { + start, + jobs, + job_outputs, + }; + ConsoleReporter::SuperConsole { console, root } + } + None => ConsoleReporter::Plain { jobs, job_outputs }, + }; + + let mut last_render: Option = None; + let mut running = true; + while running { + // Sleep long enough to try and hit our target render rate + if let Some(last_render) = last_render { + let elapsed_since_last_render = last_render.elapsed(); + let wait_until_next_render = + RENDER_RATE.saturating_sub(elapsed_since_last_render); + let wait_until_next_render = + wait_until_next_render.clamp(MIN_RENDER_WAIT, RENDER_RATE); + + std::thread::sleep(wait_until_next_render); + }; + + let _ = console.render(); + + last_render = Some(std::time::Instant::now()); + + while let Ok(event) = rx.try_recv() { + match event { + ReportEvent::Emit { lines } => { + console.emit(lines); + } + ReportEvent::AddJob { id, job } => { + console.add_job(id, job); + } + ReportEvent::UpdateJobState { id, update } => { + console.update_job(id, update); + } + ReportEvent::Shutdown => { + running = false; + } + } + } + let mut queued_lines = queued_lines.blocking_write(); + for lines in queued_lines.drain(..) { + console.emit(lines); + } + } + + let _ = console.finalize(); + let _ = shutdown_tx.send(()); + } + }); + + let opentelemetry_layer = if brioche_otel_enabled { + opentelemetry::global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .http() + .with_http_client(reqwest::Client::new()), + ) + .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource( + opentelemetry_sdk::Resource::default().merge(&opentelemetry_sdk::Resource::new( + vec![ + opentelemetry::KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + "brioche", + ), + opentelemetry::KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + env!("CARGO_PKG_VERSION"), + ), + ], + )), + )) + .install_simple()?; + + Some( + tracing_opentelemetry::layer() + .with_tracer(provider.tracer("tracing-opentelemetry")) + .with_filter(tracing_debug_filter()), + ) + } else { + None + }; + + let log_file_layer = match std::env::var_os("BRIOCHE_LOG_OUTPUT") { + Some(debug_output_path) => { + let debug_output = + std::fs::File::create(debug_output_path).expect("failed to open debug output path"); + Some( + tracing_subscriber::fmt::layer() + .json() + .with_writer(debug_output) + .with_timer(tracing_subscriber::fmt::time::uptime()) + .with_filter(tracing_debug_filter()), + ) + } + _ => None, + }; + + let tracing_console_layer = + std::env::var_os("BRIOCHE_CONSOLE").map(|_| console_subscriber::spawn()); + + // HACK: Add a filter to the subscriber to remove debug logs that we + // shouldn't see if no other layer needs them. 
This is a workaround for + // this issue: https://github.com/tokio-rs/tracing/issues/2448 + let root_filter = match ( + &log_file_layer, + &opentelemetry_layer, + &tracing_console_layer, + ) { + (None, None, None) => Some(tracing_output_filter()), + (_, _, Some(_)) => Some(tracing_root_filter()), + _ => None, + }; + + let reporter_layer = tracing_subscriber::fmt::layer() + .compact() + .with_writer(reporter.clone()) + .without_time() + .with_filter(tracing_output_filter()); + tracing_subscriber::registry() + .with(root_filter) + .with(tracing_console_layer) + .with(reporter_layer) + .with(log_file_layer) + .with(opentelemetry_layer) + .init(); + + Ok((reporter, guard)) +} + +enum ConsoleReporter { + SuperConsole { + console: superconsole::SuperConsole, + root: JobsComponent, + }, + Plain { + jobs: HashMap, + job_outputs: JobOutputContents, + }, +} + +impl ConsoleReporter { + fn emit(&mut self, lines: superconsole::Lines) { + match self { + ConsoleReporter::SuperConsole { console, .. } => { + console.emit(lines); + } + ConsoleReporter::Plain { .. } => { + for line in lines { + eprintln!("{}", line.to_unstyled()); + } + } + } + } + + fn add_job(&mut self, id: JobId, job: NewJob) { + match self { + ConsoleReporter::SuperConsole { root, .. } => { + let new_job = Job::new(job); + root.jobs.insert(id, new_job); + } + ConsoleReporter::Plain { jobs, .. } => { + match &job { + NewJob::Download { url, started_at: _ } => { + eprintln!("Downloading {}", url); + } + NewJob::Unarchive { started_at: _ } => {} + NewJob::Process { status: _ } => {} + NewJob::RegistryFetch { + total_blobs, + total_recipes, + started_at: _, + } => { + eprintln!( + "Fetching {total_blobs} blob{} / {total_recipes} recipe{} from registry", + if *total_blobs == 1 { "" } else { "s" }, + if *total_recipes == 1 { "" } else { "s" }, + ); + } + } + + let new_job = Job::new(job); + jobs.insert(id, new_job); + } + } + } + + fn update_job(&mut self, id: JobId, update: UpdateJob) { + match self { + ConsoleReporter::SuperConsole { root, .. } => { + if let UpdateJob::ProcessPushPacket { ref packet, .. } = update { + let (stream, bytes) = match &packet.0 { + super::job::ProcessPacket::Stdout(bytes) => (ProcessStream::Stdout, bytes), + super::job::ProcessPacket::Stderr(bytes) => (ProcessStream::Stderr, bytes), + }; + root.job_outputs + .append(JobOutputStream { job_id: id, stream }, bytes); + } else if let UpdateJob::ProcessFlushPackets = update { + root.job_outputs.flush_stream(JobOutputStream { + job_id: id, + stream: ProcessStream::Stdout, + }); + root.job_outputs.flush_stream(JobOutputStream { + job_id: id, + stream: ProcessStream::Stderr, + }); + }; + + let Some(job) = root.jobs.get_mut(&id) else { + return; + }; + let _ = job.update(update); + } + ConsoleReporter::Plain { jobs, job_outputs } => { + let Some(job) = jobs.get(&id) else { + return; + }; + + match &update { + UpdateJob::Download { finished_at, .. } => { + if let Some(finished_at) = finished_at { + let elapsed = finished_at.saturating_duration_since(job.created_at()); + eprintln!("Finished download in {}", DisplayDuration(elapsed)); + } + } + UpdateJob::Unarchive { finished_at, .. } => { + if let Some(finished_at) = finished_at { + let elapsed = finished_at.saturating_duration_since(job.created_at()); + eprintln!("Finished unarchiving in {}", DisplayDuration(elapsed)); + } + } + UpdateJob::ProcessUpdateStatus { status } => { + let child_id = status.child_id(); + + let child_id_label = lazy_format::lazy_format! 
{ + match (child_id) { + Some(child_id) => "{child_id}", + None => "?" + } + }; + + match status { + ProcessStatus::Preparing { .. } => {} + ProcessStatus::Running { + created_at, + started_at, + .. + } => { + let launch_duration = + started_at.saturating_duration_since(*created_at); + if launch_duration > std::time::Duration::from_secs(1) { + eprintln!( + "Prepared process {child_id_label} in {}", + DisplayDuration(launch_duration) + ); + } + + eprintln!("Launched process {child_id_label}"); + } + ProcessStatus::Ran { + started_at, + finished_at, + .. + } => { + let run_duration = + finished_at.saturating_duration_since(*started_at); + eprintln!( + "Process {child_id_label} ran in {}", + DisplayDuration(run_duration) + ); + } + ProcessStatus::Finalized { + finished_at, + finalized_at, + .. + } => { + let finalize_duration = + finalized_at.saturating_duration_since(*finished_at); + if finalize_duration > std::time::Duration::from_secs(1) { + eprintln!( + "Process {child_id_label} finalized in {}", + DisplayDuration(finalize_duration) + ); + } + } + } + } + UpdateJob::ProcessPushPacket { packet } => { + let (stream, bytes) = match &packet.0 { + super::job::ProcessPacket::Stdout(bytes) => { + (ProcessStream::Stdout, bytes) + } + super::job::ProcessPacket::Stderr(bytes) => { + (ProcessStream::Stderr, bytes) + } + }; + job_outputs.append(JobOutputStream { job_id: id, stream }, bytes); + + while let Some((stream, content)) = job_outputs.pop_contents() { + print_job_content(jobs, &stream, &content); + } + } + UpdateJob::ProcessFlushPackets => { + job_outputs.flush_stream(JobOutputStream { + job_id: id, + stream: ProcessStream::Stdout, + }); + job_outputs.flush_stream(JobOutputStream { + job_id: id, + stream: ProcessStream::Stderr, + }); + + while let Some((stream, content)) = job_outputs.pop_contents() { + print_job_content(jobs, &stream, &content); + } + } + UpdateJob::RegistryFetchAdd { .. } => {} + UpdateJob::RegistryFetchUpdate { .. } => {} + UpdateJob::RegistryFetchFinish { finished_at } => { + let elapsed = finished_at.saturating_duration_since(job.created_at()); + eprintln!( + "Finished fetching from registry in {}", + DisplayDuration(elapsed) + ); + } + } + + // This should never fail, since we would've already + // returned early if the job ID wasn't found + let job = jobs.get_mut(&id).expect("job not found"); + let _ = job.update(update); + } + } + } + + fn render(&mut self) -> anyhow::Result<()> { + match self { + ConsoleReporter::SuperConsole { console, root } => { + console.render(root)?; + } + ConsoleReporter::Plain { .. } => {} + } + + Ok(()) + } + + fn finalize(self) -> anyhow::Result<()> { + match self { + ConsoleReporter::SuperConsole { console, root } => { + console.finalize(&root)?; + } + ConsoleReporter::Plain { .. } => {} + } + + anyhow::Ok(()) + } +} + +fn print_job_content(jobs: &HashMap, stream: &JobOutputStream, content: &[u8]) { + let content = bstr::BStr::new(content); + + let job = jobs.get(&stream.job_id); + let child_id = match job { + Some(Job::Process { status, .. }) => status.child_id(), + _ => None, + }; + let child_id_label = lazy_format::lazy_format! { + match (child_id) { + Some(child_id) => "{child_id}", + None => "?" 
+ } + }; + + let content = match content.strip_suffix(b"\n") { + Some(content) => content, + None => content, + }; + + for line in content.lines() { + let line = bstr::BStr::new(line); + eprintln!("[{child_id_label}] {line}"); + } +} + +const JOB_LABEL_WIDTH: usize = 7; + +struct JobsComponent { + start: std::time::Instant, + jobs: HashMap, + job_outputs: JobOutputContents, +} + +impl superconsole::Component for JobsComponent { + fn draw_unchecked( + &self, + dimensions: superconsole::Dimensions, + mode: superconsole::DrawMode, + ) -> anyhow::Result { + let max_visible_jobs: usize = 4; + + let mut job_list: Vec<_> = self.jobs.iter().collect(); + job_list.sort_by(cmp_job_entries); + + let job_partition_point = job_list.partition_point(|&(_, job)| !job.is_complete()); + let (incomplete_jobs, complete_jobs) = job_list.split_at(job_partition_point); + + let num_incomplete_jobs = incomplete_jobs.len(); + let num_complete_jobs = complete_jobs.len(); + + // Ensure we show at least one complete job (if there are any) + let min_complete_jobs = std::cmp::min(num_complete_jobs, 1); + let max_incomplete_jobs = max_visible_jobs.saturating_sub(min_complete_jobs); + + let job_list = incomplete_jobs + .iter() + .take(max_incomplete_jobs) + .chain(complete_jobs) + .take(max_visible_jobs); + + let jobs_lines = job_list + .map(|(job_id, job)| { + JobComponent(**job_id, job).draw( + superconsole::Dimensions { + width: dimensions.width, + height: 1, + }, + mode, + ) + }) + .collect::, _>>()?; + + let num_job_output_lines = dimensions + .height + .saturating_sub(jobs_lines.len()) + .saturating_sub(3); + + let job_output_content_width = dimensions + .width + .saturating_sub(JOB_LABEL_WIDTH) + .saturating_sub(4); + + let contents_rev = Some(self.job_outputs.contents.iter().rev()) + .filter(|_| job_output_content_width > 0) + .into_iter() + .flatten(); + let lines_with_streams_rev = contents_rev + .flat_map(|(stream, content)| { + let content = match content.strip_suffix(b"\n") { + Some(content) => content, + None => content, + }; + let lines_rev = content + .lines() + .rev() + .flat_map(|line| line.chunks(job_output_content_width).rev()); + lines_rev.map(|line| (*stream, bstr::BStr::new(line))) + }) + .take(num_job_output_lines) + .collect::>(); + + let mut job_output_lines = vec![]; + let mut last_job_id = None; + for (stream, line) in lines_with_streams_rev.iter().rev() { + // Merge job labels for consecutive lines for the same job + let gutter = if last_job_id == Some(stream.job_id) { + format!("{:1$}│ ", "", JOB_LABEL_WIDTH) + } else { + let job = self.jobs.get(&stream.job_id); + let child_id = match job { + Some(Job::Process { status, .. 
}) => status.child_id(), + _ => None, + }; + let job_label = match child_id { + Some(child_id) => format!("{child_id}"), + None => "?".to_string(), + }; + let job_label = string_with_width(&job_label, JOB_LABEL_WIDTH, "…"); + + format!("{job_label:0$}│ ", JOB_LABEL_WIDTH) + }; + + // Pick a color based on the job ID + let gutter_color = job_color(stream.job_id); + + let styled_line = superconsole::Line::from_iter([ + superconsole::Span::new_colored_lossy(&gutter, gutter_color), + superconsole::Span::new_unstyled_lossy(line), + ]); + job_output_lines.push(styled_line); + + last_job_id = Some(stream.job_id) + } + + let summary_line = match mode { + superconsole::DrawMode::Normal => { + let elapsed_span = superconsole::Span::new_unstyled_lossy( + lazy_format::lazy_format!("{:>6}", DisplayDuration(self.start.elapsed())), + ); + let running_jobs_span = superconsole::Span::new_colored_lossy( + &format!("{num_incomplete_jobs:3} running"), + if num_incomplete_jobs > 0 { + superconsole::style::Color::Blue + } else { + superconsole::style::Color::Grey + }, + ); + let complete_jobs_span = superconsole::Span::new_colored_lossy( + &format!("{num_complete_jobs:3} complete"), + if num_complete_jobs > 0 { + superconsole::style::Color::Green + } else { + superconsole::style::Color::Grey + }, + ); + let line = superconsole::Line::from_iter([ + elapsed_span, + superconsole::Span::new_unstyled_lossy(" "), + complete_jobs_span, + superconsole::Span::new_unstyled_lossy(" "), + running_jobs_span, + ]); + Some(line) + } + superconsole::DrawMode::Final => { + // Don't show the summary line on the final draw. The final + // summary will be written outside the reporter, since we also + // want to show the summary when not using SuperConsole + None + } + }; + + let lines = job_output_lines + .into_iter() + .chain(jobs_lines.into_iter().flatten()) + .chain(summary_line) + .collect(); + Ok(lines) + } +} + +struct JobComponent<'a>(JobId, &'a Job); + +impl<'a> superconsole::Component for JobComponent<'a> { + fn draw_unchecked( + &self, + dimensions: superconsole::Dimensions, + _mode: superconsole::DrawMode, + ) -> anyhow::Result<superconsole::Lines> { + let &JobComponent(job_id, job) = self; + + let elapsed_span = match (job.started_at(), job.finished_at()) { + (Some(started_at), Some(finished_at)) => { + let elapsed = finished_at.saturating_duration_since(started_at); + let elapsed = DisplayDuration(elapsed); + superconsole::Span::new_colored_lossy( + &format!("{elapsed:>6.8}"), + superconsole::style::Color::DarkGrey, + ) + } + (Some(started_at), None) => { + let elapsed = started_at.elapsed(); + let elapsed = DisplayDuration(elapsed); + superconsole::Span::new_colored_lossy( + &format!("{elapsed:>6.8}"), + superconsole::style::Color::Grey, + ) + } + (None, _) => { + superconsole::Span::new_unstyled_lossy(lazy_format::lazy_format!("{:>6}", "")) + } + }; + + let lines = match job { + Job::Download { + url, + progress_percent, + started_at: _, + finished_at: _, + } => { + let percentage_span = match progress_percent { + Some(percent) => superconsole::Span::new_unstyled_lossy( + lazy_format::lazy_format!("{percent:>3}%"), + ), + None => superconsole::Span::new_unstyled_lossy("???%"), + }; + + let indicator = if job.is_complete() { + IndicatorKind::Complete + } else { + IndicatorKind::Spinner(job.elapsed().unwrap_or_default()) + }; + + let mut line = superconsole::Line::from_iter([ + elapsed_span, + superconsole::Span::new_unstyled_lossy(" "), + indicator_span(indicator), + superconsole::Span::new_unstyled_lossy(" Download "), +
percentage_span, + superconsole::Span::new_unstyled_lossy(" "), + ]); + + let remaining_width = dimensions + .width + .saturating_sub(1) + .saturating_sub(line.len()); + + let truncated_url = string_with_width(url.as_str(), remaining_width, "…"); + + let progress_bar_progress = progress_percent.unwrap_or(0) as f64 / 100.0; + + line.extend(progress_bar_spans( + &truncated_url, + remaining_width, + progress_bar_progress, + )); + + superconsole::Lines::from_iter([line]) + } + Job::Unarchive { + progress_percent, + started_at: _, + finished_at: _, + } => { + let percentage_span = superconsole::Span::new_unstyled_lossy( + lazy_format::lazy_format!("{progress_percent:>3}%"), + ); + + let indicator = if job.is_complete() { + IndicatorKind::Complete + } else { + IndicatorKind::Spinner(job.elapsed().unwrap_or_default()) + }; + + let mut line = superconsole::Line::from_iter([ + elapsed_span, + superconsole::Span::new_unstyled_lossy(" "), + indicator_span(indicator), + superconsole::Span::new_unstyled_lossy(" Unarchive "), + percentage_span, + ]); + + if !job.is_complete() { + let remaining_width = dimensions + .width + .saturating_sub(1) + .saturating_sub(line.len()); + + let progress_bar_progress = *progress_percent as f64 / 100.0; + + line.push(superconsole::Span::new_unstyled_lossy(" ")); + line.extend(progress_bar_spans( + "", + remaining_width, + progress_bar_progress, + )); + } + + superconsole::Lines::from_iter([line]) + } + Job::Process { + packet_queue: _, + status, + } => { + let child_id = status + .child_id() + .map(|id| id.to_string()) + .unwrap_or_else(|| "?".to_string()); + + let indicator = match status { + ProcessStatus::Preparing { created_at } => { + IndicatorKind::PreparingSpinner(created_at.elapsed()) + } + ProcessStatus::Running { started_at, .. } => { + IndicatorKind::Spinner(started_at.elapsed()) + } + ProcessStatus::Ran { .. } | ProcessStatus::Finalized { .. } => { + IndicatorKind::Complete + } + }; + + let note_span = match status { + ProcessStatus::Preparing { created_at } => { + let preparing_duration = created_at.elapsed(); + if preparing_duration > std::time::Duration::from_secs(1) { + Some(superconsole::Span::new_colored_lossy( + &format!( + " (preparing for {})", + DisplayDuration(preparing_duration) + ), + superconsole::style::Color::DarkGrey, + )) + } else { + None + } + } + ProcessStatus::Ran { finished_at, .. } => { + let finalizing_duration = finished_at.elapsed(); + if finalizing_duration > std::time::Duration::from_secs(1) { + Some(superconsole::Span::new_colored_lossy( + &format!( + " (finalizing for {})", + DisplayDuration(finalizing_duration) + ), + superconsole::style::Color::DarkGrey, + )) + } else { + None + } + } + ProcessStatus::Running { .. } | ProcessStatus::Finalized { .. 
} => None, + }; + + let child_id_span = + superconsole::Span::new_colored_lossy(&child_id, job_color(job_id)); + + superconsole::Lines::from_iter([superconsole::Line::from_iter( + [ + elapsed_span, + superconsole::Span::new_unstyled_lossy(" "), + indicator_span(indicator), + superconsole::Span::new_unstyled_lossy(" Process "), + child_id_span, + ] + .into_iter() + .chain(note_span), + )]) + } + Job::RegistryFetch { + complete_blobs, + total_blobs, + complete_recipes, + total_recipes, + started_at: _, + finished_at: _, + } => { + let blob_progress = if *total_blobs > 0 { + *complete_blobs as f64 / *total_blobs as f64 + } else { + 1.0 + }; + let recipe_progress = if *total_recipes > 0 { + *complete_recipes as f64 / *total_recipes as f64 + } else { + 1.0 + }; + let total_progress = recipe_progress * 0.2 + blob_progress * 0.8; + let total_percent = (total_progress * 100.0) as u64; + + let percentage_span = superconsole::Span::new_unstyled_lossy( + lazy_format::lazy_format!("{total_percent:>3}%"), + ); + + let indicator = if job.is_complete() { + IndicatorKind::Complete + } else { + IndicatorKind::Spinner(job.elapsed().unwrap_or_default()) + }; + + let mut line = superconsole::Line::from_iter([ + elapsed_span, + superconsole::Span::new_unstyled_lossy(" "), + indicator_span(indicator), + superconsole::Span::new_unstyled_lossy(" Registry "), + percentage_span, + superconsole::Span::new_unstyled_lossy(" "), + ]); + + let is_fetching_anything = *total_blobs > 0 || *total_recipes > 0; + let fetching_message = if is_fetching_anything { + let fetched_blobs = if *total_blobs == 0 { + None + } else if job.is_complete() { + Some(format!( + "{complete_blobs} blob{s}", + s = if *complete_blobs == 1 { "" } else { "s" } + )) + } else { + Some(format!( + "{complete_blobs} / {total_blobs} blob{s}", + s = if *total_blobs == 1 { "" } else { "s" } + )) + }; + let fetched_recipes = if *total_recipes == 0 { + None + } else if job.is_complete() { + Some(format!( + "{complete_recipes} recipe{s}", + s = if *complete_recipes == 1 { "" } else { "s" } + )) + } else { + Some(format!( + "{complete_recipes} / {total_recipes} recipe{s}", + s = if *total_recipes == 1 { "" } else { "s" } + )) + }; + let fetching_message = [fetched_blobs, fetched_recipes] + .into_iter() + .flatten() + .join_with(" + "); + format!("Fetch {fetching_message} from registry") + } else { + "Fetch from registry".to_string() + }; + + let remaining_width = dimensions + .width + .saturating_sub(1) + .saturating_sub(line.len()); + line.extend(progress_bar_spans( + &fetching_message, + remaining_width, + total_progress, + )); + + superconsole::Lines::from_iter([line]) + } + }; + + Ok(lines) + } +} + +struct JobOutputContents { + total_bytes: usize, + max_bytes: usize, + contents: Vec<(JobOutputStream, BString)>, + partial_contents: BTreeMap<JobOutputStream, BString>, +} + +impl JobOutputContents { + fn new(max_bytes: usize) -> Self { + Self { + total_bytes: 0, + max_bytes, + contents: Vec::new(), + partial_contents: BTreeMap::new(), + } + } + + fn append(&mut self, stream: JobOutputStream, content: impl AsRef<[u8]>) { + let content = content.as_ref(); + + // Truncate content so that it fits within `max_bytes` + let content_start = content.len().saturating_sub(self.max_bytes); + let content = &content[content_start..]; + + // Break the content into the part containing complete lines, and the + // part that's made up of only partial lines + let (complete_content, partial_content) = match content.rsplit_once_str("\n") { + Some((complete, b"")) => (Some(complete), b"".as_ref()), +
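// (Editor's note, not part of the patch: `rsplit_once_str` splits on the
// *last* newline, so the three arms here cover: content ending in "\n"
// (everything complete, nothing pending), content with an interior "\n"
// (complete lines plus a pending tail), and content with no "\n" at all
// (the whole chunk stays pending until a newline arrives).)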
Some((complete, pending)) => (Some(complete), pending), + None => (None, content), + }; + + // Drop old content until we have enough free space to add the new content + let new_total_bytes = self.total_bytes.saturating_add(content.len()); + let mut drop_bytes = new_total_bytes.saturating_sub(self.max_bytes); + while drop_bytes > 0 { + // Get the oldest content + let oldest_content = self + .contents + .get_mut(0) + .map(|(_, content)| content) + .or_else(|| self.partial_contents.values_mut().next()); + let Some(oldest_content) = oldest_content else { + break; + }; + + if oldest_content.len() > drop_bytes { + // If the oldest content is longer than the total number of + // bytes we need to drop, then remove the bytes at the start, + // and we're done + oldest_content.drain(0..drop_bytes); + break; + } else { + // Otherwise, remove the content and continue + let (_, removed_content) = self.contents.remove(0); + drop_bytes -= removed_content.len(); + } + } + + if let Some(complete_content) = complete_content { + let prior_pending = self.partial_contents.remove(&stream); + let prior_content = self + .contents + .last_mut() + .and_then(|(content_stream, content)| { + if *content_stream == stream { + Some(content) + } else { + None + } + }); + + if let Some(prior_content) = prior_content { + // If the most recent content is from the same job, then just + // append the pending content and new content to the end + + if let Some(prior_pending) = prior_pending { + prior_content.extend_from_slice(&prior_pending); + } + prior_content.extend_from_slice(complete_content); + prior_content.push(b'\n'); + } else { + // Otherwise, add a new content entry + + let mut bytes = bstr::BString::default(); + if let Some(prior_pending) = prior_pending { + bytes.extend_from_slice(&prior_pending); + } + bytes.extend_from_slice(complete_content); + bytes.push(b'\n'); + + self.contents.push((stream, bytes)); + } + } + + if !partial_content.is_empty() { + match self.partial_contents.entry(stream) { + std::collections::btree_map::Entry::Vacant(entry) => { + entry.insert(partial_content.into()); + } + std::collections::btree_map::Entry::Occupied(entry) => { + entry.into_mut().extend_from_slice(partial_content); + } + } + } + + self.total_bytes = std::cmp::min(new_total_bytes, self.max_bytes); + } + + fn flush_stream(&mut self, stream: JobOutputStream) { + // Get any partial content that should be flushed + let Some(partial_content) = self.partial_contents.remove(&stream) else { + return; + }; + + // We aren't adding or removing any bytes, so no need to truncate + // or drop old data first + + let prior_content = self + .contents + .last_mut() + .and_then(|(content_stream, content)| { + if *content_stream == stream { + Some(content) + } else { + None + } + }); + + if let Some(prior_content) = prior_content { + // If the most recent content is from the same job, then just + // append the flushed content + + prior_content.extend_from_slice(&partial_content); + } else { + // Otherwise, add a new content entry + + self.contents.push((stream, partial_content)); + } + } + + fn pop_contents(&mut self) -> Option<(JobOutputStream, bstr::BString)> { + self.contents.pop() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct JobOutputStream { + pub job_id: JobId, + pub stream: ProcessStream, +} + +fn cmp_job_entries( + (a_id, a_job): &(&JobId, &Job), + (b_id, b_job): &(&JobId, &Job), +) -> std::cmp::Ordering { + let a_finalized_at = a_job.finalized_at(); + let b_finalized_at = b_job.finalized_at(); +
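An editor's aside on the buffer type defined above: `JobOutputContents` is in effect a byte-capped FIFO, so `append` first trims an oversized chunk to `max_bytes`, then evicts the oldest stored bytes until the new chunk fits, and only then merges in the complete lines. A minimal, self-contained sketch of that eviction discipline (hypothetical `CappedBuffer` type, with a `VecDeque` standing in for the `Vec` + `BTreeMap` pair used in the patch):

```rust
use std::collections::VecDeque;

/// Byte-capped FIFO of output chunks: the oldest bytes are evicted first.
struct CappedBuffer {
    chunks: VecDeque<Vec<u8>>,
    total_bytes: usize,
    max_bytes: usize,
}

impl CappedBuffer {
    fn new(max_bytes: usize) -> Self {
        Self {
            chunks: VecDeque::new(),
            total_bytes: 0,
            max_bytes,
        }
    }

    fn append(&mut self, chunk: &[u8]) {
        // Keep only the tail of an oversized chunk, as `append` above does
        let start = chunk.len().saturating_sub(self.max_bytes);
        let chunk = &chunk[start..];

        // Evict from the front until the new chunk fits under the cap
        let mut drop_bytes =
            (self.total_bytes + chunk.len()).saturating_sub(self.max_bytes);
        while drop_bytes > 0 {
            let Some(oldest) = self.chunks.front_mut() else { break };
            if oldest.len() > drop_bytes {
                // The oldest chunk is big enough: trim its start and stop
                oldest.drain(0..drop_bytes);
                break;
            }
            // Otherwise drop the whole chunk and keep going
            drop_bytes -= oldest.len();
            self.chunks.pop_front();
        }

        self.chunks.push_back(chunk.to_vec());
        self.total_bytes = self.chunks.iter().map(Vec::len).sum();
    }
}
```

Evicting from the front keeps the newest output intact, which matches the reporter's goal of always showing the tail of each job's output.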
// Show unfinalized jobs first + match (a_finalized_at, b_finalized_at) { + (Some(_), None) => std::cmp::Ordering::Greater, + (None, Some(_)) => std::cmp::Ordering::Less, + (a_finalized_at, b_finalized_at) => { + // Show higher priority jobs first + a_job + .job_type_priority() + .cmp(&b_job.job_type_priority()) + .reverse() + .then_with(|| { + // Show more recently finalized jobs first + a_finalized_at.cmp(&b_finalized_at).reverse().then_with(|| { + // Show newer jobs first + a_id.cmp(b_id) + }) + }) + } + } +} + +fn string_with_width<'a>(s: &'a str, num_chars: usize, replacement: &str) -> Cow<'a, str> { + if num_chars == 0 { + return Cow::Borrowed(""); + } + + let s_chars = s.chars().count(); + + match s_chars.cmp(&num_chars) { + std::cmp::Ordering::Equal => Cow::Borrowed(s), + std::cmp::Ordering::Less => Cow::Owned(format!("{s:0$}", num_chars)), + std::cmp::Ordering::Greater => { + let replacement_chars = replacement.chars().count(); + let keep_chars = num_chars.saturating_sub(replacement_chars); + + let left_chars = keep_chars / 2; + let right_chars = keep_chars.saturating_sub(left_chars); + + let new_chars = s + .chars() + .take(left_chars) + .chain(replacement.chars().take(num_chars)) + .chain(s.chars().skip(s_chars.saturating_sub(right_chars))); + + Cow::Owned(new_chars.collect()) + } + } +} + +fn job_color(job_id: JobId) -> superconsole::style::Color { + const JOB_COLORS: &[superconsole::style::Color] = &[ + superconsole::style::Color::Cyan, + superconsole::style::Color::Magenta, + superconsole::style::Color::Yellow, + superconsole::style::Color::Blue, + ]; + + JOB_COLORS[job_id.0 % JOB_COLORS.len()] +} + +#[derive(Debug, Clone, Copy)] +enum IndicatorKind { + PreparingSpinner(std::time::Duration), + Spinner(std::time::Duration), + Complete, +} + +fn indicator_span(kind: IndicatorKind) -> superconsole::Span { + match kind { + IndicatorKind::Spinner(elapsed) => { + let spinner = spinner(elapsed, 100); + superconsole::Span::new_colored_lossy(spinner, superconsole::style::Color::Blue) + } + IndicatorKind::PreparingSpinner(elapsed) => { + let spinner = spinner(elapsed, 200); + superconsole::Span::new_colored_lossy(spinner, superconsole::style::Color::Grey) + } + IndicatorKind::Complete => { + superconsole::Span::new_colored_lossy("✓", superconsole::style::Color::Green) + } + } +} + +fn progress_bar_spans(interior: &str, width: usize, progress: f64) -> [superconsole::Span; 2] { + let filled = ((width as f64) * progress) as usize; + + let padded = format!("{interior:0$.0$}", width); + let (filled_part, unfilled_part) = if progress <= 0.0 { + ("", &*padded) + } else if progress >= 1.0 { + (&*padded, "") + } else { + let split_offset = padded + .char_indices() + .find_map( + |(index, _)| { + if index >= filled { + Some(index) + } else { + None + } + }, + ) + .unwrap_or(0); + padded.split_at(split_offset) + }; + + [ + superconsole::Span::new_styled_lossy(filled_part.to_string().dark_grey().negative()), + superconsole::Span::new_colored_lossy(unfilled_part, superconsole::style::Color::DarkGrey), + ] +} + +fn spinner(duration: std::time::Duration, speed: u128) -> &'static str { + const SPINNERS: &[&str] = &["◜", "◝", "◞", "◟"]; + SPINNERS[(duration.as_millis() / speed) as usize % SPINNERS.len()] +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use crate::reporter::job::ProcessStream::{self, Stderr, Stdout}; + + use super::{string_with_width, JobId, JobOutputContents, JobOutputStream}; + + fn job_stream(id: usize, stream: ProcessStream) -> JobOutputStream { + 
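// (Editor's note, not part of the patch: this helper just builds the
// (job, stream) key that the buffer tests below index by.)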
JobOutputStream { + job_id: JobId(id), + stream, + } + } + + #[test] + fn test_job_output_contents_basic() { + let mut contents = JobOutputContents::new(100); + contents.append(job_stream(1, Stdout), "a\nb\nc"); + + assert_eq!(contents.total_bytes, 5); + assert_eq!( + contents.contents, + [(job_stream(1, Stdout), "a\nb\n".into())], + ); + assert_eq!( + contents.partial_contents, + BTreeMap::from_iter([(job_stream(1, Stdout), "c".into())]) + ); + } + + #[test] + fn test_job_output_interleaved() { + let mut contents = JobOutputContents::new(100); + + contents.append(job_stream(1, Stdout), "a\nb\nc"); + contents.append(job_stream(1, Stderr), "d\ne\nf"); + contents.append(job_stream(2, Stdout), "g\nh\ni"); + contents.append(job_stream(2, Stdout), "j\nk\nl"); + contents.append(job_stream(2, Stderr), "m\nn\no"); + contents.append(job_stream(2, Stderr), "p\nq\nr"); + contents.append(job_stream(1, Stdout), "s\nt\nu"); + contents.append(job_stream(1, Stderr), "v\nw\nx"); + + assert_eq!(contents.total_bytes, 40); + assert_eq!( + contents.contents, + [ + (job_stream(1, Stdout), "a\nb\n".into()), + (job_stream(1, Stderr), "d\ne\n".into()), + (job_stream(2, Stdout), "g\nh\nij\nk\n".into()), + (job_stream(2, Stderr), "m\nn\nop\nq\n".into()), + (job_stream(1, Stdout), "cs\nt\n".into()), + (job_stream(1, Stderr), "fv\nw\n".into()), + ] + ); + assert_eq!( + contents.partial_contents, + BTreeMap::from_iter([ + (job_stream(1, Stdout), "u".into()), + (job_stream(1, Stderr), "x".into()), + (job_stream(2, Stdout), "l".into()), + (job_stream(2, Stderr), "r".into()), + ]) + ); + } + + #[test] + fn test_job_output_drop_oldest() { + let mut contents = JobOutputContents::new(10); + + contents.append(job_stream(1, Stdout), "a\n"); + contents.append(job_stream(2, Stdout), "bcdefghij\n"); + + assert_eq!(contents.total_bytes, 10); + assert_eq!( + contents.contents, + [(job_stream(2, Stdout), "bcdefghij\n".into())] + ); + } + + #[test] + fn test_job_output_truncate_oldest() { + let mut contents = JobOutputContents::new(10); + + contents.append(job_stream(1, Stdout), "abcdefghi\n"); + contents.append(job_stream(2, Stdout), "jk\n"); + + assert_eq!(contents.total_bytes, 10); + assert_eq!( + contents.contents, + [ + (job_stream(1, Stdout), "defghi\n".into()), + (job_stream(2, Stdout), "jk\n".into()), + ] + ); + } + + #[test] + fn test_string_with_width() { + assert_eq!(string_with_width("abcd", 10, "-"), "abcd "); + assert_eq!(string_with_width("abcd", 4, "-"), "abcd"); + assert_eq!(string_with_width("abcd", 3, "-"), "a-d"); + assert_eq!(string_with_width("abcd", 1, "-"), "-"); + assert_eq!(string_with_width("abcd", 0, "-"), ""); + + assert_eq!(string_with_width("abcde", 10, "-"), "abcde "); + assert_eq!(string_with_width("abcde", 5, "-"), "abcde"); + assert_eq!(string_with_width("abcde", 3, "-"), "a-e"); + assert_eq!(string_with_width("abcde", 1, "-"), "-"); + assert_eq!(string_with_width("abcde", 0, "-"), ""); + + assert_eq!(string_with_width("abcd", 10, "…"), "abcd "); + assert_eq!(string_with_width("abcd", 4, "…"), "abcd"); + assert_eq!(string_with_width("abcd", 3, "…"), "a…d"); + assert_eq!(string_with_width("abcd", 1, "…"), "…"); + assert_eq!(string_with_width("abcd", 0, "…"), ""); + + assert_eq!(string_with_width("abcde", 10, "…"), "abcde "); + assert_eq!(string_with_width("abcde", 5, "…"), "abcde"); + assert_eq!(string_with_width("abcde", 3, "…"), "a…e"); + assert_eq!(string_with_width("abcde", 1, "…"), "…"); + assert_eq!(string_with_width("abcde", 0, "…"), ""); + + assert_eq!(string_with_width("abcdef", 10, "..."), 
"abcdef "); + assert_eq!(string_with_width("abcdef", 6, "..."), "abcdef"); + assert_eq!(string_with_width("abcdef", 5, "..."), "a...f"); + assert_eq!(string_with_width("abcdef", 3, "..."), "..."); + assert_eq!(string_with_width("abcdef", 1, "..."), "."); + assert_eq!(string_with_width("abcdef", 0, "..."), ""); + + assert_eq!(string_with_width("abcdefg", 10, "..."), "abcdefg "); + assert_eq!(string_with_width("abcdefg", 7, "..."), "abcdefg"); + assert_eq!(string_with_width("abcdefg", 5, "..."), "a...g"); + assert_eq!(string_with_width("abcdefg", 3, "..."), "..."); + assert_eq!(string_with_width("abcdefg", 1, "..."), "."); + assert_eq!(string_with_width("abcdefg", 0, "..."), ""); + } +} diff --git a/crates/brioche-core/src/reporter/job.rs b/crates/brioche-core/src/reporter/job.rs new file mode 100644 index 0000000..ad6187f --- /dev/null +++ b/crates/brioche-core/src/reporter/job.rs @@ -0,0 +1,459 @@ +use std::sync::{Arc, RwLock}; + +use debug_ignore::DebugIgnore; + +#[derive(Debug)] +pub enum NewJob { + Download { + url: url::Url, + started_at: std::time::Instant, + }, + Unarchive { + started_at: std::time::Instant, + }, + Process { + status: ProcessStatus, + }, + RegistryFetch { + total_blobs: usize, + total_recipes: usize, + started_at: std::time::Instant, + }, +} + +#[derive(Debug)] +pub enum UpdateJob { + Download { + progress_percent: Option, + finished_at: Option, + }, + Unarchive { + progress_percent: u8, + finished_at: Option, + }, + ProcessPushPacket { + packet: DebugIgnore, + }, + ProcessFlushPackets, + ProcessUpdateStatus { + status: ProcessStatus, + }, + RegistryFetchAdd { + blobs_fetched: usize, + recipes_fetched: usize, + }, + RegistryFetchUpdate { + total_blobs: Option, + total_recipes: Option, + complete_blobs: Option, + complete_recipes: Option, + }, + RegistryFetchFinish { + finished_at: std::time::Instant, + }, +} + +#[derive(Debug)] +pub enum Job { + Download { + url: url::Url, + progress_percent: Option, + started_at: std::time::Instant, + finished_at: Option, + }, + Unarchive { + progress_percent: u8, + started_at: std::time::Instant, + finished_at: Option, + }, + Process { + packet_queue: DebugIgnore>>>, + status: ProcessStatus, + }, + RegistryFetch { + complete_blobs: usize, + total_blobs: usize, + complete_recipes: usize, + total_recipes: usize, + started_at: std::time::Instant, + finished_at: Option, + }, +} + +impl Job { + pub fn new(new: NewJob) -> Self { + match new { + NewJob::Download { url, started_at } => Self::Download { + url, + progress_percent: Some(0), + started_at, + finished_at: None, + }, + NewJob::Unarchive { started_at } => Self::Unarchive { + progress_percent: 0, + started_at, + finished_at: None, + }, + NewJob::Process { status } => Self::Process { + packet_queue: Default::default(), + status, + }, + NewJob::RegistryFetch { + total_blobs, + total_recipes, + started_at, + } => Self::RegistryFetch { + complete_blobs: 0, + total_blobs, + complete_recipes: 0, + total_recipes, + started_at, + finished_at: None, + }, + } + } + + pub fn update(&mut self, update: UpdateJob) -> anyhow::Result<()> { + match update { + UpdateJob::Download { + progress_percent: new_progress_percent, + finished_at: new_finished_at, + } => { + let Self::Download { + progress_percent, + finished_at, + .. 
+ } = self + else { + anyhow::bail!("tried to update a non-download job with a download update"); + }; + *progress_percent = new_progress_percent; + *finished_at = new_finished_at; + } + UpdateJob::Unarchive { + progress_percent: new_progress_percent, + finished_at: new_finished_at, + } => { + let Self::Unarchive { + progress_percent, + finished_at, + .. + } = self + else { + anyhow::bail!("tried to update a non-unarchive job with an unarchive update"); + }; + *progress_percent = new_progress_percent; + *finished_at = new_finished_at; + } + UpdateJob::ProcessPushPacket { packet } => { + let Self::Process { + packet_queue, + status: _, + } = self + else { + anyhow::bail!("tried to update a non-process job with a process update"); + }; + + let mut packet_queue = packet_queue.write().map_err(|_| { + anyhow::anyhow!("failed to lock process packet queue for writing") + })?; + packet_queue.push(packet.0); + } + UpdateJob::ProcessFlushPackets => {} + UpdateJob::ProcessUpdateStatus { status: new_status } => { + let Self::Process { + packet_queue: _, + status, + } = self + else { + anyhow::bail!("tried to update a non-process job with a process update"); + }; + + *status = new_status; + } + UpdateJob::RegistryFetchAdd { + blobs_fetched, + recipes_fetched, + } => { + let Self::RegistryFetch { + complete_blobs, + complete_recipes, + .. + } = self + else { + anyhow::bail!( + "tried to update a non-registry-fetch job with a registry-fetch update" + ); + }; + + *complete_blobs += blobs_fetched; + *complete_recipes += recipes_fetched; + } + UpdateJob::RegistryFetchUpdate { + total_blobs: new_total_blobs, + total_recipes: new_total_recipes, + complete_blobs: new_complete_blobs, + complete_recipes: new_complete_recipes, + } => { + let Self::RegistryFetch { + total_blobs, + total_recipes, + complete_blobs, + complete_recipes, + started_at: _, + finished_at: _, + } = self + else { + anyhow::bail!( + "tried to update a non-registry-fetch job with a registry-fetch update" + ); + }; + + if let Some(new_total_blobs) = new_total_blobs { + *total_blobs = new_total_blobs; + } + if let Some(new_total_recipes) = new_total_recipes { + *total_recipes = new_total_recipes; + } + if let Some(new_complete_blobs) = new_complete_blobs { + *complete_blobs = new_complete_blobs; + } + if let Some(new_complete_recipes) = new_complete_recipes { + *complete_recipes = new_complete_recipes; + } + } + UpdateJob::RegistryFetchFinish { + finished_at: new_finished_at, + } => { + let Self::RegistryFetch { + complete_blobs, + total_blobs, + complete_recipes, + total_recipes, + started_at: _, + finished_at, + } = self + else { + anyhow::bail!( + "tried to update a non-registry-fetch job with a registry-fetch-finish update" + ); + }; + + *complete_blobs = *total_blobs; + *complete_recipes = *total_recipes; + *finished_at = Some(new_finished_at); + } + } + + Ok(()) + } + + pub fn created_at(&self) -> std::time::Instant { + match self { + Job::Download { started_at, .. } + | Job::Unarchive { started_at, .. } + | Job::RegistryFetch { started_at, .. } => *started_at, + Job::Process { status, .. } => status.created_at(), + } + } + + pub fn started_at(&self) -> Option<std::time::Instant> { + match self { + Job::Download { started_at, .. } + | Job::Unarchive { started_at, .. } + | Job::RegistryFetch { started_at, .. } => Some(*started_at), + Job::Process { status, .. } => status.started_at(), + } + } + + pub fn finished_at(&self) -> Option<std::time::Instant> { + match self { + Job::Download { finished_at, .. } + | Job::Unarchive { finished_at, ..
} + | Job::RegistryFetch { finished_at, .. } => *finished_at, + Job::Process { status, .. } => status.finished_at(), + } + } + + pub fn finalized_at(&self) -> Option<std::time::Instant> { + match self { + Job::Download { finished_at, .. } + | Job::Unarchive { finished_at, .. } + | Job::RegistryFetch { finished_at, .. } => *finished_at, + Job::Process { status, .. } => status.finalized_at(), + } + } + + pub fn elapsed(&self) -> Option<std::time::Duration> { + let started_at = self.started_at()?; + let elapsed = if let Some(finished_at) = self.finished_at() { + finished_at.saturating_duration_since(started_at) + } else { + started_at.elapsed() + }; + Some(elapsed) + } + + pub fn is_complete(&self) -> bool { + self.finished_at().is_some() + } + + // Returns a priority for the job type. 0 is the lowest priority. Higher + // priority jobs are displayed first. + pub fn job_type_priority(&self) -> u8 { + match self { + Job::Unarchive { .. } => 0, + Job::Download { .. } | Job::RegistryFetch { .. } | Job::Process { .. } => 2, + } + } +} + +pub enum ProcessPacket { + Stdout(Vec<u8>), + Stderr(Vec<u8>), +} + +impl ProcessPacket { + pub fn bytes(&self) -> &[u8] { + match self { + Self::Stdout(bytes) => bytes, + Self::Stderr(bytes) => bytes, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ProcessStream { + Stdout, + Stderr, +} + +#[derive(Debug, Clone)] +pub enum ProcessStatus { + Preparing { + created_at: std::time::Instant, + }, + Running { + child_id: Option<u32>, + created_at: std::time::Instant, + started_at: std::time::Instant, + }, + Ran { + child_id: Option<u32>, + created_at: std::time::Instant, + started_at: std::time::Instant, + finished_at: std::time::Instant, + }, + Finalized { + child_id: Option<u32>, + created_at: std::time::Instant, + started_at: std::time::Instant, + finished_at: std::time::Instant, + finalized_at: std::time::Instant, + }, +} + +impl ProcessStatus { + fn created_at(&self) -> std::time::Instant { + match self { + Self::Preparing { created_at } + | Self::Running { created_at, .. } + | Self::Ran { created_at, .. } + | Self::Finalized { created_at, .. } => *created_at, + } + } + + fn started_at(&self) -> Option<std::time::Instant> { + match self { + Self::Preparing { .. } => None, + Self::Running { started_at, .. } + | Self::Ran { started_at, .. } + | Self::Finalized { started_at, .. } => Some(*started_at), + } + } + + fn finished_at(&self) -> Option<std::time::Instant> { + match self { + Self::Preparing { .. } | Self::Running { .. } => None, + Self::Ran { finished_at, .. } | Self::Finalized { finished_at, .. } => { + Some(*finished_at) + } + } + } + + fn finalized_at(&self) -> Option<std::time::Instant> { + match self { + Self::Preparing { .. } | Self::Running { .. } | Self::Ran { .. } => None, + Self::Finalized { finalized_at, .. } => Some(*finalized_at), + } + } + + pub fn child_id(&self) -> Option<u32> { + match self { + ProcessStatus::Preparing { .. } => None, + ProcessStatus::Running { child_id, .. } + | ProcessStatus::Ran { child_id, .. } + | ProcessStatus::Finalized { child_id, ..
} => *child_id, + } + } + + pub fn to_running( + &mut self, + started_at: std::time::Instant, + child_id: Option<u32>, + ) -> anyhow::Result<()> { + let Self::Preparing { created_at } = *self else { + anyhow::bail!("expected ProcessStatus to be Preparing"); + }; + + *self = Self::Running { + child_id, + created_at, + started_at, + }; + + Ok(()) + } + + pub fn to_ran(&mut self, finished_at: std::time::Instant) -> anyhow::Result<()> { + let Self::Running { + child_id, + created_at, + started_at, + } = *self + else { + anyhow::bail!("expected ProcessStatus to be Running"); + }; + + *self = Self::Ran { + child_id, + created_at, + started_at, + finished_at, + }; + + Ok(()) + } + + pub fn to_finalized(&mut self, finalized_at: std::time::Instant) -> anyhow::Result<()> { + let Self::Ran { + child_id, + created_at, + started_at, + finished_at, + } = *self + else { + anyhow::bail!("expected ProcessStatus to be Ran"); + }; + + *self = Self::Finalized { + child_id, + created_at, + started_at, + finished_at, + finalized_at, + }; + + Ok(()) + } +} diff --git a/crates/brioche-core/src/sync.rs b/crates/brioche-core/src/sync.rs index 75bb7e9..42bcda7 100644 --- a/crates/brioche-core/src/sync.rs +++ b/crates/brioche-core/src/sync.rs @@ -1,6 +1,6 @@ +use crate::utils::DisplayDuration; use anyhow::Context as _; use futures::{StreamExt as _, TryStreamExt as _}; -use human_repr::HumanDuration; use crate::{ project::ProjectHash, @@ -65,7 +65,7 @@ pub async fn sync_bakes( if verbose { println!( "Collected refs in {} ({num_recipe_refs} recipes, {num_blob_refs} blobs)", - start_refs.elapsed().human_duration() + DisplayDuration(start_refs.elapsed()) ); } @@ -107,7 +107,7 @@ pub async fn sync_bakes( if verbose { println!( "Finished syncing {num_new_bakes} / {num_bakes} bakes in {}", - start_bakes.elapsed().human_duration() + DisplayDuration(start_bakes.elapsed()) ); } @@ -183,7 +183,7 @@ pub async fn sync_recipe_references( if verbose { println!( "Finished syncing {num_new_blobs} / {num_total_blobs} blobs in {}", - start_blobs.elapsed().human_duration() + DisplayDuration(start_blobs.elapsed()) ); } @@ -216,7 +216,7 @@ pub async fn sync_recipe_references( if verbose { println!( "Finished syncing {num_new_recipes} / {num_total_recipes} recipes in {}", - start_recipes.elapsed().human_duration() + DisplayDuration(start_recipes.elapsed()) ); } @@ -282,7 +282,7 @@ pub async fn sync_project_references( if verbose { println!( "Finished syncing {num_new_blobs} / {num_total_blobs} loaded blobs in {}", - start_blobs.elapsed().human_duration() + DisplayDuration(start_blobs.elapsed()) ); } @@ -310,7 +310,7 @@ pub async fn sync_project_references( if verbose { println!( "Finished syncing {num_new_projects} / {num_total_projects} projects in {}", - start_projects.elapsed().human_duration() + DisplayDuration(start_projects.elapsed()) ); } diff --git a/crates/brioche-core/src/utils.rs b/crates/brioche-core/src/utils.rs index 0ae7e41..59c9373 100644 --- a/crates/brioche-core/src/utils.rs +++ b/crates/brioche-core/src/utils.rs @@ -1,6 +1,240 @@ +use std::fmt::Write as _; + pub fn is_default<T>(value: &T) -> bool where T: Default + PartialEq, { *value == Default::default() } + +const SECS_PRECISION: usize = 2; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct DisplayDuration(pub std::time::Duration); + +impl std::fmt::Display for DisplayDuration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let max_width = f.precision().unwrap_or(usize::MAX); + + let secs = self.0.as_secs_f64(); +
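// (Editor's note, not part of the patch: this impl repurposes the
// formatter's *precision* (`{:.N}`) as a maximum content width, while
// *width* (`{:N}`) keeps its usual minimum-width/fill meaning. That is
// why the console code above writes specs like "{elapsed:>6.8}":
// right-align to at least 6 columns, but never emit more than 8.)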
let mut buffer = String::new(); + let mut unaligned = String::new(); + + if secs < 60.0 { + // Write the integer part of the seconds value + write!(&mut buffer, "{secs:.0}")?; + + // Compute how much room we have for the fractional part + let secs_integer_width = buffer.len(); + let secs_fractional_width = max_width + .saturating_sub(secs_integer_width) + .saturating_sub(2); + + let secs_precision = std::cmp::min(secs_fractional_width, SECS_PRECISION); + write!(&mut unaligned, "{secs:.0$}s", secs_precision)?; + } else { + // Use the truncated seconds, and break them up into hours, + // minutes, and seconds + let secs = secs.floor() as u64; + let (secs, mins) = split_unit(secs, 60); + let (mins, hours) = split_unit(mins, 60); + + let units = [(hours, "h"), (mins, "m"), (secs, "s")] + .into_iter() + .skip_while(|&(count, _)| count == 0); + for (count, unit) in units { + let is_first = unaligned.is_empty(); + + buffer.clear(); + if is_first { + write!(&mut buffer, "{count}{unit}")?; + } else { + // For successive units, use a space-padded count + // for alignment + write!(&mut buffer, "{count:2}{unit}")?; + } + + let within_width = unaligned.len() + buffer.len() <= max_width; + if unaligned.is_empty() || within_width { + // This is either the first unit, or we're still within + // the width, so add this unit + unaligned += &buffer; + } else { + // We've exceeded the width, so bail early + break; + } + } + } + + let needed_fill = f + .width() + .unwrap_or_default() + .saturating_sub(unaligned.len()); + let (left_fill, right_fill) = match f.align() { + Some(std::fmt::Alignment::Left) | None => (0, needed_fill), + Some(std::fmt::Alignment::Right) => (needed_fill, 0), + Some(std::fmt::Alignment::Center) => { + let half_fill = needed_fill / 2; + (half_fill, needed_fill.saturating_sub(half_fill)) + } + }; + + for _ in 0..left_fill { + f.write_char(f.fill())?; + } + f.write_str(&unaligned)?; + for _ in 0..right_fill { + f.write_char(f.fill())?; + } + + Ok(()) + } +} + +fn split_unit(little_unit: u64, big_unit_size: u64) -> (u64, u64) { + let big_count = little_unit / big_unit_size; + let little_remainder = little_unit - (big_count * big_unit_size); + + (little_remainder, big_count) +} + +#[cfg(test)] +mod tests { + use super::DisplayDuration; + + fn display_duration_ms(ms: u64) -> DisplayDuration { + DisplayDuration(std::time::Duration::from_millis(ms)) + } + + #[test] + fn test_display_duration_basic() { + assert_eq!(display_duration_ms(0).to_string(), "0.00s",); + assert_eq!(display_duration_ms(10).to_string(), "0.01s",); + assert_eq!(display_duration_ms(99).to_string(), "0.10s",); + assert_eq!(display_duration_ms(1_010).to_string(), "1.01s"); + assert_eq!(display_duration_ms(59_990).to_string(), "59.99s"); + assert_eq!(display_duration_ms(59_999).to_string(), "60.00s"); + + assert_eq!(display_duration_ms(60_000).to_string(), "1m 0s"); + assert_eq!(display_duration_ms(60_999).to_string(), "1m 0s"); + assert_eq!(display_duration_ms(61_000).to_string(), "1m 1s"); + assert_eq!(display_duration_ms(120_000).to_string(), "2m 0s"); + + assert_eq!(display_duration_ms(3_599_999).to_string(), "59m59s"); + assert_eq!(display_duration_ms(3_600_000).to_string(), "1h 0m 0s"); + assert_eq!(display_duration_ms(36_000_000).to_string(), "10h 0m 0s"); + assert_eq!(display_duration_ms(360_000_000).to_string(), "100h 0m 0s"); + } + + #[test] + fn test_display_duration_with_precision() { + assert_eq!(format!("{:.0}", display_duration_ms(2500)), "2s"); +
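// (Editor's note, not part of the patch: for sub-minute durations the
// impl reserves one column for the decimal point and one for the "s"
// suffix, so with an integer part of "2" a fractional digit only fits
// once the precision reaches 4 -- hence "2s" below until "{:.4}".)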
assert_eq!(format!("{:.1}", display_duration_ms(2500)), "2s"); + assert_eq!(format!("{:.2}", display_duration_ms(2500)), "2s"); + assert_eq!(format!("{:.3}", display_duration_ms(2500)), "2s"); + assert_eq!(format!("{:.4}", display_duration_ms(2500)), "2.5s"); + assert_eq!(format!("{:.5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:.6}", display_duration_ms(2500)), "2.50s"); + + assert_eq!(format!("{:.0}", display_duration_ms(65_000)), "1m"); + assert_eq!(format!("{:.1}", display_duration_ms(65_000)), "1m"); + assert_eq!(format!("{:.2}", display_duration_ms(65_000)), "1m"); + assert_eq!(format!("{:.3}", display_duration_ms(65_000)), "1m"); + assert_eq!(format!("{:.4}", display_duration_ms(65_000)), "1m"); + assert_eq!(format!("{:.5}", display_duration_ms(65_000)), "1m 5s"); + assert_eq!(format!("{:.6}", display_duration_ms(65_000)), "1m 5s"); + + assert_eq!(format!("{:.0}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.1}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.2}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.3}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.4}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.5}", display_duration_ms(605_000)), "10m"); + assert_eq!(format!("{:.6}", display_duration_ms(605_000)), "10m 5s"); + assert_eq!(format!("{:.7}", display_duration_ms(605_000)), "10m 5s"); + + assert_eq!(format!("{:.0}", display_duration_ms(3_661_000)), "1h"); + assert_eq!(format!("{:.1}", display_duration_ms(3_661_000)), "1h"); + assert_eq!(format!("{:.2}", display_duration_ms(3_661_000)), "1h"); + assert_eq!(format!("{:.3}", display_duration_ms(3_661_000)), "1h"); + assert_eq!(format!("{:.4}", display_duration_ms(3_661_000)), "1h"); + assert_eq!(format!("{:.5}", display_duration_ms(3_661_000)), "1h 1m"); + assert_eq!(format!("{:.6}", display_duration_ms(3_661_000)), "1h 1m"); + assert_eq!(format!("{:.7}", display_duration_ms(3_661_000)), "1h 1m"); + assert_eq!(format!("{:.8}", display_duration_ms(3_661_000)), "1h 1m 1s"); + assert_eq!(format!("{:.9}", display_duration_ms(3_661_000)), "1h 1m 1s"); + + assert_eq!(format!("{:.0}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.1}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.2}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.3}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.4}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.5}", display_duration_ms(36_061_000)), "10h"); + assert_eq!(format!("{:.6}", display_duration_ms(36_061_000)), "10h 1m"); + assert_eq!(format!("{:.7}", display_duration_ms(36_061_000)), "10h 1m"); + assert_eq!(format!("{:.8}", display_duration_ms(36_061_000)), "10h 1m"); + assert_eq!( + format!("{:.9}", display_duration_ms(36_061_000)), + "10h 1m 1s" + ); + assert_eq!( + format!("{:.10}", display_duration_ms(36_061_000)), + "10h 1m 1s" + ); + } + + #[test] + fn test_display_duration_with_padding() { + assert_eq!(format!("{:0}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:1}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:2}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:3}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:4}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:6}", display_duration_ms(2500)), "2.50s "); + assert_eq!(format!("{:7}", 
display_duration_ms(2500)), "2.50s "); + assert_eq!(format!("{:8}", display_duration_ms(2500)), "2.50s "); + + assert_eq!(format!("{:<0}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<1}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<2}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<3}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<4}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:<6}", display_duration_ms(2500)), "2.50s "); + assert_eq!(format!("{:<7}", display_duration_ms(2500)), "2.50s "); + assert_eq!(format!("{:<8}", display_duration_ms(2500)), "2.50s "); + + assert_eq!(format!("{:>0}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>1}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>2}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>3}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>4}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:>6}", display_duration_ms(2500)), " 2.50s"); + assert_eq!(format!("{:>7}", display_duration_ms(2500)), " 2.50s"); + assert_eq!(format!("{:>8}", display_duration_ms(2500)), " 2.50s"); + + assert_eq!(format!("{:^0}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^1}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^2}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^3}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^4}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:^6}", display_duration_ms(2500)), "2.50s "); + assert_eq!(format!("{:^7}", display_duration_ms(2500)), " 2.50s "); + assert_eq!(format!("{:^8}", display_duration_ms(2500)), " 2.50s "); + + assert_eq!(format!("{:-^0}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^1}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^2}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^3}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^4}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^5}", display_duration_ms(2500)), "2.50s"); + assert_eq!(format!("{:-^6}", display_duration_ms(2500)), "2.50s-"); + assert_eq!(format!("{:-^7}", display_duration_ms(2500)), "-2.50s-"); + assert_eq!(format!("{:-^8}", display_duration_ms(2500)), "-2.50s--"); + } +} diff --git a/crates/brioche/Cargo.toml b/crates/brioche/Cargo.toml index b0e0981..31a6a4b 100644 --- a/crates/brioche/Cargo.toml +++ b/crates/brioche/Cargo.toml @@ -11,7 +11,6 @@ cfg-if = "1.0.0" clap = { version = "4.4.11", features = ["derive"] } futures = "0.3.29" hex = "0.4.3" -human-repr = "1.1.0" reqwest = { version = "0.12.4", default-features = false, features = ["rustls-tls", "zstd", "json"] } serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" diff --git a/crates/brioche/src/build.rs b/crates/brioche/src/build.rs index bfb14e3..70f4ca2 100644 --- a/crates/brioche/src/build.rs +++ b/crates/brioche/src/build.rs @@ -1,9 +1,11 @@ use std::{path::PathBuf, process::ExitCode}; use anyhow::Context as _; -use brioche_core::{fs_utils, project::ProjectLocking, reporter::ConsoleReporterKind}; +use brioche_core::{ + fs_utils, project::ProjectLocking, reporter::console::ConsoleReporterKind, + utils::DisplayDuration, +}; 
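The remaining hunks in this file (and in `install.rs` and `run.rs` below) all make the same mechanical swap: `human_repr`'s `.human_duration()` extension method is replaced by wrapping the `std::time::Duration` in the new `DisplayDuration` newtype. A hedged sketch of the before/after pattern, assuming only the imports shown in this hunk:

```rust
use brioche_core::utils::DisplayDuration;

fn summarize(start: std::time::Instant) {
    // Before this patch (extension method from the removed human-repr crate):
    //     let elapsed = start.elapsed().human_duration();
    // After this patch (newtype implementing std::fmt::Display):
    let elapsed = DisplayDuration(start.elapsed());
    println!("Finished in {elapsed}");
}
```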
use clap::Parser; -use human_repr::HumanDuration; use tracing::Instrument; #[derive(Debug, Parser)] @@ -47,8 +49,7 @@ pub struct BuildArgs pub async fn build(args: BuildArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)?; - reporter.set_is_evaluating(true); + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?; let brioche = brioche_core::BriocheBuilder::new(reporter.clone()) .keep_temps(args.keep_temps) @@ -108,7 +109,6 @@ pub async fn build(args: BuildArgs) -> anyhow::Result<ExitCode> { ) .await?; - reporter.set_is_evaluating(false); let artifact = brioche_core::bake::bake( &brioche, recipe, @@ -121,7 +121,7 @@ pub async fn build(args: BuildArgs) -> anyhow::Result<ExitCode> { guard.shutdown_console().await; - let elapsed = reporter.elapsed().human_duration(); + let elapsed = DisplayDuration(reporter.elapsed()); let num_jobs = reporter.num_jobs(); let jobs_message = match num_jobs { 0 => "(no new jobs)".to_string(), @@ -166,7 +166,7 @@ pub async fn build(args: BuildArgs) -> anyhow::Result<ExitCode> { num_new_bakes, } = brioche_core::sync::wait_for_in_progress_syncs(&brioche).await?; - let wait_duration = wait_start.elapsed().human_duration(); + let wait_duration = DisplayDuration(wait_start.elapsed()); println!("In-progress sync waited for {wait_duration} and synced:"); println!(" {num_new_blobs} blobs"); println!(" {num_new_recipes} recipes"); @@ -176,7 +176,7 @@ pub async fn build(args: BuildArgs) -> anyhow::Result<ExitCode> { let sync_start = std::time::Instant::now(); brioche_core::sync::sync_project(&brioche, project_hash, &args.export).await?; - let sync_duration = sync_start.elapsed().human_duration(); + let sync_duration = DisplayDuration(sync_start.elapsed()); println!("Finished sync in {sync_duration}"); } diff --git a/crates/brioche/src/check.rs b/crates/brioche/src/check.rs index 6d62fa2..915ade6 100644 --- a/crates/brioche/src/check.rs +++ b/crates/brioche/src/check.rs @@ -5,7 +5,7 @@ use brioche_core::project::ProjectHash; use brioche_core::project::ProjectLocking; use brioche_core::project::ProjectValidation; use brioche_core::project::Projects; -use brioche_core::reporter::ConsoleReporterKind; +use brioche_core::reporter::console::ConsoleReporterKind; use brioche_core::reporter::Reporter; use brioche_core::Brioche; use clap::Parser; @@ -25,7 +25,7 @@ pub struct CheckArgs pub async fn check(args: CheckArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)?; + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?; let brioche = brioche_core::BriocheBuilder::new(reporter.clone()) .build() diff --git a/crates/brioche/src/format.rs b/crates/brioche/src/format.rs index 0444828..2c2030b 100644 --- a/crates/brioche/src/format.rs +++ b/crates/brioche/src/format.rs @@ -2,7 +2,7 @@ use std::{path::PathBuf, process::ExitCode}; use brioche_core::{ project::{ProjectHash, ProjectLocking, ProjectValidation, Projects}, - reporter::{ConsoleReporterKind, Reporter}, + reporter::{console::ConsoleReporterKind, Reporter}, }; use clap::Parser; use tracing::Instrument; @@ -22,7 +22,7 @@ pub struct FormatArgs pub async fn format(args: FormatArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)?; + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?; let brioche = brioche_core::BriocheBuilder::new(reporter.clone())
.build() diff --git a/crates/brioche/src/install.rs b/crates/brioche/src/install.rs index 53e28f1..107fd72 100644 --- a/crates/brioche/src/install.rs +++ b/crates/brioche/src/install.rs @@ -6,11 +6,11 @@ use brioche_core::project::ProjectHash; use brioche_core::project::ProjectLocking; use brioche_core::project::ProjectValidation; use brioche_core::project::Projects; -use brioche_core::reporter::ConsoleReporterKind; +use brioche_core::reporter::console::ConsoleReporterKind; use brioche_core::reporter::Reporter; +use brioche_core::utils::DisplayDuration; use brioche_core::Brioche; use clap::Parser; -use human_repr::HumanDuration; use tracing::Instrument; use crate::consolidate_result; @@ -35,7 +35,7 @@ pub struct InstallArgs pub async fn install(args: InstallArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)?; + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?; let brioche = brioche_core::BriocheBuilder::new(reporter.clone()) .build() @@ -86,9 +86,6 @@ pub async fn install(args: InstallArgs) -> anyhow::Result<ExitCode> { ) .await; - // Ensure the reporter is no longer evaluating, in case of an error - reporter.set_is_evaluating(false); - consolidate_result(&reporter, &project_name, result, &mut error_result); } Err(e) => { @@ -121,9 +118,6 @@ pub async fn install(args: InstallArgs) -> anyhow::Result<ExitCode> { ) .await; - // Ensure the reporter is no longer evaluating, in case of an error - reporter.set_is_evaluating(false); - consolidate_result(&reporter, &project_name, result, &mut error_result); } Err(e) => { @@ -158,8 +152,6 @@ async fn run_install( options: &InstallOptions, ) -> Result { async { - reporter.set_is_evaluating(true); - // If the `--locked` flag is used, validate that all lockfiles are // up-to-date.
Otherwise, write any out-of-date lockfiles if options.locked { @@ -200,8 +192,6 @@ async fn run_install( brioche_core::script::evaluate::evaluate(brioche, projects, project_hash, export) .await?; - reporter.set_is_evaluating(false); - let artifact = brioche_core::bake::bake( brioche, recipe, @@ -212,7 +202,7 @@ async fn run_install( ) .await?; - let elapsed = reporter.elapsed().human_duration(); + let elapsed = DisplayDuration(reporter.elapsed()); let num_jobs = reporter.num_jobs(); let jobs_message = match num_jobs { 0 => "(no new jobs)".to_string(), diff --git a/crates/brioche/src/main.rs b/crates/brioche/src/main.rs index 8f64e16..c06a2df 100644 --- a/crates/brioche/src/main.rs +++ b/crates/brioche/src/main.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, path::PathBuf, process::ExitCode, sync::Arc}; use brioche_core::{ project::{ProjectLocking, ProjectValidation}, - reporter::ConsoleReporterKind, + reporter::console::ConsoleReporterKind, }; use clap::Parser; @@ -193,7 +193,7 @@ async fn export_project(args: ExportProjectArgs) -> anyhow::Result<()> { } let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Plain)?; + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Plain)?; let brioche = brioche_core::BriocheBuilder::new(reporter).build().await?; let projects = brioche_core::project::Projects::default(); diff --git a/crates/brioche/src/publish.rs b/crates/brioche/src/publish.rs index b103326..b6b4d02 100644 --- a/crates/brioche/src/publish.rs +++ b/crates/brioche/src/publish.rs @@ -2,7 +2,7 @@ use std::{path::PathBuf, process::ExitCode}; use brioche_core::{ project::{ProjectHash, ProjectLocking, ProjectValidation, Projects}, - reporter::{ConsoleReporterKind, Reporter}, + reporter::{console::ConsoleReporterKind, Reporter}, Brioche, }; use clap::Parser; @@ -18,7 +18,7 @@ pub struct PublishArgs pub async fn publish(args: PublishArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)?; + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?; let brioche = brioche_core::BriocheBuilder::new(reporter.clone()) .build() diff --git a/crates/brioche/src/run.rs b/crates/brioche/src/run.rs index 6eb24fd..d8026f1 100644 --- a/crates/brioche/src/run.rs +++ b/crates/brioche/src/run.rs @@ -1,9 +1,10 @@ use std::process::ExitCode; use anyhow::Context as _; -use brioche_core::{project::ProjectLocking, reporter::ConsoleReporterKind}; +use brioche_core::{ + project::ProjectLocking, reporter::console::ConsoleReporterKind, utils::DisplayDuration, +}; use clap::Parser; -use human_repr::HumanDuration; use tracing::Instrument; #[derive(Debug, Parser)] @@ -44,9 +45,8 @@ pub async fn run(args: RunArgs) -> anyhow::Result<ExitCode> { let (reporter, mut guard) = if args.quiet { brioche_core::reporter::start_null_reporter() } else { - brioche_core::reporter::start_console_reporter(ConsoleReporterKind::Auto)? + brioche_core::reporter::console::start_console_reporter(ConsoleReporterKind::Auto)?
}; - reporter.set_is_evaluating(true); let brioche = brioche_core::BriocheBuilder::new(reporter.clone()) .keep_temps(args.keep_temps) @@ -105,7 +105,6 @@ pub async fn run(args: RunArgs) -> anyhow::Result { ) .await?; - reporter.set_is_evaluating(false); let artifact = brioche_core::bake::bake( &brioche, recipe, @@ -118,7 +117,7 @@ pub async fn run(args: RunArgs) -> anyhow::Result { guard.shutdown_console().await; - let elapsed = reporter.elapsed().human_duration(); + let elapsed = DisplayDuration(reporter.elapsed()); let num_jobs = reporter.num_jobs(); let jobs_message = match num_jobs { 0 => "(no new jobs)".to_string(),
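Taken together, the CLI changes leave every command with the same reporter lifecycle: start a console reporter from the new `reporter::console` module, hand a clone to `BriocheBuilder`, do the work (the `set_is_evaluating` calls are gone entirely), shut the console down, and print a summary via `DisplayDuration`. A condensed, hedged sketch of that flow, using only calls that appear in the hunks above:

```rust
use brioche_core::{
    reporter::console::{start_console_reporter, ConsoleReporterKind},
    utils::DisplayDuration,
};

async fn with_reporter() -> anyhow::Result<()> {
    // `reporter` is cloned into the builder; `guard` owns the console
    let (reporter, mut guard) =
        start_console_reporter(ConsoleReporterKind::Auto)?;

    let brioche = brioche_core::BriocheBuilder::new(reporter.clone())
        .build()
        .await?;
    let _ = brioche; // ... bake / sync / install work goes here ...

    // Stop drawing before printing the final summary to stdout
    guard.shutdown_console().await;
    println!(
        "Finished in {} ({} jobs)",
        DisplayDuration(reporter.elapsed()),
        reporter.num_jobs()
    );
    Ok(())
}
```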