Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
457 changes: 341 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions modules/axmm/src/aspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl AddrSpace {
self.areas.find(vaddr)
}

/// Returns an immutable reference to the memory area that contains `vaddr`,
/// or `None` if no area covers that address.
///
/// Read-only counterpart of `find_area_mut`; both delegate to
/// `self.areas.find(vaddr)`.
pub fn find_area(&self, vaddr: VirtAddr) -> Option<&MemoryArea<Backend>> {
    self.areas.find(vaddr)
}
Comment on lines +116 to +118
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new find_area method is identical to the existing find_area_mut method on line 113 (which also calls self.areas.find(vaddr)). The only difference is that find_area_mut returns a mutable reference. This creates duplicate logic. Consider whether this new method is necessary, or if callers should use find_area_mut and simply not mutate the result.

Copilot uses AI. Check for mistakes.

/// Add a new linear mapping.
///
/// See [`Backend`] for more details about the mapping backends.
Expand Down
2 changes: 2 additions & 0 deletions modules/axnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
//! [smoltcp]: https://github.com/smoltcp-rs/smoltcp

#![no_std]
#![feature(ip_from)]
#![feature(maybe_uninit_slice)]

#[macro_use]
extern crate log;
Expand Down
1 change: 1 addition & 0 deletions modules/axtask/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn init_scheduler_with_cpu_num(cpu_num: usize) {
CPU_NUM.store(cpu_num, core::sync::atomic::Ordering::Relaxed);

crate::run_queue::init();
crate::executor::init();

info!(" use {} scheduler.", Scheduler::scheduler_name());
}
Expand Down
205 changes: 205 additions & 0 deletions modules/axtask/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use alloc::{boxed::Box, collections::VecDeque, sync::Arc, task::Wake};
use core::{
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
task::{Context, Poll, Waker},
};

use kernel_guard::NoPreemptIrqSave;
use kspin::SpinNoIrq;
use lazyinit::LazyInit;

use crate::{current_run_queue, select_run_queue, TaskId, WeakAxTaskRef};

/// A minimal FIFO executor: a queue of ready-to-poll async tasks.
///
/// One instance lives per CPU (see `READY_QUEUE`); the queue is protected by
/// an IRQ-disabling spinlock so it may be touched from interrupt context.
pub struct AxExecutor {
    // Ready tasks, polled in FIFO order.
    queue: SpinNoIrq<VecDeque<Arc<AsyncTask>>>,
}

impl AxExecutor {
    /// Creates an executor with an empty ready queue.
    pub fn new() -> Self {
        let queue = SpinNoIrq::new(VecDeque::new());
        Self { queue }
    }

    /// Appends `task` at the tail of the ready queue.
    pub fn add_task(&self, task: Arc<AsyncTask>) {
        let mut queue = self.queue.lock();
        queue.push_back(task);
    }

    /// Removes and returns the task at the head of the ready queue, if any.
    pub fn pop_task(&self) -> Option<Arc<AsyncTask>> {
        let mut queue = self.queue.lock();
        queue.pop_front()
    }

    /// Returns `true` when no tasks are queued.
    pub fn is_empty(&self) -> bool {
        let queue = self.queue.lock();
        queue.is_empty()
    }
}

impl Default for AxExecutor {
fn default() -> Self {
Self::new()
}
}

/// Per-CPU executor instance; initialized once by [`init`].
#[percpu::def_percpu]
static READY_QUEUE: LazyInit<Arc<AxExecutor>> = LazyInit::new();

/// Per-CPU wake-up counter, bumped whenever a task transitions from
/// not-enqueued to enqueued. Callers snapshot it to detect new wake-ups.
#[percpu::def_percpu]
static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Per-CPU weak reference to the thread currently blocked in `block_on`
/// (if any), so executor wake-ups can unblock it.
#[percpu::def_percpu]
static BLOCKED_TASK: SpinNoIrq<Option<WeakAxTaskRef>> = SpinNoIrq::new(None);

