Skip to content

Commit

Permalink
feat: impl AsyncRead/WriteRent for some foreign types; rewrite readv/…
Browse files Browse the repository at this point in the history
…writev impl for some types
  • Loading branch information
ihciah committed Oct 28, 2024
1 parent ead462c commit 8e4f859
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 25 deletions.
2 changes: 0 additions & 2 deletions monoio-compat/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! For compat with tokio AsyncRead and AsyncWrite.
#![cfg_attr(feature = "unstable", feature(new_uninit))]

pub mod box_future;
mod buf;

Expand Down
6 changes: 6 additions & 0 deletions monoio/src/buf/io_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ pub unsafe trait IoBuf: Unpin + 'static {
/// For `Vec`, this is identical to `len()`.
fn bytes_init(&self) -> usize;

/// Returns a slice of the buffer.
#[inline]
fn as_slice(&self) -> &[u8] {
unsafe { core::slice::from_raw_parts(self.read_ptr(), self.bytes_init()) }
}

/// Returns a view of the buffer with the specified range.
#[inline]
fn slice(self, range: impl ops::RangeBounds<usize>) -> Slice<Self>
Expand Down
5 changes: 3 additions & 2 deletions monoio/src/fs/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,9 @@ impl File {
Ok(())
}

async fn flush(&mut self) -> io::Result<()> {
Ok(())
#[inline]
fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
std::future::ready(Ok(()))
}

/// Closes the file.
Expand Down
101 changes: 84 additions & 17 deletions monoio/src/io/async_read_rent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::future::Future;
use std::{future::Future, io::Cursor};

use crate::{
buf::{IoBufMut, IoVecBufMut, RawBuf},
buf::{IoBufMut, IoVecBufMut},
BufResult,
};

Expand Down Expand Up @@ -49,6 +49,17 @@ pub trait AsyncReadRentAt {
) -> impl Future<Output = BufResult<usize, T>>;
}

impl<A: ?Sized + AsyncReadRentAt> AsyncReadRentAt for &mut A {
#[inline]
fn read_at<T: IoBufMut>(
&mut self,
buf: T,
pos: usize,
) -> impl Future<Output = BufResult<usize, T>> {
(**self).read_at(buf, pos)
}
}

impl<A: ?Sized + AsyncReadRent> AsyncReadRent for &mut A {
#[inline]
fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
Expand All @@ -70,29 +81,85 @@ impl AsyncReadRent for &[u8] {
buf.set_init(amt);
}
*self = b;
async move { (Ok(amt), buf) }
std::future::ready((Ok(amt), buf))
}

fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
// # Safety
// We do it in pure sync way.
let n = match unsafe { RawBuf::new_from_iovec_mut(&mut buf) } {
Some(mut raw_buf) => {
// copy from read to avoid await
let amt = std::cmp::min(self.len(), raw_buf.bytes_total());
let mut sum = 0;
{
#[cfg(windows)]
let buf_slice = unsafe {
std::slice::from_raw_parts_mut(buf.write_wsabuf_ptr(), buf.write_wsabuf_len())
};
#[cfg(unix)]
let buf_slice = unsafe {
std::slice::from_raw_parts_mut(buf.write_iovec_ptr(), buf.write_iovec_len())
};
for buf in buf_slice {
#[cfg(windows)]
let amt = std::cmp::min(self.len(), buf.len);
#[cfg(unix)]
let amt = std::cmp::min(self.len(), buf.iov_len);

let (a, b) = self.split_at(amt);
// # Safety
// The pointer is valid.
unsafe {
raw_buf
.write_ptr()
#[cfg(windows)]
buf.buf
.cast::<u8>()
.copy_from_nonoverlapping(a.as_ptr(), amt);
#[cfg(unix)]
buf.iov_base
.cast::<u8>()
.copy_from_nonoverlapping(a.as_ptr(), amt);
raw_buf.set_init(amt);
}
*self = b;
amt
sum += amt;

if self.is_empty() {
break;
}
}
None => 0,
};
unsafe { buf.set_init(n) };
async move { (Ok(n), buf) }
}

unsafe { buf.set_init(sum) };
std::future::ready((Ok(sum), buf))
}
}

