Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/cargo/core/compiler/job_queue/job_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::core::compiler::future_incompat::FutureBreakageItem;
use crate::core::compiler::locking::LockKey;
use crate::core::compiler::timings::SectionTiming;
use crate::util::Queue;
use crate::util::flock::ReportBlocking;
use crate::{CargoResult, core::compiler::locking::LockManager};

use super::{Artifact, DiagDedupe, Job, JobId, Message};
Expand Down Expand Up @@ -148,11 +149,11 @@ impl<'a, 'gctx> JobState<'a, 'gctx> {
}

pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
self.lock_manager.lock(lock)
self.lock_manager.lock(lock, self)
}

pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
self.lock_manager.downgrade_to_shared(lock)
self.lock_manager.downgrade_to_shared(lock, self)
}

pub fn on_section_timing_emitted(&self, section: SectionTiming) {
Expand Down Expand Up @@ -213,3 +214,12 @@ impl<'a, 'gctx> JobState<'a, 'gctx> {
.push(Message::FutureIncompatReport(self.id, report));
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reminds me that whether we should have a Shell trait: #t-cargo > GSoC 2026 ideas @ 💬

Though it might not be a thing that we really want. See also #13278 and #4614 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am unsure about a Shell trait. You raise a good point about it potentially becoming a big monster trait.

My thought here was that ReportBlocking is an even high level abstraction that focuses on a narrow task. Its just something that can "report a blocking message". In theory it does not even have to be a shell!

Even if we were to have a Shell trait I'd imagine the flow would be something like the following as we have to go through indirection (the Queue<Message> in JobState) to get to the shell where we eventually print the message.

flock.rs -> impl ReportBlocking -> impl Shell

impl ReportBlocking for &JobState<'_, '_> {
fn blocking(&self, msg: &str) -> CargoResult<()> {
self.messages.push_bounded(Message::Blocking {
msg: msg.to_string(),
});
Ok(())
}
}
23 changes: 22 additions & 1 deletion src/cargo/core/compiler/job_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ use crate::util::context::WarningHandling;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::errors::AlreadyPrintedError;
use crate::util::machine_message::{self, Message as _};
use crate::util::{self, internal};
use crate::util::{self, internal, style};
use crate::util::{DependencyQueue, GlobalContext, Progress, ProgressStyle, Queue};

/// This structure is backed by the `DependencyQueue` type and manages the
Expand Down Expand Up @@ -320,6 +320,20 @@ impl<'gctx> DiagDedupe<'gctx> {
shell.err().write_all(b"\n")?;
Ok(true)
}

/// Emits a flock blocking message
///
/// Returns `true` if the message was emitted, or `false` if it was
/// suppressed for being a duplicate.
fn emit_blocking(&self, msg: &str) -> CargoResult<bool> {
let h = util::hash_u64(msg);
if !self.seen.borrow_mut().insert(h) {
return Ok(false);
}
let mut shell = self.gctx.shell();
shell.status_with_color("Blocking", &msg, &style::NOTE)?;
Ok(true)
}
}

/// Possible artifacts that can be produced by compilations, used as edge values
Expand Down Expand Up @@ -371,6 +385,10 @@ enum Message {
warning: String,
},

Blocking {
msg: String,
},

FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
Expand Down Expand Up @@ -643,6 +661,9 @@ impl<'gctx> DrainState<'gctx> {
let fixable = false;
self.bump_warning_count(id, lint, emitted, fixable);
}
Message::Blocking { msg } => {
self.diag_dedupe.emit_blocking(&msg)?;
}
Message::WarningCount {
id,
lint,
Expand Down
55 changes: 40 additions & 15 deletions src/cargo/core/compiler/locking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
use crate::{
CargoResult,
core::compiler::{BuildRunner, Unit},
util::{FileLock, Filesystem},
util::{
FileLock, Filesystem,
flock::{self, ReportBlocking},
interning::InternedString,
},
};
use anyhow::bail;
use std::{
Expand Down Expand Up @@ -46,23 +50,26 @@ impl LockManager {
lock.file().lock_shared()?;
} else {
let fs = Filesystem::new(key.0.clone());
let lock_msg = format!(
"{} ({})",
unit.pkg.name(),
build_runner.files().unit_hash(unit)
);
let lock_msg = key.msg();
let lock = fs.open_ro_shared_create(&key.0, build_runner.bcx.gctx, &lock_msg)?;
locks.insert(key.clone(), lock);
}

Ok(key)
}

#[instrument(skip(self))]
pub fn lock(&self, key: &LockKey) -> CargoResult<()> {
#[instrument(skip(self, report_blocking))]
pub fn lock(&self, key: &LockKey, report_blocking: impl ReportBlocking) -> CargoResult<()> {
let mut locks = self.locks.lock().unwrap();
if let Some(lock) = locks.get_mut(&key) {
lock.file().lock()?;
let file = lock.file();

flock::acquire(
report_blocking,
&key.msg(),
&key.0,
&|| file.try_lock(),
&|| file.lock(),
)?;
} else {
bail!("lock was not found in lock manager: {key}");
}
Expand All @@ -71,13 +78,24 @@ impl LockManager {
}

/// Upgrades an existing exclusive lock into a shared lock.
#[instrument(skip(self))]
pub fn downgrade_to_shared(&self, key: &LockKey) -> CargoResult<()> {
#[instrument(skip(self, report_blocking))]
pub fn downgrade_to_shared(
&self,
key: &LockKey,
report_blocking: impl ReportBlocking,
) -> CargoResult<()> {
let mut locks = self.locks.lock().unwrap();
let Some(lock) = locks.get_mut(key) else {
bail!("lock was not found in lock manager: {key}");
};
lock.file().lock_shared()?;
let file = lock.file();
flock::acquire(
report_blocking,
&key.msg(),
&key.0,
&|| file.try_lock_shared(),
&|| file.lock_shared(),
)?;
Ok(())
}

Expand All @@ -93,11 +111,18 @@ impl LockManager {
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct LockKey(PathBuf);
pub struct LockKey(PathBuf, InternedString, String);

impl LockKey {
fn from_unit(build_runner: &BuildRunner<'_, '_>, unit: &Unit) -> Self {
Self(build_runner.files().build_unit_lock(unit))
let name = unit.pkg.name();
let hash = build_runner.files().unit_hash(unit);
let path = build_runner.files().build_unit_lock(unit);
Self(path, name, hash)
}

fn msg(&self) -> String {
format!("{} ({})", self.1, self.2)
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cargo/util/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ use crate::sources::CRATES_IO_REGISTRY;
use crate::util::OnceExt as _;
use crate::util::cache_lock::{CacheLock, CacheLockMode, CacheLocker};
use crate::util::errors::CargoResult;
use crate::util::flock::ReportBlocking;
use crate::util::network::http::configure_http_handle;
use crate::util::network::http::http_handle;
use crate::util::restricted_names::is_glob_pattern;
use crate::util::{CanonicalUrl, closest_msg, internal};
use crate::util::{Filesystem, IntoUrl, IntoUrlWithBase, Rustc};
use crate::util::{Filesystem, IntoUrl, IntoUrlWithBase, Rustc, style};

use annotate_snippets::Level;
use anyhow::{Context as _, anyhow, bail, format_err};
Expand Down Expand Up @@ -2123,6 +2124,13 @@ impl GlobalContext {
}
}

impl ReportBlocking for &GlobalContext {
fn blocking(&self, msg: &str) -> CargoResult<()> {
self.shell()
.status_with_color("Blocking", &msg, &style::NOTE)
}
}

pub fn homedir(cwd: &Path) -> Option<PathBuf> {
::home::cargo_home_with_cwd(cwd).ok()
}
Expand Down
50 changes: 28 additions & 22 deletions src/cargo/util/flock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::io;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Display, Path, PathBuf};

use crate::util::GlobalContext;
use crate::util::errors::CargoResult;
use crate::util::style;
use anyhow::Context as _;
use cargo_util::paths;

Expand Down Expand Up @@ -223,14 +221,14 @@ impl Filesystem {
/// This function will create a file at `path` if it doesn't already exist
/// (including intermediate directories), and then it will acquire an
/// exclusive lock on `path`. If the process must block waiting for the
/// lock, the `msg` is printed to [`GlobalContext`].
/// lock, the `msg` is shown to the user via [`ReportBlocking`].
///
/// The returned file can be accessed to look at the path and also has
/// read/write access to the underlying file.
pub fn open_rw_exclusive_create<P>(
&self,
path: P,
gctx: &GlobalContext,
report_blocking: impl ReportBlocking,
msg: &str,
) -> CargoResult<FileLock>
where
Expand All @@ -239,7 +237,7 @@ impl Filesystem {
let mut opts = OpenOptions::new();
opts.read(true).write(true).create(true);
let (path, f) = self.open(path.as_ref(), &opts, true)?;
acquire(gctx, msg, &path, &|| f.try_lock(), &|| f.lock())?;
acquire(report_blocking, msg, &path, &|| f.try_lock(), &|| f.lock())?;
Ok(FileLock { f: Some(f), path })
}

Expand All @@ -265,24 +263,28 @@ impl Filesystem {
///
/// This function will fail if `path` doesn't already exist, but if it does
/// then it will acquire a shared lock on `path`. If the process must block
/// waiting for the lock, the `msg` is printed to [`GlobalContext`].
/// waiting for the lock, the `msg` is shown to the user via [`ReportBlocking`].
///
/// The returned file can be accessed to look at the path and also has read
/// access to the underlying file. Any writes to the file will return an
/// error.
pub fn open_ro_shared<P>(
&self,
path: P,
gctx: &GlobalContext,
report_blocking: impl ReportBlocking,
msg: &str,
) -> CargoResult<FileLock>
where
P: AsRef<Path>,
{
let (path, f) = self.open(path.as_ref(), &OpenOptions::new().read(true), false)?;
acquire(gctx, msg, &path, &|| f.try_lock_shared(), &|| {
f.lock_shared()
})?;
acquire(
report_blocking,
msg,
&path,
&|| f.try_lock_shared(),
&|| f.lock_shared(),
)?;
Ok(FileLock { f: Some(f), path })
}

Expand All @@ -294,15 +296,19 @@ impl Filesystem {
pub fn open_ro_shared_create<P: AsRef<Path>>(
&self,
path: P,
gctx: &GlobalContext,
report_blocking: impl ReportBlocking,
msg: &str,
) -> CargoResult<FileLock> {
let mut opts = OpenOptions::new();
opts.read(true).write(true).create(true);
let (path, f) = self.open(path.as_ref(), &opts, true)?;
acquire(gctx, msg, &path, &|| f.try_lock_shared(), &|| {
f.lock_shared()
})?;
acquire(
report_blocking,
msg,
&path,
&|| f.try_lock_shared(),
&|| f.lock_shared(),
)?;
Ok(FileLock { f: Some(f), path })
}

Expand Down Expand Up @@ -400,27 +406,23 @@ fn try_acquire(path: &Path, lock_try: &dyn Fn() -> Result<(), TryLockError>) ->
/// This function will acquire the lock on a `path`, printing out a nice message
/// to the console if we have to wait for it. It will first attempt to use `try`
/// to acquire a lock on the crate, and in the case of contention it will emit a
/// status message based on `msg` to [`GlobalContext`]'s shell, and then use `block` to
/// status message based on `msg` to [`ReportBlocking`], and then use `block` to
/// block waiting to acquire a lock.
///
/// Returns an error if the lock could not be acquired or if any error other
/// than a contention error happens.
fn acquire(
gctx: &GlobalContext,
pub fn acquire(
report_blocking: impl ReportBlocking,
msg: &str,
path: &Path,
lock_try: &dyn Fn() -> Result<(), TryLockError>,
lock_block: &dyn Fn() -> io::Result<()>,
) -> CargoResult<()> {
// Ensure `shell` is not already in use,
// regardless of whether we hit contention or not
gctx.debug_assert_shell_not_borrowed();
if try_acquire(path, lock_try)? {
return Ok(());
}
let msg = format!("waiting for file lock on {}", msg);
gctx.shell()
.status_with_color("Blocking", &msg, &style::NOTE)?;
report_blocking.blocking(&msg)?;

lock_block().with_context(|| format!("failed to lock file: {}", path.display()))?;
Ok(())
Expand Down Expand Up @@ -469,3 +471,7 @@ fn error_unsupported(err: &std::io::Error) -> bool {
_ => err.kind() == std::io::ErrorKind::Unsupported,
}
}

pub trait ReportBlocking {
fn blocking(&self, msg: &str) -> CargoResult<()>;
}