/// Initializes the current CPU's executor.
///
/// Called from the scheduler initialization path; must run before any other
/// function in this module touches `READY_QUEUE` on this CPU.
pub(crate) fn init() {
    READY_QUEUE.with_current(|q| {
        q.init_once(Arc::new(AxExecutor::new()));
    });
}

/// Registers `task` as the thread blocked in `block_on` on this CPU so that
/// executor wake-ups can unblock it (see [`clear_blocked_task`]).
pub fn set_blocked_task(task: WeakAxTaskRef) {
    BLOCKED_TASK.with_current(|slot| {
        slot.lock().replace(task);
    });
}

/// Forgets the blocked task previously registered via [`set_blocked_task`].
pub fn clear_blocked_task() {
    BLOCKED_TASK.with_current(|slot| {
        slot.lock().take();
    });
}

/// Unblocks the task registered via [`set_blocked_task`], if it is still
/// alive, by putting it back on a run queue.
fn wake_blocked_task() {
    BLOCKED_TASK.with_current(|slot| {
        // The slot lock stays held while unblocking, matching the original
        // locking discipline.
        let guard = slot.lock();
        if let Some(task) = guard.as_ref().and_then(|weak| weak.upgrade()) {
            select_run_queue::<NoPreemptIrqSave>(&task).unblock_task(task, false);
        }
    });
}

/// An asynchronous task that wraps a future.
pub struct AsyncTask {
    // Unique task identifier.
    id: TaskId,
    // The wrapped future; locked so poll access is exclusive.
    future: SpinNoIrq<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Executor this task re-enqueues itself onto when woken.
    executor: Arc<AxExecutor>,
    // True while the task sits in the executor queue; prevents double-enqueue.
    enqueued: AtomicBool,
}

impl AsyncTask {
pub fn new(
future: impl Future<Output = ()> + Send + 'static,
executor: Arc<AxExecutor>,
) -> Arc<Self> {
Arc::new(Self {
id: TaskId::new(),
future: SpinNoIrq::new(Box::pin(future)),
executor,
enqueued: AtomicBool::new(false),
})
}

pub fn id(&self) -> TaskId {
self.id
}

pub(crate) fn poll(self: &Arc<Self>) -> Poll<()> {
self.enqueued.store(false, Ordering::Release);
let waker = Waker::from(self.clone());
let mut cx = Context::from_waker(&waker);
let mut future = self.future.lock();
future.as_mut().poll(&mut cx)
}

fn enqueue(self: &Arc<Self>) -> bool {
if self
.enqueued
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.executor.add_task(self.clone());
true
} else {
false
}
}
}

impl Wake for AsyncTask {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
if self.enqueue() {
WAKE_COUNT.with_current(|c| c.fetch_add(1, Ordering::Release));
wake_blocked_task();
}
}
}

pub fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
let executor = READY_QUEUE.with_current(|q| q.clone());
let task = AsyncTask::new(future, executor.clone());
if task.enqueue() {
WAKE_COUNT.with_current(|c| c.fetch_add(1, Ordering::Release));
}
}

/// Pops and polls one task from the current CPU's ready queue.
///
/// Returns `None` when the queue is empty, otherwise the poll result of the
/// task that was run.
pub fn run_once() -> Option<Poll<()>> {
    READY_QUEUE
        .with_current(|q| q.pop_task())
        .map(|task| task.poll())
}

/// Polls queued tasks until the queue drains or `max_steps` tasks have run.
///
/// Returns whether at least one task was polled.
pub fn run_for(max_steps: usize) -> bool {
    let mut executed = false;
    for _ in 0..max_steps {
        match run_once() {
            Some(_) => executed = true,
            None => break,
        }
    }
    executed
}

/// Returns the current CPU's cumulative wake-up counter.
///
/// The counter is incremented whenever a task is newly enqueued (see
/// `Wake::wake_by_ref` and `spawn`); callers compare snapshots of it to
/// detect wake-ups that happened in between.
pub fn wake_count() -> usize {
    WAKE_COUNT.with_current(|c| c.load(Ordering::Acquire))
}

