Skip to content

Commit

Permalink
feat: support error context in stream/error operations
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Adossi <[email protected]>
  • Loading branch information
vados-cosmonic committed Feb 7, 2025
1 parent b19ef10 commit 7ac0f08
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 74 deletions.
51 changes: 39 additions & 12 deletions crates/guest-rust/rt/src/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub enum Handle {
LocalClosed,
Read,
Write,
// Local end is closed with an error
// NOTE: this is only valid for write ends
WriteClosedErr(Option<ErrorContext>),
}

/// The current task being polled (or null if none).
Expand Down Expand Up @@ -176,7 +179,7 @@ pub async unsafe fn await_result(
STATUS_RETURNED | STATUS_DONE => {
alloc::dealloc(params, params_layout);
}
_ => unreachable!(),
_ => unreachable!("unrecognized async call status"),
}
}

Expand All @@ -187,26 +190,50 @@ mod results {
pub const CANCELED: u32 = 0;
}

/// Result of awaiting a asynchronous read or write
#[doc(hidden)]
pub enum AsyncWaitResult {
/// Used when a value was successfully sent or received
Values(usize),
/// Represents a successful but error-indicating read
Error(u32),
/// Represents a failed read (closed, canceled, etc)
End,
}

impl AsyncWaitResult {
/// Interpret the results from an async operation that is known to *not* be blocked
fn from_nonblocked_async_result(v: u32) -> Self {
match v {
results::CLOSED | results::CANCELED => Self::End,
v => {
if v & results::CLOSED != 0 {
Self::Error(v & !results::CLOSED)
} else {
Self::Values(v as usize)
}
}
}
}
}

/// Await the completion of a future read or write.
#[doc(hidden)]
pub async unsafe fn await_future_result(
import: unsafe extern "C" fn(u32, *mut u8) -> u32,
future: u32,
address: *mut u8,
) -> bool {
) -> AsyncWaitResult {
let result = import(future, address);
match result {
results::BLOCKED => {
assert!(!CURRENT.is_null());
(*CURRENT).todo += 1;
let (tx, rx) = oneshot::channel();
CALLS.insert(future as _, tx);
let v = rx.await.unwrap();
v == 1
AsyncWaitResult::from_nonblocked_async_result(rx.await.unwrap())
}
results::CLOSED | results::CANCELED => false,
1 => true,
_ => unreachable!(),
v => AsyncWaitResult::from_nonblocked_async_result(v),
}
}

Expand All @@ -217,7 +244,7 @@ pub async unsafe fn await_stream_result(
stream: u32,
address: *mut u8,
count: u32,
) -> Option<usize> {
) -> AsyncWaitResult {
let result = import(stream, address, count);
match result {
results::BLOCKED => {
Expand All @@ -227,13 +254,12 @@ pub async unsafe fn await_stream_result(
CALLS.insert(stream as _, tx);
let v = rx.await.unwrap();
if let results::CLOSED | results::CANCELED = v {
None
AsyncWaitResult::End
} else {
Some(usize::try_from(v).unwrap())
AsyncWaitResult::Values(usize::try_from(v).unwrap())
}
}
results::CLOSED | results::CANCELED => None,
v => Some(usize::try_from(v).unwrap()),
v => AsyncWaitResult::from_nonblocked_async_result(v),
}
}

Expand Down Expand Up @@ -310,6 +336,7 @@ pub unsafe fn callback(ctx: *mut u8, event0: i32, event1: i32, event2: i32) -> i
}

/// Represents the Component Model `error-context` type.
#[derive(PartialEq, Eq)]
pub struct ErrorContext {
handle: u32,
}
Expand Down
74 changes: 58 additions & 16 deletions crates/guest-rust/rt/src/async_support/future_support.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate std;

use {
super::ErrorContext,
super::Handle,
futures::{
channel::oneshot,
Expand All @@ -20,10 +21,10 @@ use {
#[doc(hidden)]
pub struct FutureVtable<T> {
pub write: fn(future: u32, value: T) -> Pin<Box<dyn Future<Output = bool>>>,
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<T>>>>,
pub read: fn(future: u32) -> Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
pub cancel_write: fn(future: u32),
pub cancel_read: fn(future: u32),
pub close_writable: fn(future: u32),
pub close_writable: fn(future: u32, err_ctx: u32),
pub close_readable: fn(future: u32),
}

Expand Down Expand Up @@ -78,7 +79,8 @@ impl<T> CancelableWrite<T> {
Handle::LocalOpen
| Handle::LocalWaiting(_)
| Handle::Read
| Handle::LocalClosed => unreachable!(),
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => unreachable!(),
Handle::LocalReady(..) => {
entry.insert(Handle::LocalOpen);
}
Expand Down Expand Up @@ -126,7 +128,9 @@ impl<T> FutureWriter<T> {
Poll::Pending
}
Handle::LocalReady(..) => Poll::Pending,
Handle::LocalClosed => Poll::Ready(()),
Handle::LocalClosed | Handle::WriteClosedErr(_) => {
Poll::Ready(())
}
Handle::LocalWaiting(_) | Handle::Read | Handle::Write => {
unreachable!()
}
Expand All @@ -141,13 +145,29 @@ impl<T> FutureWriter<T> {
_ = tx.send(Box::new(v));
Box::pin(future::ready(()))
}
Handle::LocalClosed => Box::pin(future::ready(())),
Handle::LocalClosed | Handle::WriteClosedErr(_) => Box::pin(future::ready(())),
Handle::Read | Handle::LocalReady(..) => unreachable!(),
Handle::Write => Box::pin((vtable.write)(handle, v).map(drop)),
},
}),
}
}

