From 2cd4af1a56e6fdd3c50202705d5afebf1a2df9d9 Mon Sep 17 00:00:00 2001 From: joboet Date: Sun, 19 Jan 2025 20:41:35 +0100 Subject: [PATCH 1/8] shim Apple's futex primitives This is necessary to unblock rust-lang/rust#122408. The documentation for these is available [here](https://developer.apple.com/documentation/os/os_sync_wait_on_address?language=objc). Because the futex wait operations (`os_sync_wait_on_address` et al.) return the number of remaining waiters after returning, this required some changes to the common futex infrastructure, which I've changed to take a callback instead of precalculating the return values. --- src/concurrency/sync.rs | 31 ++-- src/shims/unix/linux_like/sync.rs | 18 +- src/shims/unix/macos/foreign_items.rs | 61 ++++++- src/shims/unix/macos/sync.rs | 207 +++++++++++++++++++++ src/shims/windows/sync.rs | 21 ++- tests/pass-dep/concurrency/apple-futex.rs | 211 ++++++++++++++++++++++ 6 files changed, 521 insertions(+), 28 deletions(-) create mode 100644 tests/pass-dep/concurrency/apple-futex.rs diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index 14c72e9398..9d573c4927 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -140,6 +140,12 @@ struct Futex { #[derive(Default, Clone)] pub struct FutexRef(Rc>); +impl FutexRef { + pub fn waiters(&self) -> usize { + self.0.borrow().waiters.len() + } +} + impl VisitProvenance for FutexRef { fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { // No provenance in `Futex`. @@ -729,17 +735,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { } /// Wait for the futex to be signaled, or a timeout. - /// On a signal, `retval_succ` is written to `dest`. - /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error. + /// * On a signal, `retval_succ` is called with the number of waiters on the + /// futex and its result is written to `dest`. + /// * On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` + /// is set as the last error. fn futex_wait( &mut self, futex_ref: FutexRef, bitset: u32, timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>, - retval_succ: Scalar, - retval_timeout: Scalar, - dest: MPlaceTy<'tcx>, - errno_timeout: IoError, + callback: DynUnblockCallback<'tcx>, ) { let this = self.eval_context_mut(); let thread = this.active_thread(); @@ -755,10 +760,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { callback!( @capture<'tcx> { futex_ref: FutexRef, - retval_succ: Scalar, - retval_timeout: Scalar, - dest: MPlaceTy<'tcx>, - errno_timeout: IoError, + callback: DynUnblockCallback<'tcx>, } |this, unblock: UnblockKind| { match unblock { @@ -768,21 +770,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { if let Some(data_race) = &this.machine.data_race { data_race.acquire_clock(&futex.clock, &this.machine.threads); } - // Write the return value. - this.write_scalar(retval_succ, &dest)?; - interp_ok(()) }, UnblockKind::TimedOut => { // Remove the waiter from the futex. let thread = this.active_thread(); let mut futex = futex_ref.0.borrow_mut(); futex.waiters.retain(|waiter| waiter.thread != thread); - // Set errno and write return value. - this.set_last_error(errno_timeout)?; - this.write_scalar(retval_timeout, &dest)?; - interp_ok(()) }, } + + callback.call(this, unblock) } ), ); diff --git a/src/shims/unix/linux_like/sync.rs b/src/shims/unix/linux_like/sync.rs index 8369811336..40af1ee6a7 100644 --- a/src/shims/unix/linux_like/sync.rs +++ b/src/shims/unix/linux_like/sync.rs @@ -158,14 +158,24 @@ pub fn futex<'tcx>( .futex .clone(); + let dest = dest.clone(); ecx.futex_wait( futex_ref, bitset, timeout, - Scalar::from_target_isize(0, ecx), // retval_succ - Scalar::from_target_isize(-1, ecx), // retval_timeout - dest.clone(), - LibcError("ETIMEDOUT"), // errno_timeout + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx>, + } + |ecx, unblock: UnblockKind| match unblock { + UnblockKind::Ready => { + ecx.write_scalar(Scalar::from_target_isize(0, ecx), &dest) + } + UnblockKind::TimedOut => { + ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) + } + } + ), ); } else { // The futex value doesn't match the expected value, so we return failure diff --git a/src/shims/unix/macos/foreign_items.rs b/src/shims/unix/macos/foreign_items.rs index 37fb6c1919..e204a68440 100644 --- a/src/shims/unix/macos/foreign_items.rs +++ b/src/shims/unix/macos/foreign_items.rs @@ -2,12 +2,20 @@ use rustc_middle::ty::Ty; use rustc_span::Symbol; use rustc_target::callconv::{Conv, FnAbi}; -use super::sync::EvalContextExt as _; +use super::sync::{EvalContextExt as _, MacOsFutexTimeout}; use crate::shims::unix::*; use crate::*; -pub fn is_dyn_sym(_name: &str) -> bool { - false +pub fn is_dyn_sym(name: &str) -> bool { + match name { + // These only became available with macOS 11.0, so std looks them up dynamically. + "os_sync_wait_on_address" + | "os_sync_wait_on_address_with_deadline" + | "os_sync_wait_on_address_with_timeout" + | "os_sync_wake_by_address_any" + | "os_sync_wake_by_address_all" => true, + _ => false, + } } impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} @@ -214,6 +222,53 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { this.write_scalar(res, dest)?; } + "os_sync_wait_on_address" => { + let [addr_op, value_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::None, + dest, + )?; + } + "os_sync_wait_on_address_with_deadline" => { + let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::Absolute { clock_op, timeout_op }, + dest, + )?; + } + "os_sync_wait_on_address_with_timeout" => { + let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::Relative { clock_op, timeout_op }, + dest, + )?; + } + "os_sync_wake_by_address_any" => { + let [addr_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wake_by_address(addr_op, size_op, flags_op, false, dest)?; + } + "os_sync_wake_by_address_all" => { + let [addr_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wake_by_address(addr_op, size_op, flags_op, true, dest)?; + } + "os_unfair_lock_lock" => { let [lock_op] = this.check_shim(abi, Conv::C, link_name, args)?; this.os_unfair_lock_lock(lock_op)?; diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index 5442d38d52..810eb56ebf 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -10,8 +10,12 @@ //! and we do not detect copying of the lock, but macOS doesn't guarantee anything //! in that case either. +use std::cell::Cell; +use std::time::Duration; + use rustc_abi::Size; +use crate::concurrency::sync::FutexRef; use crate::*; #[derive(Clone)] @@ -20,6 +24,24 @@ enum MacOsUnfairLock { Active { mutex_ref: MutexRef }, } +pub enum MacOsFutexTimeout<'a, 'tcx> { + None, + Relative { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> }, + Absolute { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> }, +} + +/// Metadata for a macOS futex. +/// +/// Since macOS 11.0, Apple has exposed the previously private futex API consisting +/// of `os_sync_wait_on_address` (and friends) and `os_sync_wake_by_address_{any, all}`. +/// These work with different value sizes and flags, which are validated to be consistent. +/// This structure keeps track of both the futex queue and these values. +struct MacOsFutex { + futex: FutexRef, + size: Cell, + shared: Cell, +} + impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {} trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> { fn os_unfair_lock_get_data<'a>( @@ -54,6 +76,191 @@ trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> { impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { + /// Implements [`os_sync_wait_on_address`], [`os_sync_wait_on_address_with_deadline`] + /// and [`os_sync_wait_on_address_with_timeout`]. + /// + /// [`os_sync_wait_on_address`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address?language=objc + /// [`os_sync_wait_on_address_with_deadline`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_deadline?language=objc + /// [`os_sync_wait_on_address_with_timeout`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_timeout?language=objc + fn os_sync_wait_on_address( + &mut self, + addr_op: &OpTy<'tcx>, + value_op: &OpTy<'tcx>, + size_op: &OpTy<'tcx>, + flags_op: &OpTy<'tcx>, + timeout: MacOsFutexTimeout<'_, 'tcx>, + dest: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let none = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_NONE"); + let shared = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_SHARED"); + let absolute_clock = this.eval_libc_u32("OS_CLOCK_MACH_ABSOLUTE_TIME"); + + let ptr = this.read_pointer(addr_op)?; + let value = this.read_scalar(value_op)?.to_u64()?; + let size = this.read_target_usize(size_op)?; + let flags = this.read_scalar(flags_op)?.to_u32()?; + + let clock_timeout = match timeout { + MacOsFutexTimeout::None => None, + MacOsFutexTimeout::Relative { clock_op, timeout_op } => { + let clock = this.read_scalar(clock_op)?.to_u32()?; + let timeout = this.read_scalar(timeout_op)?.to_u64()?; + Some((clock, TimeoutAnchor::Relative, timeout)) + } + MacOsFutexTimeout::Absolute { clock_op, timeout_op } => { + let clock = this.read_scalar(clock_op)?.to_u32()?; + let timeout = this.read_scalar(timeout_op)?.to_u64()?; + Some((clock, TimeoutAnchor::Absolute, timeout)) + } + }; + + // Perform validation of the arguments. + let addr = ptr.addr().bytes(); + if addr == 0 + || addr % size != 0 + || !matches!(size, 4 | 8) + || (flags != none && flags != shared) + || clock_timeout + .is_some_and(|(clock, _, timeout)| clock != absolute_clock || timeout == 0) + { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let is_shared = flags == shared; + let timeout = clock_timeout.map(|(_, anchor, timeout)| { + (TimeoutClock::Monotonic, anchor, Duration::from_nanos(timeout)) + }); + + // See the Linux futex implementation for why this fence exists. + this.atomic_fence(AtomicFenceOrd::SeqCst)?; + + let layout = this.machine.layouts.uint(Size::from_bytes(size)).unwrap(); + let futex_val = this + .read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Relaxed)? + .to_bits(Size::from_bytes(size))?; + + let Some(futex) = this.get_sync_or_init(ptr, |_| { + MacOsFutex { + futex: Default::default(), + size: Cell::new(size), + shared: Cell::new(is_shared), + } + }) else { + this.set_last_error_and_return(LibcError("ENOMEM"), dest)?; + return interp_ok(()); + }; + + // Detect mismatches between the flags and sizes used on this address + // by storing the parameters of the first waiter in a batch of waiters. + if futex.futex.waiters() == 0 { + futex.size.set(size); + futex.shared.set(is_shared); + } else if futex.size.get() != size || futex.shared.get() != is_shared { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + if futex_val == value as u128 { + // If the values are the same, we have to block. + let futex_ref = futex.futex.clone(); + let dest = dest.clone(); + this.futex_wait( + futex_ref.clone(), + u32::MAX, // bitset + timeout, + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx>, + futex_ref: FutexRef, + } + |this, unblock: UnblockKind| { + match unblock { + UnblockKind::Ready => { + let remaining = futex_ref.waiters(); + this.write_scalar(Scalar::from_i32(remaining as i32), &dest) + } + UnblockKind::TimedOut => { + this.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) + } + } + } + ), + ); + } else { + // else retrieve the current number of waiters. + let waiters = futex.futex.waiters() as i32; + this.write_scalar(Scalar::from_i32(waiters), dest)?; + } + + interp_ok(()) + } + + /// Implements [`os_sync_wake_by_address_all`] and [`os_sync_wake_by_address_any`]. + /// + /// [`os_sync_wake_by_address_all`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_all?language=objc + /// [`os_sync_wake_by_address_any`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_any?language=objc + fn os_sync_wake_by_address( + &mut self, + addr_op: &OpTy<'tcx>, + size_op: &OpTy<'tcx>, + flags_op: &OpTy<'tcx>, + all: bool, + dest: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let none = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_NONE"); + let shared = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_SHARED"); + + let ptr = this.read_pointer(addr_op)?; + let size = this.read_target_usize(size_op)?; + let flags = this.read_scalar(flags_op)?.to_u32()?; + + // Perform validation of the arguments. + let addr = ptr.addr().bytes(); + if addr == 0 || !matches!(size, 4 | 8) || (flags != none && flags != shared) { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let is_shared = flags == shared; + + let Some(futex) = this.get_sync_or_init(ptr, |_| { + MacOsFutex { + futex: Default::default(), + size: Cell::new(size), + shared: Cell::new(is_shared), + } + }) else { + this.set_last_error_and_return(LibcError("ENOENT"), dest)?; + return interp_ok(()); + }; + + if futex.futex.waiters() == 0 { + this.set_last_error_and_return(LibcError("ENOENT"), dest)?; + return interp_ok(()); + // Detect mismatches between the flags and sizes used on this address. + } else if futex.size.get() != size || futex.shared.get() != is_shared { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let futex_ref = futex.futex.clone(); + + // See the Linux futex implementation for why this fence exists. + this.atomic_fence(AtomicFenceOrd::SeqCst)?; + + if all { + while this.futex_wake(&futex_ref, u32::MAX)? {} + } else { + this.futex_wake(&futex_ref, u32::MAX)?; + } + + this.write_scalar(Scalar::from_i32(0), dest)?; + interp_ok(()) + } + fn os_unfair_lock_lock(&mut self, lock_op: &OpTy<'tcx>) -> InterpResult<'tcx> { let this = self.eval_context_mut(); diff --git a/src/shims/windows/sync.rs b/src/shims/windows/sync.rs index 808a7786eb..079599478a 100644 --- a/src/shims/windows/sync.rs +++ b/src/shims/windows/sync.rs @@ -212,14 +212,27 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { .futex .clone(); + let dest = dest.clone(); this.futex_wait( futex_ref, u32::MAX, // bitset timeout, - Scalar::from_i32(1), // retval_succ - Scalar::from_i32(0), // retval_timeout - dest.clone(), - IoError::WindowsError("ERROR_TIMEOUT"), // errno_timeout + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx> + } + |this, unblock: UnblockKind| { + match unblock { + UnblockKind::Ready => { + this.write_scalar(Scalar::from_i32(1), &dest) + } + UnblockKind::TimedOut => { + this.set_last_error(IoError::WindowsError("ERROR_TIMEOUT"))?; + this.write_scalar(Scalar::from_i32(0), &dest) + } + } + } + ), ); } diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs new file mode 100644 index 0000000000..adf70c54ff --- /dev/null +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -0,0 +1,211 @@ +//@only-target: darwin + +use std::time::{Duration, Instant}; +use std::{io, ptr, thread}; + +fn wake_nobody() { + let futex = 0; + + // Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(&futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE + ), + -1 + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT); + } +} + +fn wake_dangling() { + let futex = Box::new(0); + let ptr = ptr::from_ref(&futex).cast_mut().cast(); + drop(futex); + + // Expect error since this is now "unmapped" memory. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE + ), + -1 + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT); + } +} + +fn wait_wrong_val() { + let futex: i32 = 123; + + // Only wait if the futex value is 456. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(&futex).cast_mut().cast(), + 456, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE + ), + 0, + ); + } +} + +fn wait_timeout() { + let start = Instant::now(); + + let futex: i32 = 123; + + // Wait for 200ms, with nobody waking us up early. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_timeout( + ptr::from_ref(&futex).cast_mut().cast(), + 123, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + 200_000_000, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + + assert!((200..1000).contains(&start.elapsed().as_millis())); +} + +fn wait_absolute_timeout() { + let start = Instant::now(); + + // Get the current monotonic timestamp. + #[allow(deprecated)] + let mut deadline = unsafe { libc::mach_absolute_time() }; + + // Add 200ms (note: this only works under miri because it uses nanosecond units for + // mach_absolute_time). + deadline += 200_000_000; + + let futex: i32 = 123; + + // Wait for 200ms from now, with nobody waking us up early. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_deadline( + ptr::from_ref(&futex).cast_mut().cast(), + 123, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + deadline, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + + assert!((200..1000).contains(&start.elapsed().as_millis())); +} + +fn wait_wake() { + let start = Instant::now(); + + static mut FUTEX: i32 = 0; + + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + (&raw const FUTEX).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0, + ); + } + }); + + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + (&raw const FUTEX).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + ), + 0, + ); + } + + // When running this in stress-gc mode, things can take quite long. + // So the timeout is 3000 ms. + assert!((200..3000).contains(&start.elapsed().as_millis())); + t.join().unwrap(); +} + +fn param_mismatch() { + let futex = 0; + thread::scope(|s| { + s.spawn(|| { + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_timeout( + ptr::from_ref(&futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + 400_000_000, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + }); + + s.spawn(|| { + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(&futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); + } + }); + + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(&futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); + } + }); +} + +fn main() { + wake_nobody(); + wake_dangling(); + wait_wrong_val(); + wait_timeout(); + wait_absolute_timeout(); + wait_wake(); + param_mismatch(); +} From 834de5287cae14f29636bcb0dec9df7b20058cbf Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 18:02:33 +0100 Subject: [PATCH 2/8] add and update futex comments --- src/concurrency/sync.rs | 7 ++----- src/shims/unix/macos/foreign_items.rs | 9 +++++++-- src/shims/unix/macos/sync.rs | 9 ++++++++- tests/pass-dep/concurrency/apple-futex.rs | 4 +++- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index 9d573c4927..9cdeececb4 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -734,11 +734,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { interp_ok(true) } - /// Wait for the futex to be signaled, or a timeout. - /// * On a signal, `retval_succ` is called with the number of waiters on the - /// futex and its result is written to `dest`. - /// * On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` - /// is set as the last error. + /// Wait for the futex to be signaled, or a timeout. Once the thread is + /// unblocked, `callback` is called with the unblock reason. fn futex_wait( &mut self, futex_ref: FutexRef, diff --git a/src/shims/unix/macos/foreign_items.rs b/src/shims/unix/macos/foreign_items.rs index e204a68440..918fd8dd52 100644 --- a/src/shims/unix/macos/foreign_items.rs +++ b/src/shims/unix/macos/foreign_items.rs @@ -222,6 +222,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { this.write_scalar(res, dest)?; } + // Futex primitives "os_sync_wait_on_address" => { let [addr_op, value_op, size_op, flags_op] = this.check_shim(abi, Conv::C, link_name, args)?; @@ -261,12 +262,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { "os_sync_wake_by_address_any" => { let [addr_op, size_op, flags_op] = this.check_shim(abi, Conv::C, link_name, args)?; - this.os_sync_wake_by_address(addr_op, size_op, flags_op, false, dest)?; + this.os_sync_wake_by_address( + addr_op, size_op, flags_op, /* all */ false, dest, + )?; } "os_sync_wake_by_address_all" => { let [addr_op, size_op, flags_op] = this.check_shim(abi, Conv::C, link_name, args)?; - this.os_sync_wake_by_address(addr_op, size_op, flags_op, true, dest)?; + this.os_sync_wake_by_address( + addr_op, size_op, flags_op, /* all */ true, dest, + )?; } "os_unfair_lock_lock" => { diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index 810eb56ebf..b215f5f71a 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -38,7 +38,9 @@ pub enum MacOsFutexTimeout<'a, 'tcx> { /// This structure keeps track of both the futex queue and these values. struct MacOsFutex { futex: FutexRef, + /// The size in bytes of the atomic primitive underlying this futex. size: Cell, + /// Whether the futex is shared across process boundaries. shared: Cell, } @@ -153,7 +155,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; // Detect mismatches between the flags and sizes used on this address - // by storing the parameters of the first waiter in a batch of waiters. + // by comparing with the parameters stored with the first waiter. if futex.futex.waiters() == 0 { futex.size.set(size); futex.shared.set(is_shared); @@ -233,6 +235,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { shared: Cell::new(is_shared), } }) else { + // No AllocId, or no live allocation at that AllocId. Return an + // error code. (That seems nicer than silently doing something + // non-intuitive.) This means that if an address gets reused by a + // new allocation, we'll use an independent futex queue for this... + // that seems acceptable. this.set_last_error_and_return(LibcError("ENOENT"), dest)?; return interp_ok(()); }; diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs index adf70c54ff..0cda840a6f 100644 --- a/tests/pass-dep/concurrency/apple-futex.rs +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -6,7 +6,7 @@ use std::{io, ptr, thread}; fn wake_nobody() { let futex = 0; - // Wake 1 waiter. Expect zero waiters woken up, as nobody is waiting. + // Wake 1 waiter. Expect ENOENT woken up, as nobody is waiting. unsafe { assert_eq!( libc::os_sync_wake_by_address_any( @@ -181,6 +181,7 @@ fn param_mismatch() { ), -1, ); + // This call fails because it uses the shared flag whereas the first waiter didn't. assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); } }); @@ -195,6 +196,7 @@ fn param_mismatch() { ), -1, ); + // This call fails because it uses the shared flag whereas the waiter didn't. assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); } }); From 0a28dce3128f5faef236a30dcf71981058e62793 Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 18:14:26 +0100 Subject: [PATCH 3/8] fix handling of scalars in futex code --- src/lib.rs | 1 + src/shims/unix/linux_like/sync.rs | 2 +- src/shims/unix/macos/sync.rs | 10 +++++----- src/shims/windows/sync.rs | 4 ++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5d3204a527..61632cc0f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ #![feature(unqualified_local_imports)] #![feature(derive_coerce_pointee)] #![feature(arbitrary_self_types)] +#![feature(unsigned_is_multiple_of)] // Configure clippy and other lints #![allow( clippy::collapsible_else_if, diff --git a/src/shims/unix/linux_like/sync.rs b/src/shims/unix/linux_like/sync.rs index 40af1ee6a7..dcf25e438c 100644 --- a/src/shims/unix/linux_like/sync.rs +++ b/src/shims/unix/linux_like/sync.rs @@ -169,7 +169,7 @@ pub fn futex<'tcx>( } |ecx, unblock: UnblockKind| match unblock { UnblockKind::Ready => { - ecx.write_scalar(Scalar::from_target_isize(0, ecx), &dest) + ecx.write_int(0, &dest) } UnblockKind::TimedOut => { ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index b215f5f71a..c154a15cab 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -120,8 +120,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // Perform validation of the arguments. let addr = ptr.addr().bytes(); if addr == 0 - || addr % size != 0 || !matches!(size, 4 | 8) + || !addr.is_multiple_of(size) || (flags != none && flags != shared) || clock_timeout .is_some_and(|(clock, _, timeout)| clock != absolute_clock || timeout == 0) @@ -164,7 +164,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { return interp_ok(()); } - if futex_val == value as u128 { + if futex_val == value.into() { // If the values are the same, we have to block. let futex_ref = futex.futex.clone(); let dest = dest.clone(); @@ -180,8 +180,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { |this, unblock: UnblockKind| { match unblock { UnblockKind::Ready => { - let remaining = futex_ref.waiters(); - this.write_scalar(Scalar::from_i32(remaining as i32), &dest) + let remaining = futex_ref.waiters().try_into().unwrap(); + this.write_scalar(Scalar::from_i32(remaining), &dest) } UnblockKind::TimedOut => { this.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) @@ -192,7 +192,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ); } else { // else retrieve the current number of waiters. - let waiters = futex.futex.waiters() as i32; + let waiters = futex.futex.waiters().try_into().unwrap(); this.write_scalar(Scalar::from_i32(waiters), dest)?; } diff --git a/src/shims/windows/sync.rs b/src/shims/windows/sync.rs index 079599478a..c54edf92b4 100644 --- a/src/shims/windows/sync.rs +++ b/src/shims/windows/sync.rs @@ -224,11 +224,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { |this, unblock: UnblockKind| { match unblock { UnblockKind::Ready => { - this.write_scalar(Scalar::from_i32(1), &dest) + this.write_int(1, &dest) } UnblockKind::TimedOut => { this.set_last_error(IoError::WindowsError("ERROR_TIMEOUT"))?; - this.write_scalar(Scalar::from_i32(0), &dest) + this.write_int(0, &dest) } } } From 95c5da3f285e3c3a952b4d9f2551e9661385a80b Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 19:23:44 +0100 Subject: [PATCH 4/8] wake up all futex threads at the same time On macOS, the futex operations return the number of outstanding waiters. For this count to be correct, we have to remove all eligible threads from the internal futex list before any of the wakeup callbacks are run. This has the nice side-effect of making the runtime removal operation linear in the number of threads instead of quadratic. --- src/concurrency/sync.rs | 34 ++++++++----- src/lib.rs | 1 + src/shims/unix/linux_like/sync.rs | 12 +---- src/shims/unix/macos/sync.rs | 8 +-- src/shims/windows/sync.rs | 4 +- tests/pass-dep/concurrency/apple-futex.rs | 59 +++++++++++++++++++++++ 6 files changed, 87 insertions(+), 31 deletions(-) diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index 9cdeececb4..268268848e 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -128,7 +128,7 @@ struct Condvar { /// The futex state. #[derive(Default, Debug)] struct Futex { - waiters: VecDeque, + waiters: Vec, /// Tracks the happens-before relationship /// between a futex-wake and a futex-wait /// during a non-spurious wake event. @@ -748,7 +748,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let mut futex = futex_ref.0.borrow_mut(); let waiters = &mut futex.waiters; assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); - waiters.push_back(FutexWaiter { thread, bitset }); + waiters.push(FutexWaiter { thread, bitset }); drop(futex); this.block_thread( @@ -782,9 +782,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ); } - /// Wake up the first thread in the queue that matches any of the bits in the bitset. - /// Returns whether anything was woken. - fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> { + /// Wake up `count` of the threads in the queue that match any of the bits + /// in the bitset. Returns how many threads were woken. + fn futex_wake( + &mut self, + futex_ref: &FutexRef, + bitset: u32, + count: usize, + ) -> InterpResult<'tcx, usize> { let this = self.eval_context_mut(); let mut futex = futex_ref.0.borrow_mut(); let data_race = &this.machine.data_race; @@ -794,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock)); } - // Wake up the first thread in the queue that matches any of the bits in the bitset. - let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else { - return interp_ok(false); - }; - let waiter = futex.waiters.remove(i).unwrap(); + // Remove `count` of the threads in the queue that match any of the bits in the bitset. + // We collect all of them before unblocking because the unblock callback may access the + // futex state to retrieve the remaining number of waiters on macOS. + let waiters: Vec<_> = + futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect(); drop(futex); - this.unblock_thread(waiter.thread, BlockReason::Futex)?; - interp_ok(true) + + let woken = waiters.len(); + for waiter in waiters { + this.unblock_thread(waiter.thread, BlockReason::Futex)?; + } + + interp_ok(woken) } } diff --git a/src/lib.rs b/src/lib.rs index 61632cc0f6..45054c37c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ #![feature(derive_coerce_pointee)] #![feature(arbitrary_self_types)] #![feature(unsigned_is_multiple_of)] +#![feature(extract_if)] // Configure clippy and other lints #![allow( clippy::collapsible_else_if, diff --git a/src/shims/unix/linux_like/sync.rs b/src/shims/unix/linux_like/sync.rs index dcf25e438c..280bee4800 100644 --- a/src/shims/unix/linux_like/sync.rs +++ b/src/shims/unix/linux_like/sync.rs @@ -219,16 +219,8 @@ pub fn futex<'tcx>( // will see the latest value on addr which could be changed by our caller // before doing the syscall. ecx.atomic_fence(AtomicFenceOrd::SeqCst)?; - let mut n = 0; - #[expect(clippy::arithmetic_side_effects)] - for _ in 0..val { - if ecx.futex_wake(&futex_ref, bitset)? { - n += 1; - } else { - break; - } - } - ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?; + let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?; + ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?; } op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op), } diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index c154a15cab..48b7f2401a 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -257,13 +257,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // See the Linux futex implementation for why this fence exists. this.atomic_fence(AtomicFenceOrd::SeqCst)?; - - if all { - while this.futex_wake(&futex_ref, u32::MAX)? {} - } else { - this.futex_wake(&futex_ref, u32::MAX)?; - } - + this.futex_wake(&futex_ref, u32::MAX, if all { usize::MAX } else { 1 })?; this.write_scalar(Scalar::from_i32(0), dest)?; interp_ok(()) } diff --git a/src/shims/windows/sync.rs b/src/shims/windows/sync.rs index c54edf92b4..8d5ea7db9e 100644 --- a/src/shims/windows/sync.rs +++ b/src/shims/windows/sync.rs @@ -257,7 +257,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; let futex_ref = futex_ref.futex.clone(); - this.futex_wake(&futex_ref, u32::MAX)?; + this.futex_wake(&futex_ref, u32::MAX, 1)?; interp_ok(()) } @@ -277,7 +277,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; let futex_ref = futex_ref.futex.clone(); - while this.futex_wake(&futex_ref, u32::MAX)? {} + this.futex_wake(&futex_ref, u32::MAX, usize::MAX)?; interp_ok(()) } diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs index 0cda840a6f..e525d55bec 100644 --- a/tests/pass-dep/concurrency/apple-futex.rs +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -149,6 +149,64 @@ fn wait_wake() { t.join().unwrap(); } +fn wait_wake_multiple() { + let val = 0i32; + let futex = &val; + + thread::scope(|s| { + // Spawn some threads and make them wait on the futex. + for i in 0..4 { + s.spawn(move || unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + ), + // The last two threads will be woken at the same time, + // but for the first two threads the remaining number + // of waiters should be strictly decreasing. + if i < 2 { 3 - i } else { 0 }, + ); + }); + + thread::sleep(Duration::from_millis(200)); + } + + // Wake the threads up again. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + + // Wake both remaining threads at the same time. + assert_eq!( + libc::os_sync_wake_by_address_all( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + } + }) +} + fn param_mismatch() { let futex = 0; thread::scope(|s| { @@ -209,5 +267,6 @@ fn main() { wait_timeout(); wait_absolute_timeout(); wait_wake(); + wait_wake_multiple(); param_mismatch(); } From 44b8baf7730a66e58faa7bec7441dbc86a2da68e Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 19:29:07 +0100 Subject: [PATCH 5/8] use acquire ordering for reading the futex value on macOS --- src/shims/unix/macos/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index 48b7f2401a..6f763dad33 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -140,7 +140,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let layout = this.machine.layouts.uint(Size::from_bytes(size)).unwrap(); let futex_val = this - .read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Relaxed)? + .read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)? .to_bits(Size::from_bytes(size))?; let Some(futex) = this.get_sync_or_init(ptr, |_| { From 1b1f9db164aac198cae79749c745a0bd9092610c Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 19:37:25 +0100 Subject: [PATCH 6/8] stop special-casing futex creation on macOS --- src/shims/unix/macos/sync.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index 6f763dad33..ca49f6b104 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -143,16 +143,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { .read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)? .to_bits(Size::from_bytes(size))?; - let Some(futex) = this.get_sync_or_init(ptr, |_| { - MacOsFutex { - futex: Default::default(), - size: Cell::new(size), - shared: Cell::new(is_shared), - } - }) else { - this.set_last_error_and_return(LibcError("ENOMEM"), dest)?; - return interp_ok(()); - }; + let futex = this + .get_sync_or_init(ptr, |_| { + MacOsFutex { + futex: Default::default(), + size: Cell::new(size), + shared: Cell::new(is_shared), + } + }) + .unwrap(); // Detect mismatches between the flags and sizes used on this address // by comparing with the parameters stored with the first waiter. From c293de0c7bc48c523f03c4de47a4be1b5f890c42 Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 19:53:52 +0100 Subject: [PATCH 7/8] expand on why the futex test skips `mach_timebase_info` --- tests/pass-dep/concurrency/apple-futex.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs index e525d55bec..969b3cdf72 100644 --- a/tests/pass-dep/concurrency/apple-futex.rs +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -87,8 +87,10 @@ fn wait_absolute_timeout() { #[allow(deprecated)] let mut deadline = unsafe { libc::mach_absolute_time() }; - // Add 200ms (note: this only works under miri because it uses nanosecond units for - // mach_absolute_time). + // Add 200ms. + // What we should be doing here is call `mach_timebase_info` to determine the + // unit used for `deadline`, but we know what Miri returns for that function: + // the unit is nanoseconds. deadline += 200_000_000; let futex: i32 = 123; From 1dc71735e0ff0bce66880fb97065fa9c85759960 Mon Sep 17 00:00:00 2001 From: joboet Date: Sat, 1 Feb 2025 19:59:44 +0100 Subject: [PATCH 8/8] add comments explaining the parameter mismatch check in the Apple futex code --- src/shims/unix/macos/sync.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index ca49f6b104..3c4ade59ae 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -154,7 +154,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { .unwrap(); // Detect mismatches between the flags and sizes used on this address - // by comparing with the parameters stored with the first waiter. + // by comparing it with the parameters used by the other waiters in + // the current list. If the list is currently empty, update those + // parameters. if futex.futex.waiters() == 0 { futex.size.set(size); futex.shared.set(is_shared); @@ -246,7 +248,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { if futex.futex.waiters() == 0 { this.set_last_error_and_return(LibcError("ENOENT"), dest)?; return interp_ok(()); - // Detect mismatches between the flags and sizes used on this address. + // If there are waiters in the queue, they have all used the parameters + // stored in `futex` (we check this in `os_sync_wait_on_address` above). + // Detect mismatches between "our" parameters and the parameters used by + // the waiters and return an error in that case. } else if futex.size.get() != size || futex.shared.get() != is_shared { this.set_last_error_and_return(LibcError("EINVAL"), dest)?; return interp_ok(());