Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(threads): add mutex with priority inheritance #398

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"tests/i2c-controller",
"tests/threading-dynamic-prios",
"tests/threading-lock",
"tests/threading-mutex",
]

exclude = ["src/lib"]
Expand Down
1 change: 1 addition & 0 deletions src/riot-rs-threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#![cfg_attr(not(test), no_std)]
#![feature(naked_functions)]
#![feature(used_with_arg)]
#![feature(negative_impls)]
#![cfg_attr(target_arch = "xtensa", feature(asm_experimental_arch))]
// Disable indexing lints for now, possible panics are documented or rely on internally-enforced
// invariants
Expand Down
2 changes: 2 additions & 0 deletions src/riot-rs-threads/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Synchronization primitives.
mod channel;
mod lock;
mod mutex;

pub use channel::Channel;
pub use lock::Lock;
pub use mutex::{Mutex, MutexGuard};
205 changes: 205 additions & 0 deletions src/riot-rs-threads/src/sync/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use core::{
cell::UnsafeCell,
ops::{Deref, DerefMut},
};

use critical_section::CriticalSection;
use riot_rs_runqueue::{RunqueueId, ThreadId};

use crate::{thread::ThreadState, threadlist::ThreadList, THREADS};

/// A basic mutex with priority inheritance.
pub struct Mutex<T> {
ROMemories marked this conversation as resolved.
Show resolved Hide resolved
state: UnsafeCell<LockState>,
inner: UnsafeCell<T>,
}

/// State of a [`Mutex`].
enum LockState {
Unlocked,
Locked {
/// The current owner of the lock.
owner_id: ThreadId,
/// The original priority of the current owner (without priority inheritance).
owner_prio: RunqueueId,
//. Waiters for the mutex.
waiters: ThreadList,
},
}

impl LockState {
/// Returns a [`LockState::Locked`] with the current thread as the owner
/// and an empty waitlist.
///
/// # Panics
///
/// Panics if called outside of a thread context.
fn locked_with_current(cs: CriticalSection) -> Self {
let (owner_id, owner_prio) = THREADS.with_mut_cs(cs, |mut threads| {
let current = threads
.current()
.expect("Function should be called inside a thread context.");
(current.pid, current.prio)
});
LockState::Locked {
waiters: ThreadList::new(),
owner_id,
owner_prio,
}
}
}

impl<T> Mutex<T> {
/// Creates a new **unlocked** [`Mutex`].
pub const fn new(value: T) -> Self {
Self {
state: UnsafeCell::new(LockState::Unlocked),
inner: UnsafeCell::new(value),
}
}
}

impl<T> Mutex<T> {
/// Returns whether the mutex is locked.
pub fn is_locked(&self) -> bool {
critical_section::with(|_| {
let state = unsafe { &*self.state.get() };
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
!matches!(state, LockState::Unlocked)
})
}

/// Acquires a mutex, blocking the current thread until it is able to do so.
///
/// If the mutex was unlocked, it will be locked and a [`MutexGuard`] is returned.
/// If the mutex is locked, this function will block the current thread until the mutex gets
/// unlocked elsewhere.
///
/// If the current owner of the mutex has a lower priority than the current thread, it will inherit
/// the waiting thread's priority.
/// The priority is reset once the mutex is released. This means that a **user can not change a thread's
/// priority while it holds the lock**, because it will be changed back after release!
///
/// # Panics
///
/// Panics if called outside of a thread context.
pub fn lock(&self) -> MutexGuard<T> {
critical_section::with(|cs| {
// SAFETY: access to the state only happens in critical sections, so it's always unique.
let state = unsafe { &mut *self.state.get() };
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
match state {
LockState::Unlocked => {
*state = LockState::locked_with_current(cs);
}
LockState::Locked {
waiters,
owner_id,
owner_prio,
} => {
// Insert thread in waitlist, which also triggers the scheduler.
match waiters.put_current(cs, ThreadState::LockBlocked) {
// `Some` when the inserted thread is the highest priority
// thread in the waitlist.
Some(waiter_prio) if waiter_prio > *owner_prio => {
// Current mutex owner inherits the priority.
THREADS.with_mut_cs(cs, |mut threads| {
threads.set_priority(*owner_id, waiter_prio)
});
}
_ => {}
}
// Context switch happens here as soon as we leave the critical section.
ROMemories marked this conversation as resolved.
Show resolved Hide resolved
}
}
});
// Mutex was either directly acquired because it was unlocked, or the current thread was entered
// to the waitlist. In the latter case, it only continues running here after it was popped again
// from the waitlist and the thread acquired the mutex.

MutexGuard { mutex: self }
}

/// Attempts to acquire this lock, in a non-blocking fashion.
///
/// If the mutex was unlocked, it will be locked and a [`MutexGuard`] is returned.
/// If the mutex was locked `None` is returned.
pub fn try_lock(&self) -> Option<MutexGuard<T>> {
critical_section::with(|cs| {
// SAFETY: access to the state only happens in critical sections, so it's always unique.
let state = unsafe { &mut *self.state.get() };
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
if let LockState::Unlocked = *state {
*state = LockState::locked_with_current(cs);
Some(MutexGuard { mutex: self })
} else {
None
}
})
}

/// Releases the mutex.
///
/// If there are waiters, the first waiter will be woken up.
fn release(&self) {
critical_section::with(|cs| {
// SAFETY: access to the state only happens in critical sections, so it's always unique.
let state = unsafe { &mut *self.state.get() };
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
if let LockState::Locked {
waiters,
owner_id,
owner_prio,
} = state
{
// Reset original priority of owner.
THREADS.with_mut_cs(cs, |mut threads| {
threads.set_priority(*owner_id, *owner_prio)
});
// Pop next thread from waitlist so that it can acquire the mutex.
if let Some((pid, _)) = waiters.pop(cs) {
THREADS.with_mut_cs(cs, |threads| {
*owner_id = pid;
*owner_prio = threads.get_unchecked(pid).prio;
})
} else {
// Unlock if waitlist was empty.
*state = LockState::Unlocked
}
}
})
}
}

