From 8e4f8599acee92f3ad41b80d97ab2bc7456def96 Mon Sep 17 00:00:00 2001 From: ihciah Date: Mon, 28 Oct 2024 08:37:48 +0000 Subject: [PATCH] feat: impl AsyncRead/WriteRent for some foreign types; rewrite readv/writev impl for some types --- monoio-compat/src/lib.rs | 2 - monoio/src/buf/io_buf.rs | 6 ++ monoio/src/fs/file/mod.rs | 5 +- monoio/src/io/async_read_rent.rs | 101 +++++++++++++++++++++++++----- monoio/src/io/async_write_rent.rs | 64 +++++++++++++++++++ monoio/src/net/tcp/stream.rs | 8 +-- 6 files changed, 161 insertions(+), 25 deletions(-) diff --git a/monoio-compat/src/lib.rs b/monoio-compat/src/lib.rs index 39e87af0..209aa25b 100644 --- a/monoio-compat/src/lib.rs +++ b/monoio-compat/src/lib.rs @@ -1,7 +1,5 @@ //! For compat with tokio AsyncRead and AsyncWrite. -#![cfg_attr(feature = "unstable", feature(new_uninit))] - pub mod box_future; mod buf; diff --git a/monoio/src/buf/io_buf.rs b/monoio/src/buf/io_buf.rs index 90c06be4..17515570 100644 --- a/monoio/src/buf/io_buf.rs +++ b/monoio/src/buf/io_buf.rs @@ -39,6 +39,12 @@ pub unsafe trait IoBuf: Unpin + 'static { /// For `Vec`, this is identical to `len()`. fn bytes_init(&self) -> usize; + /// Returns a slice of the buffer. + #[inline] + fn as_slice(&self) -> &[u8] { + unsafe { core::slice::from_raw_parts(self.read_ptr(), self.bytes_init()) } + } + /// Returns a view of the buffer with the specified range. #[inline] fn slice(self, range: impl ops::RangeBounds) -> Slice diff --git a/monoio/src/fs/file/mod.rs b/monoio/src/fs/file/mod.rs index 6a457389..804b488f 100644 --- a/monoio/src/fs/file/mod.rs +++ b/monoio/src/fs/file/mod.rs @@ -505,8 +505,9 @@ impl File { Ok(()) } - async fn flush(&mut self) -> io::Result<()> { - Ok(()) + #[inline] + fn flush(&mut self) -> impl Future> { + std::future::ready(Ok(())) } /// Closes the file. diff --git a/monoio/src/io/async_read_rent.rs b/monoio/src/io/async_read_rent.rs index 72843cff..32d669e0 100644 --- a/monoio/src/io/async_read_rent.rs +++ b/monoio/src/io/async_read_rent.rs @@ -1,7 +1,7 @@ -use std::future::Future; +use std::{future::Future, io::Cursor}; use crate::{ - buf::{IoBufMut, IoVecBufMut, RawBuf}, + buf::{IoBufMut, IoVecBufMut}, BufResult, }; @@ -49,6 +49,17 @@ pub trait AsyncReadRentAt { ) -> impl Future>; } +impl AsyncReadRentAt for &mut A { + #[inline] + fn read_at( + &mut self, + buf: T, + pos: usize, + ) -> impl Future> { + (**self).read_at(buf, pos) + } +} + impl AsyncReadRent for &mut A { #[inline] fn read(&mut self, buf: T) -> impl Future> { @@ -70,29 +81,85 @@ impl AsyncReadRent for &[u8] { buf.set_init(amt); } *self = b; - async move { (Ok(amt), buf) } + std::future::ready((Ok(amt), buf)) } fn readv(&mut self, mut buf: T) -> impl Future> { - // # Safety - // We do it in pure sync way. - let n = match unsafe { RawBuf::new_from_iovec_mut(&mut buf) } { - Some(mut raw_buf) => { - // copy from read to avoid await - let amt = std::cmp::min(self.len(), raw_buf.bytes_total()); + let mut sum = 0; + { + #[cfg(windows)] + let buf_slice = unsafe { + std::slice::from_raw_parts_mut(buf.write_wsabuf_ptr(), buf.write_wsabuf_len()) + }; + #[cfg(unix)] + let buf_slice = unsafe { + std::slice::from_raw_parts_mut(buf.write_iovec_ptr(), buf.write_iovec_len()) + }; + for buf in buf_slice { + #[cfg(windows)] + let amt = std::cmp::min(self.len(), buf.len); + #[cfg(unix)] + let amt = std::cmp::min(self.len(), buf.iov_len); + let (a, b) = self.split_at(amt); + // # Safety + // The pointer is valid. unsafe { - raw_buf - .write_ptr() + #[cfg(windows)] + buf.buf + .cast::() + .copy_from_nonoverlapping(a.as_ptr(), amt); + #[cfg(unix)] + buf.iov_base + .cast::() .copy_from_nonoverlapping(a.as_ptr(), amt); - raw_buf.set_init(amt); } *self = b; - amt + sum += amt; + + if self.is_empty() { + break; + } } - None => 0, - }; - unsafe { buf.set_init(n) }; - async move { (Ok(n), buf) } + } + + unsafe { buf.set_init(sum) }; + std::future::ready((Ok(sum), buf)) + } +} + +impl> AsyncReadRent for Cursor { + async fn read(&mut self, buf: B) -> BufResult { + let pos = self.position(); + let slice: &[u8] = (*self).get_ref().as_ref(); + + if pos > slice.len() as u64 { + return (Ok(0), buf); + } + + (&slice[pos as usize..]).read(buf).await + } + + async fn readv(&mut self, buf: B) -> BufResult { + let pos = self.position(); + let slice: &[u8] = (*self).get_ref().as_ref(); + + if pos > slice.len() as u64 { + return (Ok(0), buf); + } + + (&slice[pos as usize..]).readv(buf).await + } +} + +impl AsyncReadRent for Box { + #[inline] + fn read(&mut self, buf: B) -> impl Future> { + (**self).read(buf) + } + + #[inline] + fn readv(&mut self, buf: B) -> impl Future> { + (**self).readv(buf) } } diff --git a/monoio/src/io/async_write_rent.rs b/monoio/src/io/async_write_rent.rs index 482f1945..68afd608 100644 --- a/monoio/src/io/async_write_rent.rs +++ b/monoio/src/io/async_write_rent.rs @@ -83,6 +83,17 @@ pub trait AsyncWriteRentAt { ) -> impl Future>; } +impl AsyncWriteRentAt for &mut A { + #[inline] + fn write_at( + &mut self, + buf: T, + pos: usize, + ) -> impl Future> { + (**self).write_at(buf, pos) + } +} + impl AsyncWriteRent for &mut A { #[inline] fn write(&mut self, buf: T) -> impl Future> { @@ -104,3 +115,56 @@ impl AsyncWriteRent for &mut A { (**self).shutdown() } } + +impl AsyncWriteRent for Vec { + fn write(&mut self, buf: T) -> impl Future> { + let slice = buf.as_slice(); + self.extend_from_slice(slice); + let len = slice.len(); + std::future::ready((Ok(len), buf)) + } + + fn writev(&mut self, buf: T) -> impl Future> { + let mut sum = 0; + { + #[cfg(windows)] + let buf_slice = + unsafe { std::slice::from_raw_parts(buf.read_wsabuf_ptr(), buf.read_wsabuf_len()) }; + #[cfg(unix)] + let buf_slice = + unsafe { std::slice::from_raw_parts(buf.read_iovec_ptr(), buf.read_iovec_len()) }; + for buf in buf_slice { + #[cfg(windows)] + let len = buf.buf.len; + #[cfg(unix)] + let len = buf.iov_len; + + sum += len; + } + self.reserve(sum); + for buf in buf_slice { + #[cfg(windows)] + let ptr = buf.buf.cast::(); + #[cfg(unix)] + let ptr = buf.iov_base.cast::(); + #[cfg(windows)] + let len = buf.buf.len; + #[cfg(unix)] + let len = buf.iov_len; + + self.extend_from_slice(unsafe { std::slice::from_raw_parts(ptr, len) }); + } + } + std::future::ready((Ok(sum), buf)) + } + + #[inline] + fn flush(&mut self) -> impl Future> { + std::future::ready(Ok(())) + } + + #[inline] + fn shutdown(&mut self) -> impl Future> { + std::future::ready(Ok(())) + } +} diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index 36eb1ae3..19978767 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -331,9 +331,9 @@ impl AsyncWriteRent for TcpStream { } #[inline] - async fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> impl Future> { // Tcp stream does not need flush. - Ok(()) + std::future::ready(Ok(())) } fn shutdown(&mut self) -> impl Future> { @@ -347,7 +347,7 @@ impl AsyncWriteRent for TcpStream { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; - async move { res } + std::future::ready(res) } } @@ -403,7 +403,7 @@ impl CancelableAsyncWriteRent for TcpStream { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; - async move { res } + std::future::ready(res) } }