Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shim Apple's futex primitives #4142

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ struct Condvar {
/// The futex state.
#[derive(Default, Debug)]
struct Futex {
waiters: VecDeque<FutexWaiter>,
waiters: Vec<FutexWaiter>,
/// Tracks the happens-before relationship
/// between a futex-wake and a futex-wait
/// during a non-spurious wake event.
Expand All @@ -140,6 +140,12 @@ struct Futex {
#[derive(Default, Clone)]
pub struct FutexRef(Rc<RefCell<Futex>>);

impl FutexRef {
pub fn waiters(&self) -> usize {
self.0.borrow().waiters.len()
}
}

impl VisitProvenance for FutexRef {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance in `Futex`.
Expand Down Expand Up @@ -728,25 +734,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(true)
}

/// Wait for the futex to be signaled, or a timeout.
/// On a signal, `retval_succ` is written to `dest`.
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
/// Wait for the futex to be signaled, or a timeout. Once the thread is
/// unblocked, `callback` is called with the unblock reason.
fn futex_wait(
&mut self,
futex_ref: FutexRef,
bitset: u32,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset });
waiters.push(FutexWaiter { thread, bitset });
drop(futex);

this.block_thread(
Expand All @@ -755,10 +757,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
callback!(
@capture<'tcx> {
futex_ref: FutexRef,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
}
|this, unblock: UnblockKind| {
match unblock {
Expand All @@ -768,29 +767,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}

callback.call(this, unblock)
}
),
);
}

/// Wake up the first thread in the queue that matches any of the bits in the bitset.
/// Returns whether anything was woken.
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
/// Wake up `count` of the threads in the queue that match any of the bits
/// in the bitset. Returns how many threads were woken.
fn futex_wake(
&mut self,
futex_ref: &FutexRef,
bitset: u32,
count: usize,
) -> InterpResult<'tcx, usize> {
let this = self.eval_context_mut();
let mut futex = futex_ref.0.borrow_mut();
let data_race = &this.machine.data_race;
Expand All @@ -800,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock));
}

// Wake up the first thread in the queue that matches any of the bits in the bitset.
let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
return interp_ok(false);
};
let waiter = futex.waiters.remove(i).unwrap();
// Remove `count` of the threads in the queue that match any of the bits in the bitset.
// We collect all of them before unblocking because the unblock callback may access the
// futex state to retrieve the remaining number of waiters on macOS.
let waiters: Vec<_> =
futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect();
drop(futex);
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
interp_ok(true)

let woken = waiters.len();
for waiter in waiters {
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
}

interp_ok(woken)
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(unqualified_local_imports)]
#![feature(derive_coerce_pointee)]
#![feature(arbitrary_self_types)]
#![feature(unsigned_is_multiple_of)]
#![feature(extract_if)]
// Configure clippy and other lints
#![allow(
clippy::collapsible_else_if,
Expand Down
30 changes: 16 additions & 14 deletions src/shims/unix/linux_like/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,24 @@ pub fn futex<'tcx>(
.futex
.clone();

let dest = dest.clone();
ecx.futex_wait(
futex_ref,
bitset,
timeout,
Scalar::from_target_isize(0, ecx), // retval_succ
Scalar::from_target_isize(-1, ecx), // retval_timeout
dest.clone(),
LibcError("ETIMEDOUT"), // errno_timeout
callback!(
@capture<'tcx> {
dest: MPlaceTy<'tcx>,
}
|ecx, unblock: UnblockKind| match unblock {
UnblockKind::Ready => {
ecx.write_int(0, &dest)
}
UnblockKind::TimedOut => {
ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest)
}
}
),
);
} else {
// The futex value doesn't match the expected value, so we return failure
Expand Down Expand Up @@ -209,16 +219,8 @@ pub fn futex<'tcx>(
// will see the latest value on addr which could be changed by our caller
// before doing the syscall.
ecx.atomic_fence(AtomicFenceOrd::SeqCst)?;
let mut n = 0;
#[expect(clippy::arithmetic_side_effects)]
for _ in 0..val {
if ecx.futex_wake(&futex_ref, bitset)? {
n += 1;
} else {
break;
}
}
ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?;
let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?;
ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?;
}
op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op),
}
Expand Down
66 changes: 63 additions & 3 deletions src/shims/unix/macos/foreign_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ use rustc_middle::ty::Ty;
use rustc_span::Symbol;
use rustc_target::callconv::{Conv, FnAbi};

use super::sync::EvalContextExt as _;
use super::sync::{EvalContextExt as _, MacOsFutexTimeout};
use crate::shims::unix::*;
use crate::*;

pub fn is_dyn_sym(_name: &str) -> bool {
false
pub fn is_dyn_sym(name: &str) -> bool {
match name {
// These only became available with macOS 11.0, so std looks them up dynamically.
"os_sync_wait_on_address"
| "os_sync_wait_on_address_with_deadline"
| "os_sync_wait_on_address_with_timeout"
| "os_sync_wake_by_address_any"
| "os_sync_wake_by_address_all" => true,
_ => false,
}
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
Expand Down Expand Up @@ -214,6 +222,58 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
this.write_scalar(res, dest)?;
}

// Futex primitives
"os_sync_wait_on_address" => {
joboet marked this conversation as resolved.
Show resolved Hide resolved
let [addr_op, value_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::None,
dest,
)?;
}
"os_sync_wait_on_address_with_deadline" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Absolute { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wait_on_address_with_timeout" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Relative { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wake_by_address_any" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ false, dest,
)?;
}
"os_sync_wake_by_address_all" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ true, dest,
)?;
}

"os_unfair_lock_lock" => {
let [lock_op] = this.check_shim(abi, Conv::C, link_name, args)?;
this.os_unfair_lock_lock(lock_op)?;
Expand Down
Loading