/// Returns `true` if the current CPU's ready queue has no tasks.
pub fn is_empty() -> bool {
    READY_QUEUE.with_current(|q| q.is_empty())
}

pub fn run_until_idle() {
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 64 for SPIN_LIMIT lacks explanation. Consider adding a comment explaining why this specific value was chosen and what trade-offs it represents between spinning and yielding.

Suggested change
pub fn run_until_idle() {
pub fn run_until_idle() {
// Number of consecutive spin iterations before yielding the CPU.
// 64 is a small power-of-two chosen as a compromise: it allows short-lived
// bursts of wakeups to be handled without an expensive scheduler yield,
// while still bounding the time spent busy-waiting when the system is idle.

Copilot uses AI. Check for mistakes.
const SPIN_LIMIT: usize = 64;
let mut spin_count = 0;
loop {
let seen = wake_count();

while run_once().is_some() {}

let done = {
let _guard = kernel_guard::NoPreempt::new();
is_empty() && wake_count() == seen
};

if done {
break;
}

if spin_count < SPIN_LIMIT {
spin_count += 1;
core::hint::spin_loop();
} else {
spin_count = 0;
current_run_queue::<NoPreemptIrqSave>().yield_current();
}
}
}
48 changes: 36 additions & 12 deletions modules/axtask/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use core::{

use kernel_guard::NoPreemptIrqSave;

use crate::{AxTaskRef, WeakAxTaskRef, current, current_run_queue, select_run_queue};
use crate::{
executor,
AxTaskRef, WeakAxTaskRef, current, current_run_queue, select_run_queue,
};

mod poll;
pub use poll::*;
Expand Down Expand Up @@ -49,8 +52,13 @@ impl Wake for AxWaker {

/// Blocks the current task until the given future is resolved.
///
/// Note that this doesn't handle interruption and is not recommended for direct
/// use in most cases.
/// While waiting for the main future, this function also drives other async tasks
/// in the per-CPU executor. The thread only blocks when both:
/// - The main future is pending (not yet ready)
/// - The per-CPU executor has no ready tasks to run
///
/// When async tasks in the executor are woken, they will also wake up this
/// blocked thread, ensuring that the executor continues to make progress.
pub fn block_on<F: IntoFuture>(f: F) -> F::Output {
let mut fut = pin!(f.into_future());

Expand All @@ -66,16 +74,32 @@ pub fn block_on<F: IntoFuture>(f: F) -> F::Output {

loop {
woke.store(false, Ordering::Release);
match fut.as_mut().poll(&mut cx) {
Poll::Pending => {
if !woke.load(Ordering::Acquire) {
current_run_queue::<NoPreemptIrqSave>().blocked_resched();
} else {
// Immediately woken
crate::yield_now();
}

if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
return output;
}

// While waiting for the main future, this function also drives other async tasks
loop {
let seen = executor::wake_count();

while executor::run_once().is_some() {}

let should_block = {
let _guard = kernel_guard::NoPreempt::new();
executor::is_empty()
&& executor::wake_count() == seen
&& !woke.load(Ordering::Acquire)
};

if should_block {
executor::set_blocked_task(Arc::downgrade(&task));
current_run_queue::<NoPreemptIrqSave>().blocked_resched();
executor::clear_blocked_task();
break;
} else if woke.load(Ordering::Acquire) {
break;
}
Poll::Ready(output) => break output,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions modules/axtask/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cfg_if::cfg_if! {
#[macro_use]
mod run_queue;
mod task;
pub mod executor;
mod api;
mod wait_queue;

Expand Down
2 changes: 1 addition & 1 deletion modules/axtask/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct TaskInner {
}

impl TaskId {
fn new() -> Self {
pub(crate) fn new() -> Self {
static ID_COUNTER: AtomicU64 = AtomicU64::new(1);
Self(ID_COUNTER.fetch_add(1, Ordering::Relaxed))
}
Expand Down
Loading