feat: minimal async runtime on top of the NGINX event loop #170

Merged
merged 2 commits on Jul 4, 2025
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
@@ -26,11 +26,18 @@ rust-version.workspace = true

[dependencies]
allocator-api2 = { version = "0.2.21", default-features = false }
async-task = { version = "4.7.1", optional = true }
lock_api = "0.4.13"
nginx-sys = { path = "nginx-sys", default-features=false, version = "0.5.0"}
pin-project-lite = { version = "0.2.16", optional = true }

[features]
default = ["vendored","std"]
default = ["std", "vendored"]
async = [
"alloc",
"dep:async-task",
"dep:pin-project-lite",
]
# Enables the components that use memory allocation.
# If the `std` feature is not enabled, the `alloc` crate is used internally instead. This flag is mainly for `no_std` builds.
alloc = ["allocator-api2/alloc"]
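
As a usage note (not part of this diff): a downstream crate would opt into the new runtime through the `async` feature added above. A minimal sketch of a dependent crate's Cargo.toml, assuming the library is consumed as the `ngx` crate (the version shown is illustrative):

[dependencies]
ngx = { version = "0.5.0", features = ["async"] }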
6 changes: 6 additions & 0 deletions src/async_/mod.rs
@@ -0,0 +1,6 @@
//! Async runtime and set of utilities on top of the NGINX event loop.
pub use self::sleep::{sleep, Sleep};
pub use self::spawn::{spawn, Task};

mod sleep;
mod spawn;
138 changes: 138 additions & 0 deletions src/async_/sleep.rs
@@ -0,0 +1,138 @@
use core::future::Future;
use core::mem;
use core::pin::Pin;
use core::ptr::{self, NonNull};
use core::task::{self, Poll};
use core::time::Duration;

use nginx_sys::{ngx_add_timer, ngx_del_timer, ngx_event_t, ngx_log_t, ngx_msec_int_t, ngx_msec_t};
use pin_project_lite::pin_project;

use crate::{ngx_container_of, ngx_log_debug};

/// Maximum duration that can be achieved using [ngx_add_timer].
const NGX_TIMER_DURATION_MAX: Duration = Duration::from_millis(ngx_msec_int_t::MAX as _);

/// Puts the current task to sleep for at least the specified amount of time.
///
/// The function is a shorthand for [Sleep::new] using the global logger for debug output.
#[inline]
pub fn sleep(duration: Duration) -> Sleep {
Sleep::new(duration, crate::log::ngx_cycle_log())
}

pin_project! {
/// Future returned by [sleep].
pub struct Sleep {
#[pin]
timer: TimerEvent,
duration: Duration,
}
}

impl Sleep {
/// Creates a new Sleep with the specified duration and logger for debug messages.
pub fn new(duration: Duration, log: NonNull<ngx_log_t>) -> Self {
let timer = TimerEvent::new(log);
ngx_log_debug!(timer.event.log, "async: sleep for {duration:?}");
Sleep { timer, duration }
}
}

impl Future for Sleep {
type Output = ();

#[cfg(not(target_pointer_width = "32"))]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
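// On 64-bit targets `ngx_msec_t` is wide enough that the whole requested
// duration fits into a single nginx timer.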
let msec = self.duration.min(NGX_TIMER_DURATION_MAX).as_millis() as ngx_msec_t;
let this = self.project();
this.timer.poll_sleep(msec, cx)
}

#[cfg(target_pointer_width = "32")]
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if self.duration.is_zero() {
return Poll::Ready(());
}
let step = self.duration.min(NGX_TIMER_DURATION_MAX);

let mut this = self.project();
// Handle ngx_msec_t overflow on 32-bit platforms.
match this.timer.as_mut().poll_sleep(step.as_millis() as _, cx) {
// Last step
Poll::Ready(()) if this.duration == &step => Poll::Ready(()),
Poll::Ready(()) => {
*this.duration = this.duration.saturating_sub(step);
this.timer.event.set_timedout(0); // rearm
this.timer.as_mut().poll_sleep(step.as_millis() as _, cx)
}
x => x,
}
}
}

struct TimerEvent {
event: ngx_event_t,
waker: Option<task::Waker>,
}

// SAFETY: Timer will only be used in a single-threaded environment
unsafe impl Send for TimerEvent {}
unsafe impl Sync for TimerEvent {}

impl TimerEvent {
pub fn new(log: NonNull<ngx_log_t>) -> Self {
static IDENT: [usize; 4] = [
0, 0, 0, 0x4153594e, // ASYN
];

let mut ev: ngx_event_t = unsafe { mem::zeroed() };
// The data is only used for `ngx_event_ident` and will not be mutated.
ev.data = ptr::addr_of!(IDENT).cast_mut().cast();
ev.handler = Some(Self::timer_handler);
ev.log = log.as_ptr();
ev.set_cancelable(1);

Self {
event: ev,
waker: None,
}
}

pub fn poll_sleep(
mut self: Pin<&mut Self>,
duration: ngx_msec_t,
context: &mut task::Context<'_>,
) -> Poll<()> {
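// Three cases: the timer has already fired (`timedout`), it is armed and
// still pending (`timer_set`), or this is the first poll and the timer
// must be armed.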
if self.event.timedout() != 0 {
Poll::Ready(())
} else if self.event.timer_set() != 0 {
if let Some(waker) = self.waker.as_mut() {
waker.clone_from(context.waker());
} else {
self.waker = Some(context.waker().clone());
}
Poll::Pending
} else {
unsafe { ngx_add_timer(ptr::addr_of_mut!(self.event), duration) };
self.waker = Some(context.waker().clone());
Poll::Pending
}
}

unsafe extern "C" fn timer_handler(ev: *mut ngx_event_t) {
let timer = ngx_container_of!(ev, Self, event);

if let Some(waker) = (*timer).waker.take() {
waker.wake();
}
}
}

impl Drop for TimerEvent {
fn drop(&mut self) {
if self.event.timer_set() != 0 {
unsafe { ngx_del_timer(ptr::addr_of_mut!(self.event)) };
}
}
}
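
A usage sketch (not part of the diff; the library is assumed to be consumed as the `ngx` crate, and `delayed_debug_log` is a hypothetical helper):

use core::time::Duration;

use ngx::async_::sleep;
use ngx::log::ngx_cycle_log;
use ngx::ngx_log_debug;

// Hypothetical helper: wait at least `delay`, then emit a debug log entry.
async fn delayed_debug_log(delay: Duration, message: &'static str) {
    sleep(delay).await;
    ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: {message}");
}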
148 changes: 148 additions & 0 deletions src/async_/spawn.rs
@@ -0,0 +1,148 @@
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem;
use core::ptr::{self, NonNull};

#[cfg(all(not(feature = "std"), feature = "alloc"))]
use alloc::collections::vec_deque::VecDeque;
#[cfg(feature = "std")]
use std::collections::vec_deque::VecDeque;

pub use async_task::Task;
use async_task::{Runnable, ScheduleInfo, WithInfo};
use nginx_sys::{
ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event, ngx_posted_next_events,
};

use crate::log::ngx_cycle_log;
use crate::{ngx_container_of, ngx_log_debug};

static SCHEDULER: Scheduler = Scheduler::new();

struct Scheduler(UnsafeCell<SchedulerInner>);

// SAFETY: Scheduler must only be used from the main thread of a worker process.
unsafe impl Send for Scheduler {}
unsafe impl Sync for Scheduler {}

impl Scheduler {
const fn new() -> Self {
Self(UnsafeCell::new(SchedulerInner::new()))
}

pub fn schedule(&self, runnable: Runnable) {
// SAFETY: the cell is not empty, and we have exclusive access due to being a
// single-threaded application.
let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
inner.send(runnable)
}
}

#[repr(C)]
struct SchedulerInner {
_ident: [usize; 4], // `ngx_event_ident` compatibility
event: ngx_event_t,
queue: VecDeque<Runnable>,
}

impl SchedulerInner {
const fn new() -> Self {
let mut event: ngx_event_t = unsafe { mem::zeroed() };
event.handler = Some(Self::scheduler_event_handler);

Self {
_ident: [
0, 0, 0, 0x4153594e, // ASYN
],
event,
queue: VecDeque::new(),
}
}

pub fn send(&mut self, runnable: Runnable) {
// Cached `ngx_cycle.log` can be invalidated when reloading configuration in single-process
// mode. Update `log` every time to avoid using a stale log pointer.
self.event.log = ngx_cycle_log().as_ptr();

// While this event is not used as a timer at the moment, we still want to ensure that it is
// compatible with `ngx_event_ident`.
if self.event.data.is_null() {
self.event.data = ptr::from_mut(self).cast();
}

// FIXME: VecDeque::push_back could panic on an allocation failure; switch to a data structure
// that will not, and propagate the failure.
self.queue.push_back(runnable);
unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
}

/// This event handler is called by ngx_event_process_posted at the end of
/// ngx_process_events_and_timers.
extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
let mut runnables = {
// SAFETY:
// This handler always receives a non-null pointer to an event embedded into a
// SchedulerInner instance.
// We modify the contents of `UnsafeCell`, but we ensured that the access is unique due
// to being single-threaded and dropping the reference before we start processing queued
// runnables.
let this =
unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).as_mut() };

ngx_log_debug!(
this.event.log,
"async: processing {} deferred wakeups",
this.queue.len()
);

// Move runnables to a new queue to avoid borrowing from the SchedulerInner and limit
// processing to already queued wakeups. This ensures that we correctly handle tasks
// that keep scheduling themselves (e.g. using yield_now() in a loop).
// We can't use drain() as it borrows from self and breaks aliasing rules.
mem::take(&mut this.queue)
};

for runnable in runnables.drain(..) {
runnable.run();
}
}
}

impl Drop for SchedulerInner {
fn drop(&mut self) {
if self.event.posted() != 0 {
unsafe { ngx_delete_posted_event(&mut self.event) };
}

if self.event.timer_set() != 0 {
unsafe { ngx_del_timer(&mut self.event) };
}
}
}

fn schedule(runnable: Runnable, info: ScheduleInfo) {
if info.woken_while_running {
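// The task was woken while it was still being polled (e.g. via a yield);
// defer it to the event loop instead of running it again recursively.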
SCHEDULER.schedule(runnable);
ngx_log_debug!(
ngx_cycle_log().as_ptr(),
"async: task scheduled while running"
);
} else {
runnable.run();
}
}

/// Creates a new task running on the NGINX event loop.
pub fn spawn<F, T>(future: F) -> Task<T>
where
F: Future<Output = T> + 'static,
T: 'static,
{
ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: spawning new task");
let scheduler = WithInfo(schedule);
// SAFETY: single-threaded embedding takes care of Send/Sync requirements for the future and
// the scheduler. Future and scheduler are both 'static.
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, scheduler) };
runnable.schedule();
task
}
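
And a spawning sketch (again illustrative, not from the diff; assumes the `ngx` crate and code running on the worker's event loop thread, e.g. in a module handler):

use core::time::Duration;

use ngx::async_::{sleep, spawn};

// Hypothetical helper: start a detached background task on the NGINX event loop.
fn start_background_task() {
    let task = spawn(async {
        sleep(Duration::from_secs(1)).await;
        // ... follow-up work, still on the event loop thread ...
    });
    // An `async_task::Task` cancels its task when dropped, so detach the handle
    // to let the task keep running on its own.
    task.detach();
}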
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -41,6 +41,8 @@ extern crate alloc
extern crate std;

pub mod allocator;
#[cfg(feature = "async")]
pub mod async_;

/// The core module.
///
14 changes: 14 additions & 0 deletions src/log.rs
@@ -1,6 +1,7 @@
use core::cmp;
use core::fmt::{self, Write};
use core::mem::MaybeUninit;
use core::ptr::NonNull;

use crate::ffi::{self, ngx_err_t, ngx_log_t, ngx_uint_t, NGX_MAX_ERROR_STR};

@@ -11,6 +12,19 @@ use crate::ffi::{self, ngx_err_t, ngx_log_t, ngx_uint_t, NGX_MAX_ERROR_STR};
pub const LOG_BUFFER_SIZE: usize =
NGX_MAX_ERROR_STR as usize - b"1970/01/01 00:00:00 [info] 1#1: ".len();

/// Obtains a pointer to the global (cycle) log object.
///
/// The returned pointer is tied to the current cycle lifetime and will be invalidated by a
/// configuration reload in the master process or in single-process mode. If you plan to store it,
/// make sure that your storage is also tied to the cycle lifetime (e.g. module configuration or
/// connection/request data).
///
/// The function may panic if called before NGINX's main() creates the initial cycle.
#[inline(always)]
pub fn ngx_cycle_log() -> NonNull<ngx_log_t> {
NonNull::new(unsafe { (*nginx_sys::ngx_cycle).log }).expect("global logger")
}
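
A small sketch (illustrative only) of passing the cycle log to an API added in this PR, fetching the pointer at the call site rather than caching it across reloads:

use core::time::Duration;

use ngx::async_::Sleep;
use ngx::log::ngx_cycle_log;

// Hypothetical helper: build a Sleep future that logs through the current cycle log.
fn sleep_with_cycle_log(duration: Duration) -> Sleep {
    Sleep::new(duration, ngx_cycle_log())
}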

/// Utility function to provide typed checking of the mask's field state.
#[inline(always)]
pub fn check_mask(mask: DebugMask, log_level: usize) -> bool {