impl<T: AsRef<[u8]>> AsyncReadRent for Cursor<T> {
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
let pos = self.position();
let slice: &[u8] = (*self).get_ref().as_ref();

if pos > slice.len() as u64 {
return (Ok(0), buf);
}

(&slice[pos as usize..]).read(buf).await
}

async fn readv<B: IoVecBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
let pos = self.position();
let slice: &[u8] = (*self).get_ref().as_ref();

if pos > slice.len() as u64 {
return (Ok(0), buf);
}

(&slice[pos as usize..]).readv(buf).await
}
}

impl<T: ?Sized + AsyncReadRent> AsyncReadRent for Box<T> {
#[inline]
fn read<B: IoBufMut>(&mut self, buf: B) -> impl Future<Output = BufResult<usize, B>> {
(**self).read(buf)
}

#[inline]
fn readv<B: IoVecBufMut>(&mut self, buf: B) -> impl Future<Output = BufResult<usize, B>> {
(**self).readv(buf)
}
}
64 changes: 64 additions & 0 deletions monoio/src/io/async_write_rent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ pub trait AsyncWriteRentAt {
) -> impl Future<Output = BufResult<usize, T>>;
}

impl<A: ?Sized + AsyncWriteRentAt> AsyncWriteRentAt for &mut A {
#[inline]
fn write_at<T: IoBuf>(
&mut self,
buf: T,
pos: usize,
) -> impl Future<Output = BufResult<usize, T>> {
(**self).write_at(buf, pos)
}
}

impl<A: ?Sized + AsyncWriteRent> AsyncWriteRent for &mut A {
#[inline]
fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
Expand All @@ -104,3 +115,56 @@ impl<A: ?Sized + AsyncWriteRent> AsyncWriteRent for &mut A {
(**self).shutdown()
}
}

impl AsyncWriteRent for Vec<u8> {
fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
let slice = buf.as_slice();
self.extend_from_slice(slice);
let len = slice.len();
std::future::ready((Ok(len), buf))
}

fn writev<T: IoVecBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
let mut sum = 0;
{
#[cfg(windows)]
let buf_slice =
unsafe { std::slice::from_raw_parts(buf.read_wsabuf_ptr(), buf.read_wsabuf_len()) };
#[cfg(unix)]
let buf_slice =
unsafe { std::slice::from_raw_parts(buf.read_iovec_ptr(), buf.read_iovec_len()) };
for buf in buf_slice {
#[cfg(windows)]
let len = buf.buf.len;
#[cfg(unix)]
let len = buf.iov_len;

sum += len;
}
self.reserve(sum);
for buf in buf_slice {
#[cfg(windows)]
let ptr = buf.buf.cast::<u8>();
#[cfg(unix)]
let ptr = buf.iov_base.cast::<u8>();
#[cfg(windows)]
let len = buf.buf.len;
#[cfg(unix)]
let len = buf.iov_len;

self.extend_from_slice(unsafe { std::slice::from_raw_parts(ptr, len) });
}
}
std::future::ready((Ok(sum), buf))
}

#[inline]
fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
std::future::ready(Ok(()))
}

#[inline]
fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
std::future::ready(Ok(()))
}
}
8 changes: 4 additions & 4 deletions monoio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,9 @@ impl AsyncWriteRent for TcpStream {
}

#[inline]
async fn flush(&mut self) -> std::io::Result<()> {
fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
// Tcp stream does not need flush.
Ok(())
std::future::ready(Ok(()))
}

fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
Expand All @@ -347,7 +347,7 @@ impl AsyncWriteRent for TcpStream {
-1 => Err(io::Error::last_os_error()),
_ => Ok(()),
};
async move { res }
std::future::ready(res)
}
}

Expand Down Expand Up @@ -403,7 +403,7 @@ impl CancelableAsyncWriteRent for TcpStream {
-1 => Err(io::Error::last_os_error()),
_ => Ok(()),
};
async move { res }
std::future::ready(res)
}
}

Expand Down

0 comments on commit 8e4f859

Please sign in to comment.