Skip to content

Commit 1a19a60

Browse files
committed
feat: minimal async runtime on top of the NGINX event loop
This change introduces a general infrastructure for spawing async tasks on NGINX event loop. The only utility offered for now is a timer support via `ngx::async_::sleep`, with async IO and other scenarios being planned for future.
1 parent 06ec0e8 commit 1a19a60

File tree

6 files changed

+311
-1
lines changed

6 files changed

+311
-1
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,18 @@ rust-version.workspace = true
2626

2727
[dependencies]
2828
allocator-api2 = { version = "0.2.21", default-features = false }
29+
async-task = { version = "4.7.1", optional = true }
2930
lock_api = "0.4.13"
3031
nginx-sys = { path = "nginx-sys", default-features=false, version = "0.5.0"}
32+
pin-project-lite = { version = "0.2.16", optional = true }
3133

3234
[features]
33-
default = ["vendored","std"]
35+
default = ["std", "vendored"]
36+
async = [
37+
"alloc",
38+
"dep:async-task",
39+
"dep:pin-project-lite",
40+
]
3441
# Enables the components using memory allocation.
3542
# If no `std` flag, `alloc` crate is internally used instead. This flag is mainly for `no_std` build.
3643
alloc = ["allocator-api2/alloc"]

src/async_/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//! Async runtime and set of utilities on top of the NGINX event loop.
2+
pub use self::sleep::{sleep, Sleep};
3+
pub use self::spawn::{spawn, Task};
4+
5+
mod sleep;
6+
mod spawn;

src/async_/sleep.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use core::future::Future;
2+
use core::mem;
3+
use core::pin::Pin;
4+
use core::ptr::{self, NonNull};
5+
use core::task::{self, Poll};
6+
use core::time::Duration;
7+
8+
use nginx_sys::{ngx_add_timer, ngx_del_timer, ngx_event_t, ngx_log_t, ngx_msec_int_t, ngx_msec_t};
9+
use pin_project_lite::pin_project;
10+
11+
use crate::{ngx_container_of, ngx_log_debug};
12+
13+
/// Maximum duration that can be achieved using [ngx_add_timer].
14+
const NGX_TIMER_DURATION_MAX: Duration = Duration::from_millis(ngx_msec_int_t::MAX as _);
15+
16+
/// Puts the current task to sleep for at least the specified amount of time.
17+
///
18+
/// The function is a shorthand for [Sleep::new] using the global logger for debug output.
19+
#[inline]
20+
pub fn sleep(duration: Duration) -> Sleep {
21+
Sleep::new(duration, crate::log::ngx_cycle_log())
22+
}
23+
24+
pin_project! {
25+
/// Future returned by [sleep].
26+
pub struct Sleep {
27+
#[pin]
28+
timer: TimerEvent,
29+
duration: Duration,
30+
}
31+
}
32+
33+
impl Sleep {
34+
/// Creates a new Sleep with the specified duration and logger for debug messages.
35+
pub fn new(duration: Duration, log: NonNull<ngx_log_t>) -> Self {
36+
let timer = TimerEvent::new(log);
37+
ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}");
38+
Sleep { timer, duration }
39+
}
40+
}
41+
42+
impl Future for Sleep {
43+
type Output = ();
44+
45+
#[cfg(not(target_pointer_width = "32"))]
46+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
47+
let msec = self.duration.min(NGX_TIMER_DURATION_MAX).as_millis() as ngx_msec_t;
48+
let this = self.project();
49+
this.timer.poll_sleep(msec, cx)
50+
}
51+
52+
#[cfg(target_pointer_width = "32")]
53+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
54+
if self.duration.is_zero() {
55+
return Poll::Ready(());
56+
}
57+
let step = self.duration.min(NGX_TIMER_DURATION_MAX);
58+
59+
let mut this = self.project();
60+
// Handle ngx_msec_t overflow on 32-bit platforms.
61+
match this.timer.as_mut().poll_sleep(step.as_millis() as _, cx) {
62+
// Last step
63+
Poll::Ready(()) if this.duration == &step => Poll::Ready(()),
64+
Poll::Ready(()) => {
65+
*this.duration = this.duration.saturating_sub(step);
66+
this.timer.event.set_timedout(0); // rearm
67+
this.timer.as_mut().poll_sleep(step.as_millis() as _, cx)
68+
}
69+
x => x,
70+
}
71+
}
72+
}
73+
74+
struct TimerEvent {
75+
event: ngx_event_t,
76+
waker: Option<task::Waker>,
77+
}
78+
79+
// SAFETY: Timer will only be used in a single-threaded environment
80+
unsafe impl Send for TimerEvent {}
81+
unsafe impl Sync for TimerEvent {}
82+
83+
impl TimerEvent {
84+
pub fn new(log: NonNull<ngx_log_t>) -> Self {
85+
static IDENT: [usize; 4] = [
86+
0, 0, 0, 0x4153594e, // ASYN
87+
];
88+
89+
let mut ev: ngx_event_t = unsafe { mem::zeroed() };
90+
// The data is only used for `ngx_event_ident` and will not be mutated.
91+
ev.data = ptr::addr_of!(IDENT).cast_mut().cast();
92+
ev.handler = Some(Self::timer_handler);
93+
ev.log = log.as_ptr();
94+
ev.set_cancelable(1);
95+
96+
Self {
97+
event: ev,
98+
waker: None,
99+
}
100+
}
101+
102+
pub fn poll_sleep(
103+
mut self: Pin<&mut Self>,
104+
duration: ngx_msec_t,
105+
context: &mut task::Context<'_>,
106+
) -> Poll<()> {
107+
if self.event.timedout() != 0 {
108+
Poll::Ready(())
109+
} else if self.event.timer_set() != 0 {
110+
if let Some(waker) = self.waker.as_mut() {
111+
waker.clone_from(context.waker());
112+
} else {
113+
self.waker = Some(context.waker().clone());
114+
}
115+
Poll::Pending
116+
} else {
117+
unsafe { ngx_add_timer(ptr::addr_of_mut!(self.event), duration) };
118+
self.waker = Some(context.waker().clone());
119+
Poll::Pending
120+
}
121+
}
122+
123+
unsafe extern "C" fn timer_handler(ev: *mut ngx_event_t) {
124+
let timer = ngx_container_of!(ev, Self, event);
125+
126+
if let Some(waker) = (*timer).waker.take() {
127+
waker.wake();
128+
}
129+
}
130+
}
131+
132+
impl Drop for TimerEvent {
133+
fn drop(&mut self) {
134+
if self.event.timer_set() != 0 {
135+
unsafe { ngx_del_timer(ptr::addr_of_mut!(self.event)) };
136+
}
137+
}
138+
}

