diff --git a/library/std/src/sys/pal/windows/c.rs b/library/std/src/sys/pal/windows/c.rs index 27aa35f69f1bf..2e0415daefbb5 100644 --- a/library/std/src/sys/pal/windows/c.rs +++ b/library/std/src/sys/pal/windows/c.rs @@ -52,8 +52,6 @@ pub const INVALID_HANDLE_VALUE: HANDLE = ::core::ptr::without_provenance_mut(-1i pub const EXIT_SUCCESS: u32 = 0; pub const EXIT_FAILURE: u32 = 1; -#[cfg(target_vendor = "win7")] -pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = CONDITION_VARIABLE { Ptr: ptr::null_mut() }; #[cfg(target_vendor = "win7")] pub const SRWLOCK_INIT: SRWLOCK = SRWLOCK { Ptr: ptr::null_mut() }; #[cfg(not(target_thread_local))] diff --git a/library/std/src/sys/sync/condvar/mod.rs b/library/std/src/sys/sync/condvar/mod.rs index 6849cacf88e76..ae7b677225ef2 100644 --- a/library/std/src/sys/sync/condvar/mod.rs +++ b/library/std/src/sys/sync/condvar/mod.rs @@ -12,24 +12,21 @@ cfg_if::cfg_if! { ))] { mod futex; pub use futex::Condvar; + } else if #[cfg(any( + all(target_os = "windows", target_vendor = "win7"), + target_os = "netbsd", + all(target_vendor = "fortanix", target_env = "sgx"), + target_os = "teeos", + target_os = "xous", + ))] { + mod queue; + pub use queue::Condvar; } else if #[cfg(target_family = "unix")] { mod pthread; pub use pthread::Condvar; - } else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] { - mod windows7; - pub use windows7::Condvar; - } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { - mod sgx; - pub use sgx::Condvar; } else if #[cfg(target_os = "solid_asp3")] { mod itron; pub use itron::Condvar; - } else if #[cfg(target_os = "teeos")] { - mod teeos; - pub use teeos::Condvar; - } else if #[cfg(target_os = "xous")] { - mod xous; - pub use xous::Condvar; } else { mod no_threads; pub use no_threads::Condvar; diff --git a/library/std/src/sys/sync/condvar/queue.rs b/library/std/src/sys/sync/condvar/queue.rs new file mode 100644 index 0000000000000..cc39a4a9f29f3 --- /dev/null +++ b/library/std/src/sys/sync/condvar/queue.rs @@ -0,0 +1,380 @@ +//! A generic `Condvar` implementation based on thread parking and a lockless +//! queue of threads. +//! +//! Not all platforms provide an efficient `Condvar` implementation: the UNIX +//! `pthread_condvar_t` needs memory allocation, while SGX doesn't have +//! synchronization primitives at all. Therefore, we implement our own. +//! +//! To do so, we keep a list of the [`Thread`]s waiting on the `Condvar` and +//! wake them up as needed. Access to the list is controlled by an atomic +//! counter. To notify a waiter, the counter is incremented. If the counter +//! was previously zero, the notifying thread has control over the list and +//! will wake up threads until the number of threads it has woken up equals +//! the counter value. Therefore, other threads do not need to wait for control +//! over the list because the controlling thread will take over their notification. +//! +//! This counter is embedded into the lower bits of a pointer to the list head. +//! As that limits the number of in-flight notifications, the counter increments +//! are saturated to a maximum value ([`ALL`]) that causes all threads to be woken +//! up, leading to a spurious wakeup for some threads. The API of `Condvar` permits +//! this however. Timeouts employ the same method to make sure that the current +//! thread handle is removed from the list. +//! +//! The list itself has the same structure as the one used by the queue-based +//! `RwLock` implementation, see its documentation for more information. This +//! enables the lockless enqueuing of threads and results in `Condvar` being +//! only a pointer in size. +//! +//! This implementation is loosely based upon the lockless `Condvar` in +//! [`usync`](https://github.com/kprotty/usync/blob/8937bb77963f6bf9068e56ad46133e933eb79974/src/condvar.rs). + +#![forbid(unsafe_op_in_unsafe_fn)] + +use crate::cell::UnsafeCell; +use crate::mem::forget; +use crate::ptr::{self, NonNull}; +use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use crate::sync::atomic::{AtomicBool, AtomicPtr}; +use crate::sys::sync::Mutex; +use crate::thread::{self, Thread}; +use crate::time::{Duration, Instant}; + +type State = *mut (); + +const EMPTY: State = ptr::null_mut(); +const ALL: usize = 0b1111; +const MASK: usize = !ALL; + +fn count(state: State) -> usize { + state.addr() & ALL +} + +unsafe fn to_node(state: State) -> NonNull { + unsafe { NonNull::new_unchecked(state.mask(MASK)).cast() } +} + +struct PanicGuard; +impl Drop for PanicGuard { + fn drop(&mut self) { + rtabort!("tried to drop node in intrusive list."); + } +} + +#[repr(align(16))] +struct Node { + // Accesses to these `UnsafeCell`s may only be made from the thread that + // first increment the wakeup count. + next: UnsafeCell>>, + prev: UnsafeCell>>, + tail: UnsafeCell>>, + notified: AtomicBool, + thread: Thread, +} + +impl Node { + unsafe fn notify(node: NonNull) { + let thread = unsafe { node.as_ref().thread.clone() }; + unsafe { + node.as_ref().notified.store(true, Release); + } + thread.unpark(); + } +} + +/// Scan through the list until the `next` pointer of the current node equals +/// `known`, then return that node. Add backlinks to all encountered nodes. +unsafe fn scan_until_known(mut scan: NonNull, known: NonNull) -> NonNull { + loop { + let next = unsafe { scan.as_ref().next.get().read().unwrap_unchecked() }; + if next != known { + unsafe { + next.as_ref().prev.get().write(Some(scan)); + scan = next; + } + } else { + return scan; + } + } +} + +/// Scan until encountering a node with a non-empty `tail` field, then return +/// the value of that field. Add backlinks to all encountered nodes. +unsafe fn scan_until_tail(mut scan: NonNull) -> NonNull { + loop { + let s = unsafe { scan.as_ref() }; + match unsafe { s.tail.get().read() } { + Some(tail) => return tail, + None => unsafe { + let next = s.next.get().read().unwrap_unchecked(); + next.as_ref().prev.get().write(Some(scan)); + scan = next; + }, + } + } +} + +/// Notify all nodes, going backwards starting with `tail`. +unsafe fn notify_all(mut tail: NonNull) { + loop { + let prev = unsafe { tail.as_ref().prev.get().read() }; + unsafe { + Node::notify(tail); + } + match prev { + Some(prev) => tail = prev, + None => return, + } + } +} + +pub struct Condvar { + state: AtomicPtr<()>, +} + +impl Condvar { + #[inline] + pub const fn new() -> Condvar { + Condvar { state: AtomicPtr::new(ptr::null_mut()) } + } + + pub unsafe fn wait(&self, mutex: &Mutex) { + unsafe { + self.wait_optional_timeout(mutex, None); + } + } + + pub unsafe fn wait_timeout(&self, mutex: &Mutex, timeout: Duration) -> bool { + let timeout = Instant::now().checked_add(timeout); + unsafe { self.wait_optional_timeout(mutex, timeout) } + } + + unsafe fn wait_optional_timeout(&self, mutex: &Mutex, timeout: Option) -> bool { + let node = &Node { + next: UnsafeCell::new(None), + prev: UnsafeCell::new(None), + tail: UnsafeCell::new(None), + notified: AtomicBool::new(false), + thread: thread::try_current().unwrap_or_else(|| Thread::new_unnamed()), + }; + + // Enqueue the node. + let mut state = self.state.load(Relaxed); + loop { + unsafe { + node.next.get().write(NonNull::new(state.mask(MASK).cast())); + node.tail.get().write(if state == EMPTY { + Some(NonNull::from(node).cast()) + } else { + None + }); + } + + let next = ptr::from_ref(node).wrapping_byte_add(count(state)) as State; + match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { + Ok(_) => break, + Err(new) => state = new, + } + } + + // The node is registered, so the structure must not be + // mutably accessed or destroyed while other threads may + // be accessing it. Guard against unwinds using a panic + // guard that aborts when dropped. + let guard = PanicGuard; + + unsafe { + mutex.unlock(); + } + + let mut timed_out = false; + if let Some(timeout) = timeout { + // While we haven't timed out or been notified, keep parking this thread. + while !node.notified.load(Acquire) { + if let Some(remaining) = timeout.checked_duration_since(Instant::now()) { + unsafe { + node.thread.park_timeout(remaining); + } + } else { + timed_out = true; + break; + } + } + + if timed_out { + // The node is still in the queue. Wakeup all threads so that + // it is removed. + self.notify_all(); + } else { + // The node was marked as notified, so it is no longer part of + // the queue. Relock the mutex and return. + forget(guard); + mutex.lock(); + return true; + } + } + + // Park the thread until we are notified. + while !node.notified.load(Acquire) { + unsafe { + node.thread.park(); + } + } + + // The node was marked as notified, so it is no longer part of + // the queue. Relock the mutex and return. + forget(guard); + mutex.lock(); + !timed_out + } + + pub fn notify_one(&self) { + // Try to increase the notification counter. + let mut state = self.state.load(Relaxed); + loop { + if state == EMPTY { + return; + } + + if count(state) == ALL { + // All threads are being notified, so we don't need to do another + // notification. + return; + } else if count(state) != 0 { + // Another thread is handling notifications, tell it to notify + // one more thread. + let next = state.wrapping_byte_add(1); + match self.state.compare_exchange_weak(state, next, Relaxed, Relaxed) { + Ok(_) => return, + Err(new) => state = new, + } + } else { + // No notifications are in progress, we should take responsibility + // for waking up threads. Increase the notification counter to do so. + let next = state.wrapping_byte_add(1); + match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) { + Ok(_) => { + state = next; + break; + } + Err(new) => state = new, + } + } + } + + // At this point, we took responsibility for notifying threads, meaning + // we have exclusive access to the queue. Wake up threads as long as there + // are threads to notify and notifications requested. + + // Keep track of how many threads we notified already. + let mut notified = 0; + // This is the node that will be woken up next. + let mut tail = unsafe { scan_until_tail(to_node(state)) }; + + while count(state) != ALL { + if notified != count(state) { + // We haven't notified enough threads, so wake up `tail`. + + let prev = unsafe { tail.as_ref().prev.get().read() }; + + unsafe { + Node::notify(tail); + } + + notified += 1; + + if let Some(prev) = prev { + tail = prev; + } else { + // We notified all threads in the queue. As long as no new + // nodes have been added, clear the state. + loop { + match self.state.compare_exchange_weak(state, EMPTY, Release, Acquire) { + Ok(_) => return, + Err(new) => state = new, + } + + let head = unsafe { to_node(state) }; + if head != tail { + // `head` has already been woken up, so we may not + // access it. Simply continue the main loop with + // the last new node. + tail = unsafe { scan_until_known(head, tail) }; + break; + } + } + } + } else { + // We notified enough threads. Try clearing the counter. + + let head = unsafe { to_node(state) }; + unsafe { + head.as_ref().tail.get().write(Some(tail)); + } + + match self.state.compare_exchange_weak(state, state.mask(MASK), Release, Acquire) { + Ok(_) => return, + Err(new) => state = new, + } + + let scan = unsafe { to_node(state) }; + if scan != head { + // New nodes have been added to the queue. Link the new part + // of the queue to the old one. + let scan = unsafe { scan_until_known(scan, head) }; + unsafe { + head.as_ref().prev.get().write(Some(scan)); + } + } + } + } + + // We need to wake up all threads in the queue. + // Use a swap to reset the state so that we do not endlessly retry if + // new nodes are constantly being added. + + let new = self.state.swap(EMPTY, Acquire); + let head = unsafe { to_node(state) }; + let scan = unsafe { to_node(new) }; + if head != scan { + // New nodes have been added to the queue. Link the new part + // of the queue to the old one. + let scan = unsafe { scan_until_known(scan, head) }; + unsafe { + head.as_ref().prev.get().write(Some(scan)); + } + } + + unsafe { notify_all(tail) } + } + + pub fn notify_all(&self) { + let mut state = self.state.load(Relaxed); + loop { + if state == EMPTY { + return; + } + + if count(state) == ALL { + // All threads are already being notified. + return; + } else if count(state) != 0 { + // Another thread is handling notifications, tell it to notify + // all threads. + let next = state.map_addr(|state| state | ALL); + match self.state.compare_exchange_weak(state, next, Relaxed, Relaxed) { + Ok(_) => return, + Err(new) => state = new, + } + } else { + // Take the whole queue and wake it up. + match self.state.compare_exchange_weak(state, EMPTY, Acquire, Relaxed) { + Ok(_) => break, + Err(new) => state = new, + } + } + } + + let tail = unsafe { scan_until_tail(to_node(state)) }; + unsafe { notify_all(tail) } + } +} diff --git a/library/std/src/sys/sync/condvar/sgx.rs b/library/std/src/sys/sync/condvar/sgx.rs deleted file mode 100644 index ecb5872f60d90..0000000000000 --- a/library/std/src/sys/sync/condvar/sgx.rs +++ /dev/null @@ -1,45 +0,0 @@ -use crate::sys::pal::waitqueue::{SpinMutex, WaitQueue, WaitVariable}; -use crate::sys::sync::Mutex; -use crate::sys_common::lazy_box::{LazyBox, LazyInit}; -use crate::time::Duration; - -/// FIXME: `UnsafeList` is not movable. -struct AllocatedCondvar(SpinMutex>); - -pub struct Condvar { - inner: LazyBox, -} - -impl LazyInit for AllocatedCondvar { - fn init() -> Box { - Box::new(AllocatedCondvar(SpinMutex::new(WaitVariable::new(())))) - } -} - -impl Condvar { - pub const fn new() -> Condvar { - Condvar { inner: LazyBox::new() } - } - - #[inline] - pub fn notify_one(&self) { - let _ = WaitQueue::notify_one(self.inner.0.lock()); - } - - #[inline] - pub fn notify_all(&self) { - let _ = WaitQueue::notify_all(self.inner.0.lock()); - } - - pub unsafe fn wait(&self, mutex: &Mutex) { - let guard = self.inner.0.lock(); - WaitQueue::wait(guard, || unsafe { mutex.unlock() }); - mutex.lock() - } - - pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let success = WaitQueue::wait_timeout(&self.inner.0, dur, || unsafe { mutex.unlock() }); - mutex.lock(); - success - } -} diff --git a/library/std/src/sys/sync/condvar/teeos.rs b/library/std/src/sys/sync/condvar/teeos.rs deleted file mode 100644 index 0a931f407d2fa..0000000000000 --- a/library/std/src/sys/sync/condvar/teeos.rs +++ /dev/null @@ -1,100 +0,0 @@ -use crate::cell::UnsafeCell; -use crate::ptr; -use crate::sync::atomic::{AtomicPtr, Ordering::Relaxed}; -use crate::sys::sync::mutex::{self, Mutex}; -use crate::sys::time::TIMESPEC_MAX; -use crate::sys_common::lazy_box::{LazyBox, LazyInit}; -use crate::time::Duration; - -extern "C" { - pub fn pthread_cond_timedwait( - cond: *mut libc::pthread_cond_t, - lock: *mut libc::pthread_mutex_t, - adstime: *const libc::timespec, - ) -> libc::c_int; -} - -struct AllocatedCondvar(UnsafeCell); - -pub struct Condvar { - inner: LazyBox, - mutex: AtomicPtr, -} - -#[inline] -fn raw(c: &Condvar) -> *mut libc::pthread_cond_t { - c.inner.0.get() -} - -unsafe impl Send for AllocatedCondvar {} -unsafe impl Sync for AllocatedCondvar {} - -impl LazyInit for AllocatedCondvar { - fn init() -> Box { - let condvar = Box::new(AllocatedCondvar(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER))); - - let r = unsafe { libc::pthread_cond_init(condvar.0.get(), crate::ptr::null()) }; - assert_eq!(r, 0); - - condvar - } -} - -impl Drop for AllocatedCondvar { - #[inline] - fn drop(&mut self) { - let r = unsafe { libc::pthread_cond_destroy(self.0.get()) }; - debug_assert_eq!(r, 0); - } -} - -impl Condvar { - pub const fn new() -> Condvar { - Condvar { inner: LazyBox::new(), mutex: AtomicPtr::new(ptr::null_mut()) } - } - - #[inline] - fn verify(&self, mutex: *mut libc::pthread_mutex_t) { - match self.mutex.compare_exchange(ptr::null_mut(), mutex, Relaxed, Relaxed) { - Ok(_) => {} // Stored the address - Err(n) if n == mutex => {} // Lost a race to store the same address - _ => panic!("attempted to use a condition variable with two mutexes"), - } - } - - #[inline] - pub fn notify_one(&self) { - let r = unsafe { libc::pthread_cond_signal(raw(self)) }; - debug_assert_eq!(r, 0); - } - - #[inline] - pub fn notify_all(&self) { - let r = unsafe { libc::pthread_cond_broadcast(raw(self)) }; - debug_assert_eq!(r, 0); - } - - #[inline] - pub unsafe fn wait(&self, mutex: &Mutex) { - let mutex = mutex::raw(mutex); - self.verify(mutex); - let r = libc::pthread_cond_wait(raw(self), mutex); - debug_assert_eq!(r, 0); - } - - pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - use crate::sys::time::Timespec; - - let mutex = mutex::raw(mutex); - self.verify(mutex); - - let timeout = Timespec::now(libc::CLOCK_MONOTONIC) - .checked_add_duration(&dur) - .and_then(|t| t.to_timespec()) - .unwrap_or(TIMESPEC_MAX); - - let r = pthread_cond_timedwait(raw(self), mutex, &timeout); - assert!(r == libc::ETIMEDOUT || r == 0); - r == 0 - } -} diff --git a/library/std/src/sys/sync/condvar/windows7.rs b/library/std/src/sys/sync/condvar/windows7.rs deleted file mode 100644 index 07fa5fdd698ee..0000000000000 --- a/library/std/src/sys/sync/condvar/windows7.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::cell::UnsafeCell; -use crate::sys::c; -use crate::sys::os; -use crate::sys::sync::{mutex, Mutex}; -use crate::time::Duration; - -pub struct Condvar { - inner: UnsafeCell, -} - -unsafe impl Send for Condvar {} -unsafe impl Sync for Condvar {} - -impl Condvar { - #[inline] - pub const fn new() -> Condvar { - Condvar { inner: UnsafeCell::new(c::CONDITION_VARIABLE_INIT) } - } - - #[inline] - pub unsafe fn wait(&self, mutex: &Mutex) { - let r = c::SleepConditionVariableSRW(self.inner.get(), mutex::raw(mutex), c::INFINITE, 0); - debug_assert!(r != 0); - } - - pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let r = c::SleepConditionVariableSRW( - self.inner.get(), - mutex::raw(mutex), - crate::sys::pal::dur2timeout(dur), - 0, - ); - if r == 0 { - debug_assert_eq!(os::errno() as usize, c::ERROR_TIMEOUT as usize); - false - } else { - true - } - } - - #[inline] - pub fn notify_one(&self) { - unsafe { c::WakeConditionVariable(self.inner.get()) } - } - - #[inline] - pub fn notify_all(&self) { - unsafe { c::WakeAllConditionVariable(self.inner.get()) } - } -} diff --git a/library/std/src/sys/sync/condvar/xous.rs b/library/std/src/sys/sync/condvar/xous.rs deleted file mode 100644 index 7b218818ef8ef..0000000000000 --- a/library/std/src/sys/sync/condvar/xous.rs +++ /dev/null @@ -1,148 +0,0 @@ -use crate::os::xous::ffi::{blocking_scalar, scalar}; -use crate::os::xous::services::{ticktimer_server, TicktimerScalar}; -use crate::sys::sync::Mutex; -use crate::time::Duration; -use core::sync::atomic::{AtomicUsize, Ordering}; - -// The implementation is inspired by Andrew D. Birrell's paper -// "Implementing Condition Variables with Semaphores" - -const NOTIFY_TRIES: usize = 3; - -pub struct Condvar { - counter: AtomicUsize, - timed_out: AtomicUsize, -} - -unsafe impl Send for Condvar {} -unsafe impl Sync for Condvar {} - -impl Condvar { - #[inline] - #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] - pub const fn new() -> Condvar { - Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) } - } - - fn notify_some(&self, to_notify: usize) { - // Assumption: The Mutex protecting this condvar is locked throughout the - // entirety of this call, preventing calls to `wait` and `wait_timeout`. - - // Logic check: Ensure that there aren't any missing waiters. Remove any that - // timed-out, ensuring the counter doesn't underflow. - assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed)); - self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed); - - // Figure out how many threads to notify. Note that it is impossible for `counter` - // to increase during this operation because Mutex is locked. However, it is - // possible for `counter` to decrease due to a condvar timing out, in which - // case the corresponding `timed_out` will increase accordingly. - let Ok(waiter_count) = - self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| { - if counter == 0 { - return None; - } else { - Some(counter - counter.min(to_notify)) - } - }) - else { - // No threads are waiting on this condvar - return; - }; - - let mut remaining_to_wake = waiter_count.min(to_notify); - if remaining_to_wake == 0 { - return; - } - for _wake_tries in 0..NOTIFY_TRIES { - let result = blocking_scalar( - ticktimer_server(), - TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(), - ) - .expect("failure to send NotifyCondition command"); - - // Remove the list of waiters that were notified - remaining_to_wake -= result[0]; - - // Also remove the number of waiters that timed out. Clamp it to 0 in order to - // ensure we don't wait forever in case the waiter woke up between the time - // we counted the remaining waiters and now. - remaining_to_wake = - remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed)); - if remaining_to_wake == 0 { - return; - } - crate::thread::yield_now(); - } - } - - pub fn notify_one(&self) { - self.notify_some(1) - } - - pub fn notify_all(&self) { - self.notify_some(self.counter.load(Ordering::Relaxed)) - } - - fn index(&self) -> usize { - core::ptr::from_ref(self).addr() - } - - /// Unlock the given Mutex and wait for the notification. Wait at most - /// `ms` milliseconds, or pass `0` to wait forever. - /// - /// Returns `true` if the condition was received, `false` if it timed out - fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool { - self.counter.fetch_add(1, Ordering::Relaxed); - unsafe { mutex.unlock() }; - - // Threading concern: There is a chance that the `notify` thread wakes up here before - // we have a chance to wait for the condition. This is fine because we've recorded - // the fact that we're waiting by incrementing the counter. - let result = blocking_scalar( - ticktimer_server(), - TicktimerScalar::WaitForCondition(self.index(), ms).into(), - ); - let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0; - - // If we awoke due to a timeout, increment the `timed_out` counter so that the - // main loop of `notify` knows there's a timeout. - // - // This is done with the Mutex still unlocked, because the Mutex might still - // be locked by the `notify` process above. - if !awoken { - self.timed_out.fetch_add(1, Ordering::Relaxed); - } - - unsafe { mutex.lock() }; - awoken - } - - pub unsafe fn wait(&self, mutex: &Mutex) { - // Wait for 0 ms, which is a special case to "wait forever" - self.wait_ms(mutex, 0); - } - - pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let mut millis = dur.as_millis() as usize; - // Ensure we don't wait for 0 ms, which would cause us to wait forever - if millis == 0 { - millis = 1; - } - self.wait_ms(mutex, millis) - } -} - -impl Drop for Condvar { - fn drop(&mut self) { - let remaining_count = self.counter.load(Ordering::Relaxed); - let timed_out = self.timed_out.load(Ordering::Relaxed); - assert!( - remaining_count - timed_out == 0, - "counter was {} and timed_out was {} not 0", - remaining_count, - timed_out - ); - scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok(); - } -} diff --git a/library/std/src/sys/sync/mutex/mod.rs b/library/std/src/sys/sync/mutex/mod.rs index 73d9bd273de17..8d501df5b602f 100644 --- a/library/std/src/sys/sync/mutex/mod.rs +++ b/library/std/src/sys/sync/mutex/mod.rs @@ -19,10 +19,11 @@ cfg_if::cfg_if! { target_os = "teeos", ))] { mod pthread; + #[allow(unused_imports)] pub use pthread::{Mutex, raw}; } else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] { mod windows7; - pub use windows7::{Mutex, raw}; + pub use windows7::Mutex; } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] { mod sgx; pub use sgx::Mutex; diff --git a/library/std/src/sys/sync/mutex/pthread.rs b/library/std/src/sys/sync/mutex/pthread.rs index ee0794334fbe3..99964d5f6372b 100644 --- a/library/std/src/sys/sync/mutex/pthread.rs +++ b/library/std/src/sys/sync/mutex/pthread.rs @@ -103,7 +103,7 @@ impl Mutex { } #[inline] - pub unsafe fn lock(&self) { + pub fn lock(&self) { #[cold] #[inline(never)] fn fail(r: i32) -> ! { @@ -111,7 +111,7 @@ impl Mutex { panic!("failed to lock mutex: {error}"); } - let r = libc::pthread_mutex_lock(raw(self)); + let r = unsafe { libc::pthread_mutex_lock(raw(self)) }; // As we set the mutex type to `PTHREAD_MUTEX_NORMAL` above, we expect // the lock call to never fail. Unfortunately however, some platforms // (Solaris) do not conform to the standard, and instead always provide @@ -131,8 +131,8 @@ impl Mutex { } #[inline] - pub unsafe fn try_lock(&self) -> bool { - libc::pthread_mutex_trylock(raw(self)) == 0 + pub fn try_lock(&self) -> bool { + unsafe { libc::pthread_mutex_trylock(raw(self)) == 0 } } } diff --git a/library/std/src/sys/sync/mutex/xous.rs b/library/std/src/sys/sync/mutex/xous.rs index 1426e48f8b7af..a6ab298409264 100644 --- a/library/std/src/sys/sync/mutex/xous.rs +++ b/library/std/src/sys/sync/mutex/xous.rs @@ -36,7 +36,7 @@ impl Mutex { } #[inline] - pub unsafe fn lock(&self) { + pub fn lock(&self) { // Try multiple times to acquire the lock without resorting to the ticktimer // server. For locks that are held for a short amount of time, this will // result in the ticktimer server never getting invoked. The `locked` value diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index c8ee365392f85..3b9a054c390aa 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1099,7 +1099,7 @@ impl Drop for PanicGuard { #[stable(feature = "rust1", since = "1.0.0")] pub fn park() { let guard = PanicGuard; - // SAFETY: park_timeout is called on the parker owned by this thread. + // SAFETY: park is called on the parker owned by this thread. unsafe { current().park(); } @@ -1168,7 +1168,7 @@ pub fn park_timeout(dur: Duration) { let guard = PanicGuard; // SAFETY: park_timeout is called on the parker owned by this thread. unsafe { - current().inner.as_ref().parker().park_timeout(dur); + current().park_timeout(dur); } // No panic occurred, do not abort. forget(guard); @@ -1361,6 +1361,15 @@ impl Thread { unsafe { self.inner.as_ref().parker().park() } } + /// Like the public [`park_timeout`], but callable on any handle. This is used to + /// allow parking in TLS destructors. + /// + /// # Safety + /// May only be called from the thread to which this handle belongs. + pub(crate) unsafe fn park_timeout(&self, dur: Duration) { + unsafe { self.inner.as_ref().parker().park_timeout(dur) } + } + /// Atomically makes the handle's token available if it is not already. /// /// Every thread is equipped with some basic low-level blocking support, via