diff --git a/Cargo.lock b/Cargo.lock index 4aedfba3c..1a2e26edc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1275,9 +1275,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "leb128-tokio" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "457ab94f61da2e4f9075341feb227735b993308bf167bcb2b97aee2e7fcde8b7" +checksum = "6a1f734b00871cae8f137d83d53cf4b5ee3f0d87224a87a4347dab4cde11a7a3" dependencies = [ "tokio", "tokio-util", @@ -2749,9 +2749,9 @@ dependencies = [ [[package]] name = "utf8-tokio" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c54c44014bcd22cc28f06fab5f3d8cb5322799e7903446fc4cc32b21736051c5" +checksum = "692aa1d5479fe22dac6ce581e8c6e83a9f8d90238250c1036b0139019a783a87" dependencies = [ "tokio", "tokio-util", diff --git a/crates/transport-quic/src/lib.rs b/crates/transport-quic/src/lib.rs index 77128089b..0a533643f 100644 --- a/crates/transport-quic/src/lib.rs +++ b/crates/transport-quic/src/lib.rs @@ -129,7 +129,7 @@ impl]>> FromIterator

for IndexTree { } impl IndexTree { - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip(self))] fn take_rx(&mut self, path: &[usize]) -> Option> { let Some((i, path)) = path.split_first() else { return match self { @@ -163,7 +163,7 @@ impl IndexTree { } } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip(self))] fn take_tx(&mut self, path: &[usize]) -> Option> { let Some((i, path)) = path.split_first() else { return match self { @@ -199,7 +199,7 @@ impl IndexTree { /// Inserts `sender` and `receiver` under a `path` - returns `false` if it failed and `true` if it succeeded. /// Tree state after `false` is returned is undefined - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip(self, sender, receiver), ret)] fn insert( &mut self, path: &[Option], @@ -244,8 +244,9 @@ impl IndexTree { true } (_, _, [Some(i), path @ ..]) => { - if nested.len() < *i { - nested.resize_with(i.saturating_add(1), Option::default); + let cap = i.saturating_add(1); + if nested.len() < cap { + nested.resize_with(cap, Option::default); } let nested = &mut nested[*i]; if let Some(nested) = nested { @@ -406,7 +407,7 @@ impl wrpc_transport::Index for Incoming { } impl AsyncRead for Incoming { - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip_all, ret)] fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -486,7 +487,7 @@ impl wrpc_transport::Index for Outgoing { } impl AsyncWrite for Outgoing { - #[instrument(level = "trace", skip_all, fields(?buf))] + #[instrument(level = "trace", skip_all, ret, fields(buf = format!("{buf:02x?}")))] fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -511,7 +512,7 @@ impl AsyncWrite for Outgoing { .. } => { while !header.is_empty() { - trace!(?header, "writing header"); + trace!(header = format!("{header:02x?}"), "writing header"); let n = ready!(AsyncWrite::poll_write(tx.as_mut(), cx, header))?; if n < header.len() { header.advance(n); @@ -525,7 +526,7 @@ impl AsyncWrite for Outgoing { } } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip_all, ret)] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.as_mut().project() { OutgoingProj::Opening { .. } => Poll::Ready(Ok(())), @@ -536,7 +537,7 @@ impl AsyncWrite for Outgoing { } } - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip_all, ret)] fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.as_mut().project() { OutgoingProj::Opening { .. } => Poll::Ready(Ok(())), @@ -566,13 +567,13 @@ impl Session { conn: conn.clone(), incoming, outgoing, - reader: spawn(demux_connection(index, conn)), + reader: spawn(demux_connection(index, conn).in_current_span()), } } } impl wrpc_transport::Session for Session { - #[instrument(level = "trace", skip_all)] + #[instrument(level = "trace", skip_all, ret)] async fn finish(mut self, res: Result<(), &str>) -> anyhow::Result> { if let Err(err) = res { let mut buf = BytesMut::with_capacity(6 + err.len()); @@ -731,7 +732,7 @@ impl wrpc_transport::Invoke for Client { type Outgoing = Outgoing; type Incoming = Incoming; - #[instrument(level = "trace", skip(self, paths))] + #[instrument(level = "trace", skip(self, paths, params), fields(params = format!("{params:02x?}")))] async fn invoke( &self, cx: Self::Context, @@ -786,7 +787,7 @@ impl wrpc_transport::Invoke for Client { .set_priority(1) .context("failed to set result stream priority")?; let index = Arc::new(std::sync::Mutex::new(paths.iter().collect())); - trace!(?params, "writing parameters"); + trace!("writing parameters"); param_tx .write_all_chunks(&mut [Bytes::from_static(&[PROTOCOL]), params]) .await diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 0a5a09443..6a7688961 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -18,6 +18,7 @@ pub mod frame; pub use frame::{Decoder as FrameDecoder, Encoder as FrameEncoder, FrameRef}; mod value; +use tracing::{debug, instrument, trace, Instrument as _}; pub use value::*; /// `Index` implementations are capable of multiplexing underlying connections using a particular @@ -65,7 +66,7 @@ pub trait Invoke: Send + Sync { type Outgoing: AsyncWrite + Index + Send + Sync + 'static; /// Incoming multiplexed byte stream - type Incoming: AsyncRead + Index + Send + Sync + Unpin; + type Incoming: AsyncRead + Index + Send + Sync + Unpin + 'static; /// Invoke function `func` on instance `instance` fn invoke( @@ -80,6 +81,7 @@ pub trait Invoke: Send + Sync { > + Send; /// Invoke function `func` on instance `instance` using typed `Params` and `Returns` + #[instrument(level = "trace", skip(self, cx, params, paths))] fn invoke_values( &self, cx: Self::Context, @@ -94,20 +96,20 @@ pub trait Invoke: Send + Sync { )>, > + Send where - ValueEncoder: tokio_util::codec::Encoder, - ValueDecoder: tokio_util::codec::Decoder, - Params: Send, - Returns: Decode, - as tokio_util::codec::Encoder>::Error: + Params: Encode + Send, + Returns: Decode, + >::Error: std::error::Error + Send + Sync + 'static, - as tokio_util::codec::Decoder>::Error: + ::Error: std::error::Error + Send + Sync + 'static, { async { let mut buf = BytesMut::default(); - let mut enc = ValueEncoder::::default(); + let mut enc = Params::Encoder::default(); + trace!("encoding parameters"); enc.encode(params, &mut buf) .context("failed to encode parameters")?; + debug!("invoking function"); let Invocation { outgoing, incoming, @@ -116,19 +118,24 @@ pub trait Invoke: Send + Sync { .invoke(cx, instance, func, buf.freeze(), paths) .await .context("failed to invoke function")?; - let tx = tokio::spawn(async { - enc.write_deferred(outgoing) - .await - .context("failed to write async values") + let tx = enc.take_deferred().map(|tx| { + tokio::spawn( + async { + trace!("writing async parameters"); + tx(outgoing.into(), Vec::with_capacity(8)) + .await + .context("failed to write async parameters") + } + .in_current_span(), + ) }); - let mut dec = FramedRead::new( - incoming, - ValueDecoder::::new(Vec::default()), - ); + + let mut dec = FramedRead::new(incoming, Returns::Decoder::default()); + debug!("receiving sync returns"); let Some(returns) = dec .try_next() .await - .context("failed to decode return values")? + .context("failed to receive sync returns")? else { bail!("incomplete returns") }; @@ -137,15 +144,23 @@ pub trait Invoke: Send + Sync { if let Some(rx) = rx { try_join!( async { - rx(dec.into_inner()) + debug!("receiving async returns"); + rx(dec.into_inner().into(), Vec::with_capacity(8)) .await - .context("reading async return values failed") + .context("receiving async returns failed") }, - async { tx.await.context("writing async parameters failed")? } + async { + if let Some(tx) = tx { + tx.await.context("writing async parameters failed")? + } else { + Ok(()) + } + } )?; - } else { + } else if let Some(tx) = tx { tx.await.context("writing async parameters failed")??; }; + debug!("finishing session"); session .finish(Ok(())) .await @@ -156,7 +171,7 @@ pub trait Invoke: Send + Sync { } /// Server-side handle to a wRPC transport -pub trait Serve: Send + Sync { +pub trait Serve: Sync { /// Transport-specific invocation context type Context: Send + Sync + 'static; @@ -188,6 +203,7 @@ pub trait Serve: Send + Sync { > + Send; /// Serve function `func` from instance `instance` using typed `Params` and `Returns` + #[instrument(level = "trace", skip(self, paths))] fn serve_values( &self, instance: &str, @@ -199,7 +215,9 @@ pub trait Serve: Send + Sync { Item = anyhow::Result<( Self::Context, Params, - Option> + Sync + Send + Unpin>, + Option< + impl Future> + Sync + Send + Unpin + 'static, + >, impl FnOnce( Result>, ) -> Pin< @@ -216,13 +234,11 @@ pub trait Serve: Send + Sync { > + Send where P: AsRef<[Option]> + Send + Sync + 'static, - Params: Decode, - Returns: Send + Sync + 'static, - ValueEncoder: tokio_util::codec::Encoder, - ValueDecoder: tokio_util::codec::Decoder, - as tokio_util::codec::Encoder>::Error: + Params: Decode + Send + Sync + 'static, + Returns: Encode + Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, - as tokio_util::codec::Decoder>::Error: + >::Error: std::error::Error + Send + Sync + 'static, { async { @@ -235,55 +251,59 @@ pub trait Serve: Send + Sync { incoming, session, }, - )| async { - let mut dec = FramedRead::new( - incoming, - ValueDecoder::::new(Vec::default()), - ); - let Some(params) = dec - .try_next() - .await - .context("failed to decode parameters")? - else { - bail!("incomplete parameters") - }; - let rx = dec.decoder_mut().take_deferred(); - Ok(( - cx, - params, - rx.map(|f| f(dec.into_inner())), - |returns: Result<_, Arc>| { - Box::pin(async move { - match returns { - Ok(returns) => { - let mut enc = FramedWrite::< - Self::Outgoing, - ValueEncoder, - >::new( - outgoing, - ValueEncoder::::default(), - ); - enc.send(returns) - .await - .context("failed to write return values")?; - if let Some(tx) = enc.encoder_mut().take_deferred() { - tx(enc.into_inner()) + )| { + async { + let mut dec = FramedRead::new(incoming, Params::Decoder::default()); + debug!("receiving sync parameters"); + let Some(params) = dec + .try_next() + .await + .context("failed to receive sync parameters")? + else { + bail!("incomplete sync parameters") + }; + trace!("received sync parameters"); + let rx = dec.decoder_mut().take_deferred(); + Ok(( + cx, + params, + rx.map(|f| f(dec.into_inner().into(), Vec::with_capacity(8))), + |returns: Result<_, Arc>| { + Box::pin(async move { + match returns { + Ok(returns) => { + let mut enc = FramedWrite::new( + outgoing, + Returns::Encoder::default(), + ); + debug!("transmitting sync returns"); + enc.send(returns).await.context( + "failed to transmit synchronous returns", + )?; + if let Some(tx) = enc.encoder_mut().take_deferred() { + debug!("transmitting async returns"); + tx(enc.into_inner().into(), Vec::with_capacity(8)) + .await + .context("failed to write async returns")?; + } + debug!("finishing session with success"); + session + .finish(Ok(())) .await - .context("failed to write async return values")?; + .context("failed to finish session") + } + Err(err) => { + debug!(?err, "finishing session with an error"); + session + .finish(Err(&err)) + .await + .context("failed to finish session") } - session - .finish(Ok(())) - .await - .context("failed to finish session") } - Err(err) => session - .finish(Err(&err)) - .await - .context("failed to finish session"), - } - }) as Pin<_> - }, - )) + }) as Pin<_> + }, + )) + } }, )) } diff --git a/crates/transport/src/value.rs b/crates/transport/src/value.rs index f66eeed78..a6413808f 100644 --- a/crates/transport/src/value.rs +++ b/crates/transport/src/value.rs @@ -1,191 +1,660 @@ +use core::any::TypeId; +use core::fmt::{self, Debug}; use core::future::Future; use core::iter::zip; +use core::marker::PhantomData; use core::mem; +use core::ops::{Deref, DerefMut}; use core::pin::Pin; -use bytes::{Buf as _, BufMut as _, Bytes, BytesMut}; +use bytes::{BufMut as _, Bytes, BytesMut}; use futures::stream::{self, FuturesUnordered}; use futures::{Stream, StreamExt as _, TryStreamExt as _}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as _}; +use tokio::select; use tokio::sync::mpsc; -use tokio::try_join; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::codec::FramedRead; -use tracing::instrument; +use tokio_util::codec::{Encoder as _, FramedRead}; +use tracing::{instrument, trace}; use wasm_tokio::cm::{ - BoolCodec, F32Codec, F64Codec, S16Codec, S32Codec, S64Codec, S8Codec, TupleDecoder, U16Codec, - U32Codec, U64Codec, U8Codec, + BoolCodec, F32Codec, F64Codec, PrimValEncoder, S16Codec, S32Codec, S64Codec, S8Codec, + TupleDecoder, TupleEncoder, U16Codec, U32Codec, U64Codec, U8Codec, }; use wasm_tokio::{ - CoreNameDecoder, CoreNameEncoder, CoreVecDecoderBytes, CoreVecEncoderBytes, Leb128DecoderU32, + CoreNameDecoder, CoreNameEncoder, CoreVecDecoder, CoreVecDecoderBytes, CoreVecEncoderBytes, + Leb128DecoderI128, Leb128DecoderI16, Leb128DecoderI32, Leb128DecoderI64, Leb128DecoderI8, + Leb128DecoderU128, Leb128DecoderU16, Leb128DecoderU32, Leb128DecoderU64, Leb128DecoderU8, Leb128Encoder, Utf8Codec, }; -pub struct ValueEncoder( - Option< - Box< - dyn FnOnce(W) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - >, -); +#[repr(transparent)] +pub struct ResourceBorrow { + repr: Bytes, + _ty: PhantomData, +} -impl Default for ValueEncoder { - fn default() -> Self { - Self(None) +impl From for ResourceBorrow { + fn from(repr: Bytes) -> Self { + Self { + repr, + _ty: PhantomData, + } } } -impl ValueEncoder { - pub fn set_deferred( - &mut self, - f: Box< - dyn FnOnce(W) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - ) -> Option< - Box< - dyn FnOnce(W) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - > { - self.0.replace(f) +impl From> for Bytes { + fn from(ResourceBorrow { repr, .. }: ResourceBorrow) -> Self { + repr } +} - pub fn take_deferred( - &mut self, - ) -> Option< - Box< - dyn FnOnce(W) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - > { - self.0.take() +impl AsRef<[u8]> for ResourceBorrow { + fn as_ref(&self) -> &[u8] { + &self.repr } +} - pub async fn write_deferred(self, w: W) -> std::io::Result<()> { - let Some(f) = self.0 else { return Ok(()) }; - f(w).await +impl AsRef for ResourceBorrow { + fn as_ref(&self) -> &Bytes { + &self.repr } } -pub trait Decode: Send + Sync { - type State: Default + Send + Sync; +impl Debug for ResourceBorrow { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "borrow<{:?}>", TypeId::of::()) + } } -pub struct ValueDecoder { - state: T::State, - index: Vec, - deferred: Option< - Box< - dyn FnOnce(R) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - >, +impl ResourceBorrow { + pub fn new(repr: impl Into) -> Self { + Self::from(repr.into()) + } } -impl Default for ValueDecoder { - fn default() -> Self { +#[repr(transparent)] +pub struct ResourceOwn { + repr: Bytes, + _ty: PhantomData, +} + +impl From> for ResourceBorrow { + fn from(ResourceOwn { repr, _ty }: ResourceOwn) -> Self { Self { - state: T::State::::default(), - index: Vec::default(), - deferred: None, + repr, + _ty: PhantomData, } } } -impl ValueDecoder { - #[must_use] - pub fn new(index: Vec) -> Self { +impl From for ResourceOwn { + fn from(repr: Bytes) -> Self { Self { - state: T::State::default(), - index, - deferred: None, + repr, + _ty: PhantomData, } } +} + +impl From> for Bytes { + fn from(ResourceOwn { repr, .. }: ResourceOwn) -> Self { + repr + } +} + +impl AsRef<[u8]> for ResourceOwn { + fn as_ref(&self) -> &[u8] { + &self.repr + } +} + +impl AsRef for ResourceOwn { + fn as_ref(&self) -> &Bytes { + &self.repr + } +} + +impl Debug for ResourceOwn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "own<{:?}>", TypeId::of::()) + } +} + +impl ResourceOwn { + pub fn new(repr: impl Into) -> Self { + Self::from(repr.into()) + } +} + +pub type DeferredFn = Box< + dyn FnOnce( + Arc, + Vec, + ) -> Pin> + Send + Sync>> + + Send + + Sync, +>; + +pub trait Deferred { + fn take_deferred(&mut self) -> Option>; +} + +macro_rules! impl_deferred_sync { + ($t:ty) => { + impl Deferred for $t { + fn take_deferred(&mut self) -> Option> { + None + } + } + }; +} + +impl_deferred_sync!(BoolCodec); +impl_deferred_sync!(S8Codec); +impl_deferred_sync!(U8Codec); +impl_deferred_sync!(S16Codec); +impl_deferred_sync!(U16Codec); +impl_deferred_sync!(S32Codec); +impl_deferred_sync!(U32Codec); +impl_deferred_sync!(S64Codec); +impl_deferred_sync!(U64Codec); +impl_deferred_sync!(F32Codec); +impl_deferred_sync!(F64Codec); +impl_deferred_sync!(CoreNameDecoder); +impl_deferred_sync!(CoreNameEncoder); +impl_deferred_sync!(CoreVecDecoderBytes); +impl_deferred_sync!(CoreVecEncoderBytes); +impl_deferred_sync!(Utf8Codec); +impl_deferred_sync!(PrimValEncoder); +impl_deferred_sync!(Leb128Encoder); +impl_deferred_sync!(Leb128DecoderI8); +impl_deferred_sync!(Leb128DecoderU8); +impl_deferred_sync!(Leb128DecoderI16); +impl_deferred_sync!(Leb128DecoderU16); +impl_deferred_sync!(Leb128DecoderI32); +impl_deferred_sync!(Leb128DecoderU32); +impl_deferred_sync!(Leb128DecoderI64); +impl_deferred_sync!(Leb128DecoderU64); +impl_deferred_sync!(Leb128DecoderI128); +impl_deferred_sync!(Leb128DecoderU128); +impl_deferred_sync!(ResourceEncoder); +impl_deferred_sync!(UnitCodec); +impl_deferred_sync!(ListDecoderU8); + +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); +impl_deferred_sync!(CoreVecDecoder); + +pub struct SyncCodec(pub T); + +impl Deref for SyncCodec { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for SyncCodec { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Deferred for SyncCodec { + fn take_deferred(&mut self) -> Option> { + None + } +} + +impl Default for SyncCodec { + fn default() -> Self { + Self(T::default()) + } +} + +impl tokio_util::codec::Encoder for SyncCodec +where + T: tokio_util::codec::Encoder, +{ + type Error = T::Error; + + fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.0.encode(item, dst) + } +} + +impl tokio_util::codec::Decoder for SyncCodec +where + T: tokio_util::codec::Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + self.0.decode(src) + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + self.0.decode_eof(buf) + } + + fn framed( + self, + io: IO, + ) -> tokio_util::codec::Framed + where + Self: Sized, + { + self.0.framed(io).map_codec(Self) + } +} + +#[instrument(level = "trace", skip(w, deferred))] +pub async fn handle_deferred( + w: Arc, + deferred: I, + mut path: Vec, +) -> std::io::Result<()> +where + I: IntoIterator>>, + I::IntoIter: ExactSizeIterator, +{ + let mut futs = FuturesUnordered::default(); + for (i, f) in zip(0.., deferred) { + if let Some(f) = f { + path.push(i); + futs.push(f(Arc::clone(&w), path.clone())); + path.pop(); + } + } + while let Some(()) = futs.try_next().await? {} + Ok(()) +} + +pub trait Encode: Sized { + type Encoder: tokio_util::codec::Encoder + Deferred + Default + Send + Sync; + + #[instrument(level = "trace", skip(self, enc))] + fn encode( + self, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + { + enc.encode(self, dst)?; + Ok(enc.take_deferred()) + } + + #[instrument(level = "trace", skip(items, enc))] + fn encode_iter_own( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + T: crate::Index + Send + Sync + 'static, + { + let items = items.into_iter(); + dst.reserve(items.len()); + let mut deferred = Vec::with_capacity(items.len()); + for item in items { + enc.encode(item, dst)?; + deferred.push(enc.take_deferred()); + } + if deferred.iter().any(Option::is_some) { + Ok(Some(Box::new(|w, path| { + Box::pin(handle_deferred(w, deferred, path)) + }))) + } else { + Ok(None) + } + } + + #[instrument(level = "trace", skip(items, enc))] + fn encode_iter_ref<'a, I>( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + T: crate::Index + Send + Sync + 'static, + Self::Encoder: tokio_util::codec::Encoder<&'a Self>, + { + let items = items.into_iter(); + dst.reserve(items.len()); + let mut deferred = Vec::with_capacity(items.len()); + for item in items { + enc.encode(item, dst)?; + deferred.push(enc.take_deferred()); + } + if deferred.iter().any(Option::is_some) { + Ok(Some(Box::new(|w, path| { + Box::pin(handle_deferred(w, deferred, path)) + }))) + } else { + Ok(None) + } + } + + #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))] + fn encode_list_own( + items: Vec, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + T: crate::Index + Send + Sync + 'static, + { + let n = u32::try_from(items.len()) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + dst.reserve(5 + items.len()); + Leb128Encoder.encode(n, dst)?; + Self::encode_iter_own(items, enc, dst) + } - pub fn set_deferred( - &mut self, - f: Box< - dyn FnOnce(R) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - ) -> Option< - Box< - dyn FnOnce(R) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - > { - self.deferred.replace(f) - } - - pub fn take_deferred( - &mut self, - ) -> Option< - Box< - dyn FnOnce(R) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - > { + #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))] + fn encode_list_ref<'a>( + items: &'a [Self], + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + T: crate::Index + Send + Sync + 'static, + Self::Encoder: tokio_util::codec::Encoder<&'a Self>, + { + let n = u32::try_from(items.len()) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + dst.reserve(5 + items.len()); + Leb128Encoder.encode(n, dst)?; + Self::encode_iter_ref(items, enc, dst) + } +} + +pub trait Decode: Sized { + type Decoder: tokio_util::codec::Decoder + Deferred + Default + Send; + type ListDecoder: tokio_util::codec::Decoder> + Default; +} + +pub struct ListEncoder { + deferred: Option>, +} + +impl Default for ListEncoder { + fn default() -> Self { + Self { deferred: None } + } +} + +impl Deferred for ListEncoder { + fn take_deferred(&mut self) -> Option> { self.deferred.take() } +} - pub async fn read_deferred(self, r: R) -> std::io::Result<()> { - let Some(f) = self.deferred else { - return Ok(()); - }; - f(r).await +impl tokio_util::codec::Encoder> for ListEncoder +where + T: Encode, + W: crate::Index + Send + Sync + 'static, +{ + type Error = >::Error; + + fn encode(&mut self, items: Vec, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut enc = T::Encoder::default(); + self.deferred = T::encode_list_own(items, &mut enc, dst)?; + Ok(()) } } -macro_rules! impl_copy_codec { - ($t:ty, $c:expr) => { - impl tokio_util::codec::Encoder<$t> for ValueEncoder { - type Error = std::io::Error; +impl<'a, W, T> tokio_util::codec::Encoder<&'a [T]> for ListEncoder +where + T: Encode, + T::Encoder: tokio_util::codec::Encoder<&'a T>, + W: crate::Index + Send + Sync + 'static, +{ + type Error = >::Error; - #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: $t, dst: &mut BytesMut) -> std::io::Result<()> { - $c.encode(item, dst) + fn encode(&mut self, items: &'a [T], dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut enc = T::Encoder::default(); + self.deferred = T::encode_list_ref(items, &mut enc, dst)?; + Ok(()) + } +} + +impl Encode for Vec +where + T: Encode, + W: crate::Index + Send + Sync + 'static, +{ + type Encoder = ListEncoder; +} + +impl<'a, T, W> Encode for &'a [T] +where + T: Encode, + T::Encoder: tokio_util::codec::Encoder<&'a T>, + W: crate::Index + Send + Sync + 'static, +{ + type Encoder = ListEncoder; +} + +pub struct ListDecoder +where + T: tokio_util::codec::Decoder, +{ + dec: T, + ret: Vec, + cap: usize, + deferred: Vec>>, +} + +impl Default for ListDecoder +where + T: tokio_util::codec::Decoder + Default, +{ + fn default() -> Self { + Self { + dec: T::default(), + ret: Vec::default(), + cap: 0, + deferred: vec![], + } + } +} + +impl Deferred for ListDecoder +where + T: tokio_util::codec::Decoder, + R: crate::Index + Send + Sync + 'static, +{ + fn take_deferred(&mut self) -> Option> { + let deferred = mem::take(&mut self.deferred); + if deferred.iter().any(Option::is_some) { + Some(Box::new(|r, path| { + Box::pin(handle_deferred(r, deferred, path)) + })) + } else { + None + } + } +} + +impl tokio_util::codec::Decoder for ListDecoder +where + T: tokio_util::codec::Decoder + Deferred, +{ + type Item = Vec; + type Error = T::Error; + + #[instrument(level = "trace", skip(self), fields(ty = "list"))] + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if self.cap == 0 { + let Some(len) = Leb128DecoderU32.decode(src)? else { + return Ok(None); + }; + if len == 0 { + return Ok(Some(Vec::default())); } + let len = len + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; + self.ret = Vec::with_capacity(len); + self.deferred = Vec::with_capacity(len); + self.cap = len; + } + while self.cap > 0 { + let Some(v) = self.dec.decode(src)? else { + return Ok(None); + }; + self.ret.push(v); + self.deferred.push(self.dec.take_deferred()); + self.cap -= 1; } + Ok(Some(mem::take(&mut self.ret))) + } +} + +impl Decode for Vec +where + T: Decode + Send + Sync, + T::ListDecoder: Deferred + Send, + R: crate::Index + Send + Sync + 'static, +{ + type Decoder = T::ListDecoder; + type ListDecoder = ListDecoder; +} - impl tokio_util::codec::Encoder<&$t> for ValueEncoder { - type Error = std::io::Error; +macro_rules! impl_copy_codec { + ($t:ty, $c:tt) => { + impl Encode for $t { + type Encoder = $c; + + #[instrument(level = "trace", skip(items))] + fn encode_iter_own( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result< + Option>, + >::Error, + > + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + for item in items { + enc.encode(item, dst)?; + } + Ok(None) + } - #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: &$t, dst: &mut BytesMut) -> std::io::Result<()> { - $c.encode(*item, dst) + #[instrument(level = "trace", skip(items))] + fn encode_iter_ref<'a, I>( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result< + Option>, + >::Error, + > + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + for item in items { + enc.encode(*item, dst)?; + } + Ok(None) } } - impl tokio_util::codec::Decoder for ValueDecoder { - type Item = $t; - type Error = std::io::Error; + impl<'b, T> Encode for &'b $t { + type Encoder = $c; + + #[instrument(level = "trace", skip(items))] + fn encode_iter_own( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result< + Option>, + >::Error, + > + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + for item in items { + enc.encode(*item, dst)?; + } + Ok(None) + } - #[instrument(level = "trace", skip(self), ret)] - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - $c.decode(src) + #[instrument(level = "trace", skip(items))] + fn encode_iter_ref<'a, I>( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result< + Option>, + >::Error, + > + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + 'b: 'a, + { + let items = items.into_iter(); + dst.reserve(items.len()); + for item in items { + enc.encode(item, dst)?; + } + Ok(None) } } - impl Decode for $t { - type State = (); + impl Decode for $t { + type Decoder = $c; + type ListDecoder = CoreVecDecoder; } }; } impl_copy_codec!(bool, BoolCodec); impl_copy_codec!(i8, S8Codec); -impl_copy_codec!(u8, U8Codec); impl_copy_codec!(i16, S16Codec); impl_copy_codec!(u16, U16Codec); impl_copy_codec!(i32, S32Codec); @@ -196,159 +665,373 @@ impl_copy_codec!(f32, F32Codec); impl_copy_codec!(f64, F64Codec); impl_copy_codec!(char, Utf8Codec); -macro_rules! impl_string_encode { - ($t:ty) => { - impl tokio_util::codec::Encoder<$t> for ValueEncoder { - type Error = std::io::Error; +impl Encode for u8 { + type Encoder = U8Codec; - #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: $t, dst: &mut BytesMut) -> std::io::Result<()> { - CoreNameEncoder.encode(item, dst) - } - } + #[instrument(level = "trace", skip(items))] + fn encode_iter_own( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + dst.extend(items); + Ok(None) + } - impl tokio_util::codec::Encoder<&$t> for ValueEncoder { - type Error = std::io::Error; + #[instrument(level = "trace", skip(items))] + fn encode_iter_ref<'a, I>( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + dst.extend(items); + Ok(None) + } - #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: &$t, dst: &mut BytesMut) -> std::io::Result<()> { - CoreNameEncoder.encode(item, dst) - } + #[instrument(level = "trace", skip(items), fields(ty = "list"))] + fn encode_list_own( + items: Vec, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + { + CoreVecEncoderBytes.encode(items, dst)?; + Ok(None) + } + + #[instrument(level = "trace", skip(items), fields(ty = "list"))] + fn encode_list_ref<'a>( + items: &'a [Self], + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + Self::Encoder: tokio_util::codec::Encoder<&'a Self>, + { + CoreVecEncoderBytes.encode(items, dst)?; + Ok(None) + } +} + +impl<'b, T> Encode for &'b u8 { + type Encoder = U8Codec; + + #[instrument(level = "trace", skip(items))] + fn encode_iter_own( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + let items = items.into_iter(); + dst.reserve(items.len()); + dst.extend(items); + Ok(None) + } + + #[instrument(level = "trace", skip(items))] + fn encode_iter_ref<'a, I>( + items: I, + enc: &mut Self::Encoder, + dst: &mut BytesMut, + ) -> Result>, >::Error> + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + 'b: 'a, + { + let items = items.into_iter(); + dst.reserve(items.len()); + dst.extend(items.map(|b| **b)); + Ok(None) + } +} + +#[derive(Debug, Default)] +#[repr(transparent)] +pub struct ListDecoderU8(CoreVecDecoderBytes); + +impl tokio_util::codec::Decoder for ListDecoderU8 { + type Item = Vec; + type Error = ::Error; + + #[instrument(level = "trace", skip(self), fields(ty = "list"))] + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let Some(buf) = self.0.decode(src)? else { + return Ok(None); + }; + Ok(Some(buf.into())) + } +} + +impl Decode for u8 { + type Decoder = U8Codec; + type ListDecoder = ListDecoderU8; +} + +impl Encode for &str { + type Encoder = CoreNameEncoder; +} + +impl Encode for String { + type Encoder = CoreNameEncoder; +} + +impl Decode for String { + type Decoder = CoreNameDecoder; + type ListDecoder = CoreVecDecoder; +} + +impl Encode for Bytes { + type Encoder = CoreVecEncoderBytes; +} + +impl Decode for Bytes { + type Decoder = CoreVecDecoderBytes; + type ListDecoder = CoreVecDecoder; +} + +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +#[repr(transparent)] +pub struct ResourceEncoder; + +impl tokio_util::codec::Encoder> for ResourceEncoder { + type Error = std::io::Error; + + #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))] + fn encode(&mut self, item: ResourceOwn, dst: &mut BytesMut) -> std::io::Result<()> { + CoreVecEncoderBytes.encode(item.repr, dst) + } +} + +impl tokio_util::codec::Encoder<&ResourceOwn> for ResourceEncoder { + type Error = std::io::Error; + + #[instrument(level = "trace", skip(self, item), ret, fields(ty = "own"))] + fn encode(&mut self, item: &ResourceOwn, dst: &mut BytesMut) -> std::io::Result<()> { + CoreVecEncoderBytes.encode(&item.repr, dst) + } +} + +impl tokio_util::codec::Encoder> for ResourceEncoder { + type Error = std::io::Error; + + #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))] + fn encode(&mut self, item: ResourceBorrow, dst: &mut BytesMut) -> std::io::Result<()> { + CoreVecEncoderBytes.encode(item.repr, dst) + } +} + +impl tokio_util::codec::Encoder<&ResourceBorrow> for ResourceEncoder { + type Error = std::io::Error; + + #[instrument(level = "trace", skip(self, item), ret, fields(ty = "borrow"))] + fn encode(&mut self, item: &ResourceBorrow, dst: &mut BytesMut) -> std::io::Result<()> { + CoreVecEncoderBytes.encode(&item.repr, dst) + } +} + +#[derive(Debug)] +#[repr(transparent)] +pub struct ResourceBorrowDecoder { + dec: CoreVecDecoderBytes, + _ty: PhantomData, +} + +impl Default for ResourceBorrowDecoder { + fn default() -> Self { + Self { + dec: CoreVecDecoderBytes::default(), + _ty: PhantomData, } - }; + } } -impl_string_encode!(&str); -impl_string_encode!(String); +impl Deferred for ResourceBorrowDecoder { + fn take_deferred(&mut self) -> Option> { + None + } +} + +impl Deferred for CoreVecDecoder> { + fn take_deferred(&mut self) -> Option> { + None + } +} -impl tokio_util::codec::Decoder for ValueDecoder { - type Item = String; +impl Decode for ResourceBorrow { + type Decoder = ResourceBorrowDecoder; + type ListDecoder = CoreVecDecoder; +} + +impl tokio_util::codec::Decoder for ResourceBorrowDecoder { + type Item = ResourceBorrow; type Error = std::io::Error; - #[instrument(level = "trace", skip(self), ret)] + #[instrument(level = "trace", skip(self), fields(ty = "borrow"))] fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - self.state.decode(src) + let repr = self.dec.decode(src)?; + Ok(repr.map(Self::Item::from)) } } -impl Decode for String { - type State = CoreNameDecoder; +#[derive(Debug)] +#[repr(transparent)] +pub struct ResourceOwnDecoder { + dec: CoreVecDecoderBytes, + _ty: PhantomData, } -impl tokio_util::codec::Encoder for ValueEncoder { +impl Default for ResourceOwnDecoder { + fn default() -> Self { + Self { + dec: CoreVecDecoderBytes::default(), + _ty: PhantomData, + } + } +} + +impl Deferred for ResourceOwnDecoder { + fn take_deferred(&mut self) -> Option> { + None + } +} + +impl Deferred for CoreVecDecoder> { + fn take_deferred(&mut self) -> Option> { + None + } +} + +impl Decode for ResourceOwn { + type Decoder = ResourceOwnDecoder; + type ListDecoder = CoreVecDecoder; +} + +impl tokio_util::codec::Decoder for ResourceOwnDecoder { + type Item = ResourceOwn; type Error = std::io::Error; - #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> std::io::Result<()> { - CoreVecEncoderBytes.encode(item, dst) + #[instrument(level = "trace", skip(self), fields(ty = "own"))] + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let repr = self.dec.decode(src)?; + Ok(repr.map(Self::Item::from)) } } -impl tokio_util::codec::Encoder<&Bytes> for ValueEncoder { +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +#[repr(transparent)] +pub struct UnitCodec; + +impl tokio_util::codec::Encoder<()> for UnitCodec { type Error = std::io::Error; #[instrument(level = "trace", skip(self), ret)] - fn encode(&mut self, item: &Bytes, dst: &mut BytesMut) -> std::io::Result<()> { - CoreVecEncoderBytes.encode(item.as_ref(), dst) + fn encode(&mut self, (): (), dst: &mut BytesMut) -> std::io::Result<()> { + Ok(()) } } -impl tokio_util::codec::Decoder for ValueDecoder { - type Item = Bytes; +impl tokio_util::codec::Encoder<&()> for UnitCodec { type Error = std::io::Error; #[instrument(level = "trace", skip(self), ret)] + fn encode(&mut self, (): &(), dst: &mut BytesMut) -> std::io::Result<()> { + Ok(()) + } +} + +impl tokio_util::codec::Decoder for UnitCodec { + type Item = (); + type Error = std::io::Error; + + #[instrument(level = "trace", skip(self))] fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - self.state.decode(src) + Ok(Some(())) } } -impl Decode for Bytes { - type State = CoreVecDecoderBytes; +impl Encode for () { + type Encoder = UnitCodec; } -async fn handle_deferred(w: T, deferred: I) -> std::io::Result<()> -where - T: crate::Index + 'static, - I: IntoIterator, - I::IntoIter: ExactSizeIterator< - Item = Option< - Box< - dyn FnOnce(T) -> Pin> + Send + Sync>> - + Send - + Sync, - >, - >, - >, -{ - let futs: FuturesUnordered<_> = zip(0.., deferred) - .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f))) - .map(|(w, f)| async move { - let w = w.map_err(std::io::Error::other)?; - f(w).await - }) - .collect(); - futs.try_collect().await?; - Ok(()) +impl Decode for () { + type Decoder = UnitCodec; + type ListDecoder = CoreVecDecoder; } macro_rules! impl_tuple_codec { - ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+) => { - impl tokio_util::codec::Encoder<($($vt),+,)> for ValueEncoder + ($($vn:ident),+; $($vt:ident),+; $($cn:ident),+; $($ct:ident),+) => { + impl Deferred for TupleEncoder::<($($ct),+,)> where - E: From, - W: AsyncWrite + crate::Index + Send + Sync + 'static, - std::io::Error: From, - $(Self: tokio_util::codec::Encoder<$vt, Error = E>),+ + W: crate::Index + Send + Sync + 'static, + $($ct: Deferred + Default),+ { - type Error = std::io::Error; - - #[instrument(level = "trace", skip_all, ret, fields(ty = "tuple", dst))] - fn encode(&mut self, ($($vn),+,): ($($vt),+,), dst: &mut BytesMut) -> std::io::Result<()> { - $( - let mut $cn = Self::default(); - $cn.encode($vn, dst)?; - )+ + fn take_deferred(&mut self) -> Option> { + let Self(($(mut $cn),+,)) = mem::take(self); let deferred = [ $($cn.take_deferred()),+ ]; if deferred.iter().any(Option::is_some) { - self.0 = Some(Box::new(|w| Box::pin(handle_deferred(w, deferred)))); + Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path)))) + } else { + None } - Ok(()) } } - impl tokio_util::codec::Decoder for ValueDecoder + impl Encode for ($($vt),+,) where + W: crate::Index + Send + Sync + 'static, E: From, - R: AsyncRead + crate::Index + Send + Sync + Unpin + 'static, - std::io::Error: From, - $(ValueDecoder: tokio_util::codec::Decoder),+, - $($vt: Decode),+ + $( + $vt: Encode, + $vt::Encoder: tokio_util::codec::Encoder<$vt, Error = E>, + )+ + { + type Encoder = TupleEncoder::<($($vt::Encoder),+,)>; + } + + impl Deferred for TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)> + where + R: crate::Index + Send + Sync + 'static, + $($vt: Decode),+ { - type Error = std::io::Error; - type Item = ($($vt),+,); - - #[instrument(level = "trace", skip(self))] - fn decode( - &mut self, - src: &mut BytesMut, - ) -> Result, Self::Error> { - let Some(ret) = self.state.decode(src)? else { - return Ok(None) - }; - let ($(mut $cn),+,) = mem::take(&mut self.state).into_inner(); + fn take_deferred(&mut self) -> Option> { + let ($(mut $cn),+,) = mem::take(self).into_inner(); let deferred = [ $($cn.take_deferred()),+ ]; if deferred.iter().any(Option::is_some) { - self.deferred = Some(Box::new(|r| Box::pin(handle_deferred(r, deferred)))); + Some(Box::new(|r, path| Box::pin(handle_deferred(r, deferred, path)))) + } else { + None } - Ok(Some(ret)) } } - impl<$($vt),+> Decode for ($($vt),+,) where - $($vt: Decode),+ + impl Decode for ($($vt),+,) + where + R: crate::Index + Send + Sync + 'static, + E: From, + $( + $vt: Decode + Send + Sync, + $vt::Decoder: tokio_util::codec::Decoder + Send + Sync, + )+ { - type State = TupleDecoder::<($(ValueDecoder),+,), ($(Option<$vt>),+,)>; + type Decoder = TupleDecoder::<($($vt::Decoder),+,), ($(Option<$vt>),+,)>; + type ListDecoder = ListDecoder; } }; } @@ -356,157 +1039,123 @@ macro_rules! impl_tuple_codec { impl_tuple_codec!( v0; V0; - c0 + c0; + C0 ); impl_tuple_codec!( v0, v1; V0, V1; - c0, c1 + c0, c1; + C0, C1 ); impl_tuple_codec!( v0, v1, v2; V0, V1, V2; - c0, c1, c2 + c0, c1, c2; + C0, C1, C2 ); impl_tuple_codec!( v0, v1, v2, v3; V0, V1, V2, V3; - c0, c1, c2, c3 + c0, c1, c2, c3; + C0, C1, C2, C3 ); impl_tuple_codec!( v0, v1, v2, v3, v4; V0, V1, V2, V3, V4; - c0, c1, c2, c3, c4 + c0, c1, c2, c3, c4; + C0, C1, C2, C3, C4 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5; V0, V1, V2, V3, V4, V5; - c0, c1, c2, c3, c4, c5 + c0, c1, c2, c3, c4, c5; + C0, C1, C2, C3, C4, C5 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6; V0, V1, V2, V3, V4, V5, V6; - c0, c1, c2, c3, c4, c5, c6 + c0, c1, c2, c3, c4, c5, c6; + C0, C1, C2, C3, C4, C5, C6 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6, v7; V0, V1, V2, V3, V4, V5, V6, V7; - c0, c1, c2, c3, c4, c5, c6, c7 + c0, c1, c2, c3, c4, c5, c6, c7; + C0, C1, C2, C3, C4, C5, C6, C7 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6, v7, v8; V0, V1, V2, V3, V4, V5, V6, V7, V8; - c0, c1, c2, c3, c4, c5, c6, c7, c8 + c0, c1, c2, c3, c4, c5, c6, c7, c8; + C0, C1, C2, C3, C4, C5, C6, C7, C8 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6, v7, v8, v9; V0, V1, V2, V3, V4, V5, V6, V7, V8, V9; - c0, c1, c2, c3, c4, c5, c6, c7, c8, c9 + c0, c1, c2, c3, c4, c5, c6, c7, c8, c9; + C0, C1, C2, C3, C4, C5, C6, C7, C8, C9 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10; V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10; - c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10 + c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10; + C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10 ); impl_tuple_codec!( v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11; V0, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11; - c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11 + c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11; + C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11 ); -impl tokio_util::codec::Encoder> for ValueEncoder -where - Self: tokio_util::codec::Encoder, - W: AsyncWrite + crate::Index + Send + Sync + 'static, - >::Error: Into, -{ - type Error = std::io::Error; - - #[instrument(level = "trace", skip(self, item), fields(ty = "list"))] - fn encode(&mut self, item: Vec, dst: &mut BytesMut) -> std::io::Result<()> { - let items = item.len(); - let n = u32::try_from(items) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; - dst.reserve(5 + items); - Leb128Encoder.encode(n, dst)?; - let mut deferred = Vec::with_capacity(items); - for item in item { - let mut enc = ValueEncoder::default(); - enc.encode(item, dst).map_err(Into::into)?; - deferred.push(enc.0); - } - if deferred.iter().any(Option::is_some) { - self.0 = Some(Box::new(|w| Box::pin(handle_deferred(w, deferred)))); - } - Ok(()) - } +pub struct FutureEncoder { + deferred: Option>, } -impl tokio_util::codec::Encoder<&[T]> for ValueEncoder -where - Self: tokio_util::codec::Encoder, - W: AsyncWrite + crate::Index + Send + Sync + 'static, -{ - type Error = std::io::Error; - - #[instrument(level = "trace", skip(self, item), fields(ty = "list"))] - fn encode(&mut self, item: &[T], dst: &mut BytesMut) -> std::io::Result<()> { - let items = item.len(); - let n = u32::try_from(items) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; - dst.reserve(5 + items); - Leb128Encoder.encode(n, dst)?; - let mut deferred = Vec::with_capacity(items); - for item in item { - let mut enc = ValueEncoder::default(); - enc.encode(*item, dst)?; - deferred.push(enc.0); - } - if deferred.iter().any(Option::is_some) { - self.0 = Some(Box::new(|w| Box::pin(handle_deferred(w, deferred)))); - } - Ok(()) +impl Default for FutureEncoder { + fn default() -> Self { + Self { deferred: None } } } -impl tokio_util::codec::Encoder + Send + Sync>>> - for ValueEncoder +impl tokio_util::codec::Encoder for FutureEncoder where - Self: tokio_util::codec::Encoder, + T: Encode, W: AsyncWrite + crate::Index + Send + Sync + Unpin + 'static, - T: 'static, - std::io::Error: From<>::Error>, + Fut: Future + Send + Sync + 'static, + std::io::Error: From<>::Error>, { type Error = std::io::Error; #[instrument(level = "trace", skip(self, item), fields(ty = "future"))] - fn encode( - &mut self, - item: Pin + Send + Sync>>, - dst: &mut BytesMut, - ) -> std::io::Result<()> { + fn encode(&mut self, item: Fut, dst: &mut BytesMut) -> std::io::Result<()> { + // TODO: Check if future is resolved dst.reserve(1); dst.put_u8(0x00); - self.0 = Some(Box::new(|w| { + self.deferred = Some(Box::new(|w, mut path| { Box::pin(async move { - let mut w = w.index(&[0]).map_err(std::io::Error::other)?; + let mut root = w.index(&path).map_err(std::io::Error::other)?; let item = item.await; - let mut enc = ValueEncoder::default(); + let mut enc = T::Encoder::default(); let mut buf = BytesMut::default(); enc.encode(item, &mut buf)?; - w.write_all(&buf).await?; - enc.write_deferred(w).await?; + root.write_all(&buf).await?; + if let Some(f) = enc.take_deferred() { + path.push(0); + f(w, path).await?; + } Ok(()) }) })); @@ -514,244 +1163,220 @@ where } } -impl tokio_util::codec::Encoder + Send + Sync>>> - for ValueEncoder +pub struct StreamEncoder { + deferred: Option>, +} + +impl Default for StreamEncoder { + fn default() -> Self { + Self { deferred: None } + } +} + +impl Deferred for StreamEncoder { + fn take_deferred(&mut self) -> Option> { + self.deferred.take() + } +} + +impl tokio_util::codec::Encoder for StreamEncoder where - Self: tokio_util::codec::Encoder, + T: Encode + Send + Sync + 'static, W: AsyncWrite + crate::Index + Send + Sync + Unpin + 'static, - T: Send + Sync + 'static, - std::io::Error: From<>::Error>, + S: Stream + Send + Sync + Unpin + 'static, + std::io::Error: From<>::Error>, { type Error = std::io::Error; - #[instrument(level = "trace", skip(self, item), fields(ty = "stream"))] - fn encode( - &mut self, - item: Pin + Send + Sync>>, - dst: &mut BytesMut, - ) -> std::io::Result<()> { + #[instrument(level = "trace", skip(self, items), fields(ty = "stream"))] + fn encode(&mut self, items: S, dst: &mut BytesMut) -> std::io::Result<()> { + // TODO: Check if stream is resolved dst.reserve(1); dst.put_u8(0x00); - self.0 = Some(Box::new(|w| { - let w = Arc::new(w); - Box::pin( - item.enumerate() - .map(Ok) - .try_for_each_concurrent(None, move |(i, item)| { - let w = Arc::clone(&w); - async move { - let mut w = w.index(&[i]).map_err(std::io::Error::other)?; - let mut enc = ValueEncoder::::default(); - let mut buf = BytesMut::default(); - enc.encode(item, &mut buf)?; - w.write_all(&buf).await?; - enc.write_deferred(w).await?; - Ok(()) - } - }), - ) + self.deferred = Some(Box::new(|w, mut path| { + Box::pin(async move { + let mut root = w.index(&path).map_err(std::io::Error::other)?; + let mut enc = T::Encoder::default(); + let mut buf = BytesMut::default(); + // TODO: Optimize by chunking items + let mut items = items.enumerate(); + while let Some((i, item)) = items.next().await { + buf.reserve(1); + buf.put_u8(0x01); // chunk of length of 1 + trace!(i, "encoding stream item"); + enc.encode(item, &mut buf)?; + trace!(i, buf = format!("{buf:02x?}"), "writing stream item"); + root.write_all(&buf).await?; + buf.clear(); + if let Some(f) = enc.take_deferred() { + path.push(i); + // TODO: Do this concurrently with writing the stream + trace!(?path, "writing deferred stream item"); + f(Arc::clone(&w), path.clone()).await?; + path.pop(); + } + } + trace!("writing stream end"); + root.write_all(&[0x00]).await + }) })); Ok(()) } } -impl Decode for Pin + Send + Sync>> { - type State = Option>; +impl Encode for Pin + Send + Sync>> +where + T: Encode + Send + Sync + 'static, + W: AsyncWrite + crate::Index + Send + Sync + Unpin + 'static, + std::io::Error: From<>::Error>, +{ + type Encoder = StreamEncoder; } -impl Decode for StreamChunk { - type State = StreamChunkDecodeState; +pub struct StreamDecoder { + dec: T, + deferred: Option>, } -struct StreamChunk(Vec); - -impl Default for StreamChunk { +impl Default for StreamDecoder { fn default() -> Self { - Self(Vec::default()) + Self { + dec: T::default(), + deferred: None, + } } } -pub struct StreamChunkDecodeState { - offset: Option, - ret: Vec, - cap: usize, -} - -impl Default for StreamChunkDecodeState { - fn default() -> Self { - Self { - offset: None, - ret: Vec::default(), - cap: 0, - } +impl Deferred for StreamDecoder { + fn take_deferred(&mut self) -> Option> { + self.deferred.take() } } -impl tokio_util::codec::Decoder for ValueDecoder> +#[instrument(level = "trace", skip(r, tx), ret)] +async fn handle_deferred_stream( + r: Arc, + mut path: Vec, + tx: mpsc::Sender, +) -> std::io::Result<()> where - ValueDecoder: tokio_util::codec::Decoder, + C: tokio_util::codec::Decoder> + Deferred + Send + Sync + Default, + C::Error: Send + Sync, + T: Send + Sync + 'static, R: AsyncRead + crate::Index + Send + Sync + Unpin + 'static, - T: Decode, - std::io::Error: From< as tokio_util::codec::Decoder>::Error>, + std::io::Error: From, { - type Item = StreamChunk; - type Error = std::io::Error; - - #[instrument(level = "trace", skip(self), fields(ty = "stream"))] - fn decode(&mut self, src: &mut BytesMut) -> std::io::Result> { - if self.state.cap == 0 { - let Some(len) = Leb128DecoderU32.decode(src)? else { - return Ok(None); - }; - if len == 0 { - mem::take(&mut self.state); - return Ok(Some(StreamChunk::default())); - } - let len = len - .try_into() - .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?; - self.state.ret = Vec::with_capacity(len); - self.state.cap = len; - } - while self.state.cap > 0 { - let mut index = mem::take(&mut self.index); - let offset = self.state.offset.ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "stream element offset overflows usize", - ) - })?; - let i = self.state.ret.len().checked_add(offset).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "stream element index overflows usize", - ) - })?; - index.push(i); - let mut dec = ValueDecoder::::new(index); - let res = dec.decode(src); - let mut index = dec.index; - let Some(v) = res? else { - index.pop(); - self.index = index; - return Ok(None); - }; - if let Some(deferred) = dec.deferred { - let index = Arc::from(index.as_ref()); - if let Some(f) = self.deferred.take() { - self.deferred = Some(Box::new(|r| { - Box::pin(async move { - let indexed = r.index(&index).map_err(std::io::Error::other)?; - try_join!(f(r), deferred(indexed))?; - Ok(()) - }) - })); - } else { - self.deferred = Some(Box::new(|r| { - Box::pin(async move { - let indexed = r.index(&index).map_err(std::io::Error::other)?; - deferred(indexed).await - }) - })); + let indexed = r.index(&path).map_err(std::io::Error::other)?; + let mut framed = FramedRead::new(indexed, C::default()); + let mut deferred = FuturesUnordered::default(); + let mut offset = 0usize; + loop { + trace!("receiving stream chunk"); + select! { + Some(chunk) = framed.next() => { + let chunk = chunk?; + if chunk.is_empty() { + trace!("received stream end"); + break; + } + trace!(offset, "received stream chunk"); + if usize::MAX - chunk.len() < offset { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "stream element index would overflow usize", + )); } + for (i, v) in zip(offset.., chunk) { + trace!(i, "sending stream element"); + tx.send(v).await.map_err(|_| { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "receiver closed") + })?; + if let Some(f) = framed.decoder_mut().take_deferred() { + path.push(i); + let indexed = r.index(&path).map_err(std::io::Error::other)?; + deferred.push(f(indexed.into(), path.clone())); + path.pop(); + } + offset += 1; + } + }, + Some(rx) = deferred.next() => { + rx?; } - index.pop(); - self.index = index; - self.state.ret.push(v); - self.state.cap -= 1; - self.state.offset = offset.checked_add(1); } - let StreamChunkDecodeState { ret, .. } = mem::take(&mut self.state); - Ok(Some(StreamChunk(ret))) } + trace!("draining stream"); + while let Some(()) = deferred.try_next().await? {} + Ok(()) } -impl tokio_util::codec::Decoder - for ValueDecoder + Send + Sync>>> +impl tokio_util::codec::Decoder for StreamDecoder where - ValueDecoder: tokio_util::codec::Decoder, + C: tokio_util::codec::Decoder> + Deferred + Send + Sync + Default, + C::Error: Send + Sync, + T: Send + Sync + 'static, R: AsyncRead + crate::Index + Send + Sync + Unpin + 'static, - T: Decode + Send + Sync + 'static, - std::io::Error: From< as tokio_util::codec::Decoder>::Error>, + std::io::Error: From, { type Item = Pin + Send + Sync>>; - type Error = std::io::Error; + type Error = C::Error; #[instrument(level = "trace", skip(self), fields(ty = "stream"))] - fn decode(&mut self, src: &mut BytesMut) -> std::io::Result> { - let state = if let Some(state) = self.state.take() { - state - } else { - if src.is_empty() { - src.reserve(1); - return Ok(None); - } - match src.get_u8() { - 0 => { - let index = self.index.clone(); - let (tx, rx) = mpsc::channel(128); - self.deferred = Some(Box::new(|r| { - Box::pin(async move { - let indexed = r.index(&index).map_err(std::io::Error::other)?; - let mut r = FramedRead::new( - indexed, - ValueDecoder::>::new(index), - ); - while let Some(StreamChunk(chunk)) = r.try_next().await? { - if chunk.is_empty() { - return Ok(()); - } - for v in chunk { - tx.send(v).await.map_err(|_| { - std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "receiver closed", - ) - })?; - } - } - Ok(()) - }) - })); - return Ok(Some(Box::pin(ReceiverStream::new(rx)))); - } - 1 => StreamChunkDecodeState::default(), - n => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid stream status byte `{n}`"), - )) - } - } - }; - let index = mem::take(&mut self.index); - let mut dec = ValueDecoder::> { - state, - index, - deferred: None, + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let Some(chunk) = self.dec.decode(src)? else { + return Ok(None); }; - let res = dec.decode(src)?; - self.index = dec.index; - if let Some(StreamChunk(chunk)) = res { - Ok(Some(Box::pin(stream::iter(chunk)))) - } else { - self.state = Some(dec.state); - Ok(None) + let mut dec = mem::take(&mut self.dec); + if !chunk.is_empty() { + self.deferred = dec.take_deferred(); + return Ok(Some(Box::pin(stream::iter(chunk)))); } + debug_assert!(dec.take_deferred().is_none()); + + // stream is pending + let (tx, rx) = mpsc::channel(128); + self.deferred = Some(Box::new(|r, path| { + Box::pin(async move { handle_deferred_stream::(r, path, tx).await }) + })); + return Ok(Some(Box::pin(ReceiverStream::new(rx)))); } } +impl Decode for Pin + Send + Sync>> +where + T: Decode + Send + Sync + 'static, + T::ListDecoder: Deferred + Send + Sync, + ::Error: Send + Sync, + R: AsyncRead + crate::Index + Send + Sync + Unpin + 'static, + std::io::Error: From<::Error>, +{ + type Decoder = StreamDecoder; + type ListDecoder = ListDecoder; +} + #[cfg(test)] mod tests { - use tokio_util::codec::Encoder as _; + use anyhow::bail; use super::*; + struct NoopStream; + + impl crate::Index for NoopStream { + fn index(&self, path: &[usize]) -> anyhow::Result { + panic!("index should not be called with path {path:?}") + } + } + #[test_log::test(tokio::test)] - async fn value_encoder() -> std::io::Result<()> { + async fn codec() -> anyhow::Result<()> { let mut buf = BytesMut::new(); - ValueEncoder::<()>::default().encode(0x42u8, &mut buf)?; - assert_eq!(buf.as_ref(), b"\x42"); + let mut enc = <(u8, u32) as Encode>::Encoder::default(); + enc.encode((0x42, 0x42), &mut buf)?; + if let Some(_f) = Deferred::::take_deferred(&mut enc) { + bail!("no deferred write should have been returned"); + } + assert_eq!(buf.as_ref(), b"\x42\x42"); Ok(()) } } diff --git a/tests/rust.rs b/tests/rust.rs index 5bccc850a..d9c70c377 100644 --- a/tests/rust.rs +++ b/tests/rust.rs @@ -1,3 +1,5 @@ +#![allow(clippy::type_complexity)] + mod common; use core::net::Ipv6Addr; @@ -8,11 +10,11 @@ use core::time::Duration; use std::sync::Arc; use anyhow::Context; -use futures::{FutureExt as _, TryStreamExt as _}; +use futures::{stream, FutureExt as _, Stream, StreamExt as _, TryStreamExt as _}; use tokio::sync::{oneshot, RwLock}; use tokio::time::sleep; -use tokio::{join, try_join}; -use tracing::instrument; +use tokio::{join, spawn, try_join}; +use tracing::{info, info_span, instrument, Instrument}; use wrpc_transport::{Invoke as _, Serve as _}; use wrpc_transport_legacy::{ResourceBorrow, ResourceOwn}; @@ -431,122 +433,207 @@ async fn rust_bindgen() -> anyhow::Result<()> { #[instrument(ret)] #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn rust_dynamic() -> anyhow::Result<()> { - with_quic(&["bar.foo"], |port, clt_ep, srv_ep| async move { - let clt = wrpc_transport_quic::Client::new(clt_ep, (Ipv6Addr::LOCALHOST, port)); - let srv = wrpc_transport_quic::Server::default(); - - let foo_bar_inv = srv - .serve_values::<[_; 0], ( - (bool, u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, char), - String, - ), ( - (bool, u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, char), - &str, - )>("foo", "bar", []) - .await - .context("failed to serve `foo.bar`")?; - let mut foo_bar_inv = pin!(foo_bar_inv); - join!( - async { - let ok = srv - .accept(&srv_ep) - .await - .expect("failed to accept client connection"); - assert!(ok); - let ((), ((a, b, c, d, e, f, g, h, i, j, k, l), m), rx, tx) = foo_bar_inv - .try_next() - .await - .expect("failed to accept invocation") - .expect("unexpected end of stream"); - assert!(rx.is_none()); - assert!(a); - assert_eq!(b, 0xfe); - assert_eq!(c, 0xfeff); - assert_eq!(d, 0xfeff_ffff); - assert_eq!(e, 0xfeff_ffff_ffff_ffff); - assert_eq!(f, 0x7e); - assert_eq!(g, 0x7eff); - assert_eq!(h, 0x7eff_ffff); - assert_eq!(i, 0x7eff_ffff_ffff_ffff); - assert_eq!(j, 0.42); - assert_eq!(k, 0.4242); - assert_eq!(l, 'a'); - assert_eq!(m, "test"); - tx(Ok(( - ( - true, - 0xfe, - 0xfeff, - 0xfeff_ffff, - 0xfeff_ffff_ffff_ffff, - 0x7e, - 0x7eff, - 0x7eff_ffff, - 0x7eff_ffff_ffff_ffff, - 0.42, - 0.4242, - 'a', - ), - "test", - ))) + with_quic( + &["sync.test", "async.test"], + |port, clt_ep, srv_ep| async move { + let clt = wrpc_transport_quic::Client::new(clt_ep, (Ipv6Addr::LOCALHOST, port)); + let srv = wrpc_transport_quic::Server::default(); + + let async_inv = srv + .serve_values("test", "async", [[Some(0)], [Some(1)]]) .await - .expect("failed to send response") - .expect("session failed"); - }, - async { - let (((a, b, c, d, e, f, g, h, i, j, k, l), m), rx) = clt - .invoke_values::<( - (bool, u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, char), - &str, - ), ( + .context("failed to serve `test.async`")?; + let sync_inv = srv + .serve_values("test", "sync", [[]]) + .await + .context("failed to serve `test.sync`")?; + let mut async_inv = pin!(async_inv); + let mut sync_inv = pin!(sync_inv); + + join!( + async { + info!("accepting `test.sync` connection"); + let ok = srv + .accept(&srv_ep) + .await + .expect("failed to accept client connection"); + assert!(ok); + info!("receiving `test.sync` parameters"); + let ((), params, rx, tx) = sync_inv + .try_next() + .await + .expect("failed to accept invocation") + .expect("unexpected end of stream"); + let ((a, b, c, d, e, f, g, h, i, j, k, l), m, n): ( (bool, u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, char), String, - )>( - (), - "foo", - "bar", + Vec>>, + ) = params; + assert!(rx.is_none()); + assert!(a); + assert_eq!(b, 0xfe); + assert_eq!(c, 0xfeff); + assert_eq!(d, 0xfeff_ffff); + assert_eq!(e, 0xfeff_ffff_ffff_ffff); + assert_eq!(f, 0x7e); + assert_eq!(g, 0x7eff); + assert_eq!(h, 0x7eff_ffff); + assert_eq!(i, 0x7eff_ffff_ffff_ffff); + assert_eq!(j, 0.42); + assert_eq!(k, 0.4242); + assert_eq!(l, 'a'); + assert_eq!(m, "test"); + assert_eq!(n, [[b"foo"]]); + info!("transmitting `test.sync` returns"); + tx(Ok(( ( - ( - true, - 0xfe, - 0xfeff, - 0xfeff_ffff, - 0xfeff_ffff_ffff_ffff, - 0x7e, - 0x7eff, - 0x7eff_ffff, - 0x7eff_ffff_ffff_ffff, - 0.42, - 0.4242, - 'a', - ), - "test", + true, + 0xfe_u8, + 0xfeff_u16, + 0xfeff_ffff_u32, + 0xfeff_ffff_ffff_ffff_u64, + 0x7e_i8, + 0x7eff_i16, + 0x7eff_ffff_i32, + 0x7eff_ffff_ffff_ffff_i64, + 0.42_f32, + 0.4242_f64, + 'a', ), - &[[]; 0], - ) + "test", + vec![vec!["foo".as_bytes()]], + ))) .await - .expect("failed to invoke `foo.bar`"); - assert!(a); - assert_eq!(b, 0xfe); - assert_eq!(c, 0xfeff); - assert_eq!(d, 0xfeff_ffff); - assert_eq!(e, 0xfeff_ffff_ffff_ffff); - assert_eq!(f, 0x7e); - assert_eq!(g, 0x7eff); - assert_eq!(h, 0x7eff_ffff); - assert_eq!(i, 0x7eff_ffff_ffff_ffff); - assert_eq!(j, 0.42); - assert_eq!(k, 0.4242); - assert_eq!(l, 'a'); - assert_eq!(m, "test"); - rx.await - .expect("failed to complete exchange") + .expect("failed to send response") .expect("session failed"); - } - ); - - Ok(()) - }) + } + .instrument(info_span!("server")), + async { + info!("invoking `test.sync`"); + let (returns, rx) = clt + .invoke_values( + (), + "test", + "sync", + ( + ( + true, + 0xfe_u8, + 0xfeff_u16, + 0xfeff_ffff_u32, + 0xfeff_ffff_ffff_ffff_u64, + 0x7e_i8, + 0x7eff_i16, + 0x7eff_ffff_i32, + 0x7eff_ffff_ffff_ffff_i64, + 0.42_f32, + 0.4242_f64, + 'a', + ), + "test", + vec![vec!["foo".as_bytes()]], + ), + &[[]], + ) + .await + .expect("failed to invoke `test.sync`"); + let ((a, b, c, d, e, f, g, h, i, j, k, l), m, n): ( + (bool, u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, char), + String, + Vec>>, + ) = returns; + assert!(a); + assert_eq!(b, 0xfe); + assert_eq!(c, 0xfeff); + assert_eq!(d, 0xfeff_ffff); + assert_eq!(e, 0xfeff_ffff_ffff_ffff); + assert_eq!(f, 0x7e); + assert_eq!(g, 0x7eff); + assert_eq!(h, 0x7eff_ffff); + assert_eq!(i, 0x7eff_ffff_ffff_ffff); + assert_eq!(j, 0.42); + assert_eq!(k, 0.4242); + assert_eq!(l, 'a'); + assert_eq!(m, "test"); + assert_eq!(n, [[b"foo"]]); + info!("finishing `test.sync` session"); + rx.await + .expect("failed to complete exchange") + .expect("session failed"); + } + .instrument(info_span!("client")), + ); + + join!( + async { + info!("accepting `test.async` connection"); + let ok = srv + .accept(&srv_ep) + .await + .expect("failed to accept client connection"); + assert!(ok); + info!("receiving `test.async` parameters"); + let ((), params, rx, tx) = async_inv + .try_next() + .await + .expect("failed to accept invocation") + .expect("unexpected end of stream"); + let (a, b): ( + Pin + Send + Sync>>, + Pin + Send + Sync>>, + ) = params; + let rx = rx.map(Instrument::in_current_span).map(spawn); + assert_eq!(a.collect::>().await, [0xc0, 0xff, 0xee]); + assert_eq!(b.collect::>().await, ["foo", "bar"]); + let a: Pin + Send + Sync>> = + Box::pin(stream::iter([0xc0, 0xff, 0xee])); + + let b: Pin + Send + Sync>> = + Box::pin(stream::iter(["foo", "bar"])); + if let Some(rx) = rx { + rx.await + .expect("async receiver panicked") + .expect("failed to perform async I/O"); + } + info!("transmitting `test.async` returns"); + tx(Ok((a, b))) + .await + .expect("failed to send response") + .expect("session failed"); + } + .instrument(info_span!("server")), + async { + let a: Pin + Send + Sync>> = + Box::pin(stream::iter([0xc0, 0xff, 0xee])); + let b: Pin + Send + Sync>> = + Box::pin(stream::iter(["foo", "bar"])); + info!("invoking `test.async`"); + let (returns, rx) = clt + .invoke_values((), "test", "async", (a, b), &[[Some(0)], [Some(1)]]) + .await + .expect("failed to invoke `test.async`"); + let (a, b): ( + Pin + Send + Sync>>, + Pin + Send + Sync>>, + ) = returns; + info!("receiving `test.async` async values"); + join!( + async { assert_eq!(a.collect::>().await, [0xc0, 0xff, 0xee]) }, + async { + assert_eq!(b.collect::>().await, ["foo", "bar"]); + }, + async { + rx.await + .expect("failed to complete exchange") + .expect("session failed"); + } + ); + } + .instrument(info_span!("client")), + ); + Ok(()) + }, + ) .await // TODO: migrate the whole test suite // with_nats(|_, nats_client| async {