src/async_/spawn.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use core::cell::UnsafeCell;
2+
use core::future::Future;
3+
use core::mem;
4+
use core::ptr::{self, NonNull};
5+
6+
#[cfg(all(not(feature = "std"), feature = "alloc"))]
7+
use alloc::collections::vec_deque::VecDeque;
8+
#[cfg(feature = "std")]
9+
use std::collections::vec_deque::VecDeque;
10+
11+
pub use async_task::Task;
12+
use async_task::{Runnable, ScheduleInfo, WithInfo};
13+
use nginx_sys::{
14+
ngx_cycle, ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event,
15+
ngx_posted_next_events,
16+
};
17+
18+
use crate::log::ngx_cycle_log;
19+
use crate::{ngx_container_of, ngx_log_debug};
20+
21+
static SCHEDULER: Scheduler = Scheduler::new();
22+
23+
struct Scheduler(UnsafeCell<SchedulerInner>);
24+
25+
// SAFETY: Scheduler must only be used from the main thread of a worker process.
26+
unsafe impl Send for Scheduler {}
27+
unsafe impl Sync for Scheduler {}
28+
29+
impl Scheduler {
30+
const fn new() -> Self {
31+
Self(UnsafeCell::new(SchedulerInner::new()))
32+
}
33+
34+
pub fn schedule(&self, runnable: Runnable) {
35+
// SAFETY: the cell is not empty, and we have exclusive access due to being a
36+
// single-threaded application.
37+
let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
38+
inner.send(runnable)
39+
}
40+
}
41+
42+
#[repr(C)]
43+
struct SchedulerInner {
44+
_ident: [usize; 4], // `ngx_event_ident` compatibility
45+
event: ngx_event_t,
46+
queue: VecDeque<Runnable>,
47+
}
48+
49+
impl SchedulerInner {
50+
const fn new() -> Self {
51+
let mut event: ngx_event_t = unsafe { mem::zeroed() };
52+
event.handler = Some(Self::scheduler_event_handler);
53+
54+
Self {
55+
_ident: [
56+
0, 0, 0, 0x4153594e, // ASYN
57+
],
58+
event,
59+
queue: VecDeque::new(),
60+
}
61+
}
62+
63+
pub fn send(&mut self, runnable: Runnable) {
64+
// Cached `ngx_cycle.log` can be invalidated when reloading configuration in a single
65+
// process mode. Update `log` every time to avoid using stale log pointer.
66+
self.event.log = ngx_cycle_log().as_ptr();
67+
68+
// While this event is not used as a timer at the moment, we still want to ensure that it is
69+
// compatible with `ngx_event_ident`.
70+
if self.event.data.is_null() {
71+
self.event.data = ptr::from_mut(self).cast();
72+
}
73+
74+
// FIXME: VecDeque::push could panic on an allocation failure, switch to a datastructure
75+
// which will not and propagate the failure.
76+
self.queue.push_back(runnable);
77+
unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
78+
}
79+
80+
/// This event handler is called by ngx_event_process_posted at the end of
81+
/// ngx_process_events_and_timers.
82+
extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
83+
let mut runnables = {
84+
// SAFETY:
85+
// This handler always receives a non-null pointer to an event embedded into a
86+
// SchedulerInner instance.
87+
// We modify the contents of `UnsafeCell`, but we ensured that the access is unique due
88+
// to being single-threaded and dropping the reference before we start processing queued
89+
// runnables.
90+
let this =
91+
unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).as_mut() };
92+
93+
ngx_log_debug!(
94+
this.event.log,
95+
"async: processing {} deferred wakeups",
96+
this.queue.len()
97+
);
98+
99+
// Move runnables to a new queue to avoid borrowing from the SchedulerInner and limit
100+
// processing to already queued wakeups. This ensures that we correctly handle tasks
101+
// that keep scheduling themselves (e.g. using yield_now() in a loop).
102+
// We can't use drain() as it borrows from self and breaks aliasing rules.
103+
mem::take(&mut this.queue)
104+
};
105+
106+
for runnable in runnables.drain(..) {
107+
runnable.run();
108+
}
109+
}
110+
}
111+
112+
impl Drop for SchedulerInner {
113+
fn drop(&mut self) {
114+
if self.event.posted() != 0 {
115+
unsafe { ngx_delete_posted_event(&mut self.event) };
116+
}
117+
118+
if self.event.timer_set() != 0 {
119+
unsafe { ngx_del_timer(&mut self.event) };
120+
}
121+
}
122+
}
123+
124+
fn schedule(runnable: Runnable, info: ScheduleInfo) {
125+
if info.woken_while_running {
126+
SCHEDULER.schedule(runnable);
127+
ngx_log_debug!(
128+
ngx_cycle_log().as_ptr(),
129+
"async: task scheduled while running"
130+
);
131+
} else {
132+
runnable.run();
133+
}
134+
}
135+
136+
/// Creates a new task running on the NGINX event loop.
137+
pub fn spawn<F, T>(future: F) -> Task<T>
138+
where
139+
F: Future<Output = T> + 'static,
140+
T: 'static,
141+
{
142+
ngx_log_debug!(unsafe { (*ngx_cycle).log }, "async: spawned new task");
143+
let scheduler = WithInfo(schedule);
144+
// Safety: single threaded embedding takes care of send/sync requirements for future and
145+
// scheduler. Future and scheduler are both 'static.
146+
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) };
147+
runnable.schedule();
148+
task
149+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ extern crate alloc;
4141
extern crate std;
4242

4343
pub mod allocator;
44+
#[cfg(feature = "async")]
45+
pub mod async_;
4446

4547
/// The core module.
4648
///

0 commit comments

Comments
 (0)