unsafe impl<T> Sync for Mutex<T> {}

/// Grants access to the [`Mutex`] inner data.
///
/// Dropping the [`MutexGuard`] will unlock the [`Mutex`];
pub struct MutexGuard<'a, T> {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
mutex: &'a Mutex<T>,
}

impl<'a, T> Deref for MutexGuard<'a, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
// SAFETY: MutexGuard always has unique access.
unsafe { &*self.mutex.inner.get() }
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
// SAFETY: MutexGuard always has unique access.
unsafe { &mut *self.mutex.inner.get() }
ROMemories marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
// Unlock the mutex when the guard is dropped.
self.mutex.release()
}
}

// The `MutexGuard` is tied to a thread, it must not be possible to `Send` it to another thread.
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
impl<T> !Send for MutexGuard<'_, T> {}
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved

unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
25 changes: 17 additions & 8 deletions src/riot-rs-threads/src/threadlist.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use critical_section::CriticalSection;

use crate::{thread::Thread, ThreadId, ThreadState, THREADS};
use crate::{thread::Thread, RunqueueId, ThreadId, ThreadState, THREADS};

/// Manages blocked [`super::Thread`]s for a resource, and triggering the scheduler when needed.
#[derive(Debug, Default)]
pub struct ThreadList {
/// Next thread to run once the resource is available.
pub head: Option<ThreadId>,
head: Option<ThreadId>,
}

impl ThreadList {
Expand All @@ -17,10 +17,12 @@ impl ThreadList {

/// Puts the current (blocked) thread into this [`ThreadList`] and triggers the scheduler.
///
/// Returns a `RunqueueId` if the highest priority among the waiters in the list has changed.
///
/// # Panics
///
/// Panics if this is called outside of a thread context.
pub fn put_current(&mut self, cs: CriticalSection, state: ThreadState) {
pub fn put_current(&mut self, cs: CriticalSection, state: ThreadState) -> Option<RunqueueId> {
THREADS.with_mut_cs(cs, |mut threads| {
let &mut Thread { pid, prio, .. } = threads
.current()
Expand All @@ -35,12 +37,19 @@ impl ThreadList {
next = threads.thread_blocklist[usize::from(n)];
}
threads.thread_blocklist[usize::from(pid)] = next;
match curr {
Some(curr) => threads.thread_blocklist[usize::from(curr)] = Some(pid),
_ => self.head = Some(pid),
}
let inherit_priority = match curr {
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
Some(curr) => {
threads.thread_blocklist[usize::from(curr)] = Some(pid);
None
}
None => {
self.head = Some(pid);
Some(prio)
}
};
threads.set_state(pid, state);
});
inherit_priority
})
}

/// Removes the head from this [`ThreadList`].
Expand Down
1 change: 1 addition & 0 deletions tests/laze.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ subdirs:
- i2c-controller
- threading-dynamic-prios
- threading-lock
- threading-mutex
13 changes: 13 additions & 0 deletions tests/threading-mutex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "threading-mutex"
version = "0.1.0"
authors = ["Elena Frank <[email protected]>"]
license.workspace = true
edition.workspace = true
publish = false

[dependencies]
embassy-executor = { workspace = true }
riot-rs = { path = "../../src/riot-rs", features = ["threading"] }
riot-rs-boards = { path = "../../src/riot-rs-boards" }
portable-atomic = "1.6.0"
5 changes: 5 additions & 0 deletions tests/threading-mutex/laze.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apps:
- name: threading-mutex
selects:
- ?release
- sw/threading
Loading
Loading