/// Close the writer with an error that will be returned as the last value
///
/// Note that this error is not sent immediately, but only when the
/// writer closes, which is normally a result of a `drop()`
pub fn close_with_error(&mut self, err: ErrorContext) {
super::with_entry(self.handle, move |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
// Regardless of current state, put the writer into a closed with error state
_ => {
entry.insert(Handle::WriteClosedErr(Some(err)));
}
},
});
}
}

impl<T> Drop for FutureWriter<T> {
Expand All @@ -161,7 +181,18 @@ impl<T> Drop for FutureWriter<T> {
Handle::Read => unreachable!(),
Handle::Write | Handle::LocalClosed => {
entry.remove();
(self.vtable.close_writable)(self.handle);
(self.vtable.close_writable)(self.handle, 0);
}
Handle::WriteClosedErr(_) => {
match entry.remove() {
Handle::WriteClosedErr(None) => {
(self.vtable.close_writable)(self.handle, 0);
}
Handle::WriteClosedErr(Some(err_ctx)) => {
(self.vtable.close_writable)(self.handle, err_ctx.handle());
}
_ => unreachable!(),
}
}
},
});
Expand All @@ -171,13 +202,13 @@ impl<T> Drop for FutureWriter<T> {
/// Represents a read operation which may be canceled prior to completion.
pub struct CancelableRead<T: 'static> {
reader: Option<FutureReader<T>>,
future: Pin<Box<dyn Future<Output = Option<T>>>>,
future: Pin<Box<dyn Future<Output = Option<Result<T, ErrorContext>>>>>,
}

impl<T> Future for CancelableRead<T> {
type Output = Option<T>;
type Output = Option<Result<T, ErrorContext>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T, ErrorContext>>> {
let me = self.get_mut();
match me.future.poll_unpin(cx) {
Poll::Ready(v) => {
Expand Down Expand Up @@ -206,7 +237,8 @@ impl<T> CancelableRead<T> {
Handle::LocalOpen
| Handle::LocalReady(..)
| Handle::Write
| Handle::LocalClosed => unreachable!(),
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => unreachable!(),
Handle::LocalWaiting(_) => {
entry.insert(Handle::LocalOpen);
}
Expand Down Expand Up @@ -262,7 +294,8 @@ impl<T> FutureReader<T> {
| Handle::LocalOpen
| Handle::LocalReady(..)
| Handle::LocalWaiting(_)
| Handle::LocalClosed => {
| Handle::LocalClosed
| Handle::WriteClosedErr(_) => {
unreachable!()
}
},
Expand All @@ -286,7 +319,10 @@ impl<T> FutureReader<T> {
Handle::Read | Handle::LocalClosed => {
entry.remove();
}
Handle::LocalReady(..) | Handle::LocalWaiting(_) | Handle::Write => unreachable!(),
Handle::LocalReady(..)
| Handle::LocalWaiting(_)
| Handle::Write
| Handle::WriteClosedErr(_) => unreachable!(),
},
});

Expand All @@ -295,7 +331,7 @@ impl<T> FutureReader<T> {
}

impl<T> IntoFuture for FutureReader<T> {
type Output = Option<T>;
type Output = Option<Result<T, ErrorContext>>;
type IntoFuture = CancelableRead<T>;

/// Convert this object into a `Future` which will resolve when a value is
Expand All @@ -308,8 +344,10 @@ impl<T> IntoFuture for FutureReader<T> {
reader: Some(self),
future: super::with_entry(handle, |entry| match entry {
Entry::Vacant(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get() {
Handle::Write | Handle::LocalWaiting(_) => unreachable!(),
Entry::Occupied(mut entry) => match entry.get_mut() {
Handle::Write | Handle::LocalWaiting(_) => {
unreachable!()
}
Handle::Read => Box::pin(async move { (vtable.read)(handle).await })
as Pin<Box<dyn Future<Output = _>>>,
Handle::LocalOpen => {
Expand All @@ -318,6 +356,10 @@ impl<T> IntoFuture for FutureReader<T> {
Box::pin(async move { rx.await.ok().map(|v| *v.downcast().unwrap()) })
}
Handle::LocalClosed => Box::pin(future::ready(None)),
Handle::WriteClosedErr(err_ctx) => match err_ctx.take() {
None => Box::pin(future::ready(None)),
Some(err_ctx) => Box::pin(future::ready(Some(Err(err_ctx)))),
},
Handle::LocalReady(..) => {
let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalClosed) else {
unreachable!()
Expand Down Expand Up @@ -353,7 +395,7 @@ impl<T> Drop for FutureReader<T> {
entry.remove();
(self.vtable.close_readable)(handle);
}
Handle::Write => unreachable!(),
Handle::Write | Handle::WriteClosedErr(_) => unreachable!(),
},
});
}
Expand Down
Loading

0 comments on commit 7ac0f08

Please sign in to comment.