diff --git a/monoio-compat/src/lib.rs b/monoio-compat/src/lib.rs index 39e87af0..209aa25b 100644 --- a/monoio-compat/src/lib.rs +++ b/monoio-compat/src/lib.rs @@ -1,7 +1,5 @@ //! For compat with tokio AsyncRead and AsyncWrite. -#![cfg_attr(feature = "unstable", feature(new_uninit))] - pub mod box_future; mod buf; diff --git a/monoio/src/blocking.rs b/monoio/src/blocking.rs index 4e6bba7d..15dd49ce 100644 --- a/monoio/src/blocking.rs +++ b/monoio/src/blocking.rs @@ -287,44 +287,39 @@ mod tests { #[test] fn default_pool() { - let shared_pool = Box::new(DefaultThreadPool::new(3)); + let shared_pool = Box::new(DefaultThreadPool::new(6)); let mut rt = crate::RuntimeBuilder::::new() .attach_thread_pool(shared_pool) .enable_timer() .build() .unwrap(); + fn thread_sleep(s: &'static str) -> impl FnOnce() -> &'static str { + move || { + // Simulate a heavy computation. + std::thread::sleep(std::time::Duration::from_millis(500)); + s + } + } rt.block_on(async { let begin = std::time::Instant::now(); - let join1 = crate::spawn_blocking(|| { - // Simulate a heavy computation. - std::thread::sleep(std::time::Duration::from_millis(150)); - "hello spawn_blocking1!".to_string() - }); - let join2 = crate::spawn_blocking(|| { - // Simulate a heavy computation. - std::thread::sleep(std::time::Duration::from_millis(150)); - "hello spawn_blocking2!".to_string() - }); - let join3 = crate::spawn_blocking(|| { - // Simulate a heavy computation. - std::thread::sleep(std::time::Duration::from_millis(150)); - "hello spawn_blocking3!".to_string() - }); - let join4 = crate::spawn_blocking(|| { - // Simulate a heavy computation. - std::thread::sleep(std::time::Duration::from_millis(150)); - "hello spawn_blocking4!".to_string() - }); - let sleep_async = crate::time::sleep(std::time::Duration::from_millis(150)); - let (result1, result2, result3, result4, _) = - crate::join!(join1, join2, join3, join4, sleep_async); + let join1 = crate::spawn_blocking(thread_sleep("hello spawn_blocking1!")); + let join2 = crate::spawn_blocking(thread_sleep("hello spawn_blocking2!")); + let join3 = crate::spawn_blocking(thread_sleep("hello spawn_blocking3!")); + let join4 = crate::spawn_blocking(thread_sleep("hello spawn_blocking4!")); + let join5 = crate::spawn_blocking(thread_sleep("hello spawn_blocking5!")); + let join6 = crate::spawn_blocking(thread_sleep("hello spawn_blocking6!")); + let sleep_async = crate::time::sleep(std::time::Duration::from_millis(500)); + let (result1, result2, result3, result4, result5, result6, _) = + crate::join!(join1, join2, join3, join4, join5, join6, sleep_async); let eps = begin.elapsed(); - assert!(eps < std::time::Duration::from_millis(590)); - assert!(eps >= std::time::Duration::from_millis(150)); + assert!(eps < std::time::Duration::from_millis(3000)); + assert!(eps >= std::time::Duration::from_millis(500)); assert_eq!(result1.unwrap(), "hello spawn_blocking1!"); assert_eq!(result2.unwrap(), "hello spawn_blocking2!"); assert_eq!(result3.unwrap(), "hello spawn_blocking3!"); assert_eq!(result4.unwrap(), "hello spawn_blocking4!"); + assert_eq!(result5.unwrap(), "hello spawn_blocking5!"); + assert_eq!(result6.unwrap(), "hello spawn_blocking6!"); }); } } diff --git a/monoio/src/buf/io_buf.rs b/monoio/src/buf/io_buf.rs index 90c06be4..17515570 100644 --- a/monoio/src/buf/io_buf.rs +++ b/monoio/src/buf/io_buf.rs @@ -39,6 +39,12 @@ pub unsafe trait IoBuf: Unpin + 'static { /// For `Vec`, this is identical to `len()`. fn bytes_init(&self) -> usize; + /// Returns a slice of the buffer. + #[inline] + fn as_slice(&self) -> &[u8] { + unsafe { core::slice::from_raw_parts(self.read_ptr(), self.bytes_init()) } + } + /// Returns a view of the buffer with the specified range. #[inline] fn slice(self, range: impl ops::RangeBounds) -> Slice diff --git a/monoio/src/fs/file/mod.rs b/monoio/src/fs/file/mod.rs index 6a457389..804b488f 100644 --- a/monoio/src/fs/file/mod.rs +++ b/monoio/src/fs/file/mod.rs @@ -505,8 +505,9 @@ impl File { Ok(()) } - async fn flush(&mut self) -> io::Result<()> { - Ok(()) + #[inline] + fn flush(&mut self) -> impl Future> { + std::future::ready(Ok(())) } /// Closes the file. diff --git a/monoio/src/io/async_read_rent.rs b/monoio/src/io/async_read_rent.rs index 72843cff..1e3e215f 100644 --- a/monoio/src/io/async_read_rent.rs +++ b/monoio/src/io/async_read_rent.rs @@ -1,7 +1,7 @@ -use std::future::Future; +use std::{future::Future, io::Cursor}; use crate::{ - buf::{IoBufMut, IoVecBufMut, RawBuf}, + buf::{IoBufMut, IoVecBufMut}, BufResult, }; @@ -49,6 +49,17 @@ pub trait AsyncReadRentAt { ) -> impl Future>; } +impl AsyncReadRentAt for &mut A { + #[inline] + fn read_at( + &mut self, + buf: T, + pos: usize, + ) -> impl Future> { + (**self).read_at(buf, pos) + } +} + impl AsyncReadRent for &mut A { #[inline] fn read(&mut self, buf: T) -> impl Future> { @@ -70,29 +81,85 @@ impl AsyncReadRent for &[u8] { buf.set_init(amt); } *self = b; - async move { (Ok(amt), buf) } + std::future::ready((Ok(amt), buf)) } fn readv(&mut self, mut buf: T) -> impl Future> { - // # Safety - // We do it in pure sync way. - let n = match unsafe { RawBuf::new_from_iovec_mut(&mut buf) } { - Some(mut raw_buf) => { - // copy from read to avoid await - let amt = std::cmp::min(self.len(), raw_buf.bytes_total()); + let mut sum = 0; + { + #[cfg(windows)] + let buf_slice = unsafe { + std::slice::from_raw_parts_mut(buf.write_wsabuf_ptr(), buf.write_wsabuf_len()) + }; + #[cfg(unix)] + let buf_slice = unsafe { + std::slice::from_raw_parts_mut(buf.write_iovec_ptr(), buf.write_iovec_len()) + }; + for buf in buf_slice { + #[cfg(windows)] + let amt = std::cmp::min(self.len(), buf.len as usize); + #[cfg(unix)] + let amt = std::cmp::min(self.len(), buf.iov_len); + let (a, b) = self.split_at(amt); + // # Safety + // The pointer is valid. unsafe { - raw_buf - .write_ptr() + #[cfg(windows)] + buf.buf + .cast::() + .copy_from_nonoverlapping(a.as_ptr(), amt); + #[cfg(unix)] + buf.iov_base + .cast::() .copy_from_nonoverlapping(a.as_ptr(), amt); - raw_buf.set_init(amt); } *self = b; - amt + sum += amt; + + if self.is_empty() { + break; + } } - None => 0, - }; - unsafe { buf.set_init(n) }; - async move { (Ok(n), buf) } + } + + unsafe { buf.set_init(sum) }; + std::future::ready((Ok(sum), buf)) + } +} + +impl> AsyncReadRent for Cursor { + async fn read(&mut self, buf: B) -> BufResult { + let pos = self.position(); + let slice: &[u8] = (*self).get_ref().as_ref(); + + if pos > slice.len() as u64 { + return (Ok(0), buf); + } + + (&slice[pos as usize..]).read(buf).await + } + + async fn readv(&mut self, buf: B) -> BufResult { + let pos = self.position(); + let slice: &[u8] = (*self).get_ref().as_ref(); + + if pos > slice.len() as u64 { + return (Ok(0), buf); + } + + (&slice[pos as usize..]).readv(buf).await + } +} + +impl AsyncReadRent for Box { + #[inline] + fn read(&mut self, buf: B) -> impl Future> { + (**self).read(buf) + } + + #[inline] + fn readv(&mut self, buf: B) -> impl Future> { + (**self).readv(buf) } } diff --git a/monoio/src/io/async_write_rent.rs b/monoio/src/io/async_write_rent.rs index 482f1945..17551030 100644 --- a/monoio/src/io/async_write_rent.rs +++ b/monoio/src/io/async_write_rent.rs @@ -83,6 +83,17 @@ pub trait AsyncWriteRentAt { ) -> impl Future>; } +impl AsyncWriteRentAt for &mut A { + #[inline] + fn write_at( + &mut self, + buf: T, + pos: usize, + ) -> impl Future> { + (**self).write_at(buf, pos) + } +} + impl AsyncWriteRent for &mut A { #[inline] fn write(&mut self, buf: T) -> impl Future> { @@ -104,3 +115,56 @@ impl AsyncWriteRent for &mut A { (**self).shutdown() } } + +impl AsyncWriteRent for Vec { + fn write(&mut self, buf: T) -> impl Future> { + let slice = buf.as_slice(); + self.extend_from_slice(slice); + let len = slice.len(); + std::future::ready((Ok(len), buf)) + } + + fn writev(&mut self, buf: T) -> impl Future> { + let mut sum = 0; + { + #[cfg(windows)] + let buf_slice = + unsafe { std::slice::from_raw_parts(buf.read_wsabuf_ptr(), buf.read_wsabuf_len()) }; + #[cfg(unix)] + let buf_slice = + unsafe { std::slice::from_raw_parts(buf.read_iovec_ptr(), buf.read_iovec_len()) }; + for buf in buf_slice { + #[cfg(windows)] + let len = buf.len as usize; + #[cfg(unix)] + let len = buf.iov_len; + + sum += len; + } + self.reserve(sum); + for buf in buf_slice { + #[cfg(windows)] + let ptr = buf.buf.cast::(); + #[cfg(unix)] + let ptr = buf.iov_base.cast::(); + #[cfg(windows)] + let len = buf.len as usize; + #[cfg(unix)] + let len = buf.iov_len; + + self.extend_from_slice(unsafe { std::slice::from_raw_parts(ptr, len) }); + } + } + std::future::ready((Ok(sum), buf)) + } + + #[inline] + fn flush(&mut self) -> impl Future> { + std::future::ready(Ok(())) + } + + #[inline] + fn shutdown(&mut self) -> impl Future> { + std::future::ready(Ok(())) + } +} diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index 36eb1ae3..19978767 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -331,9 +331,9 @@ impl AsyncWriteRent for TcpStream { } #[inline] - async fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> impl Future> { // Tcp stream does not need flush. - Ok(()) + std::future::ready(Ok(())) } fn shutdown(&mut self) -> impl Future> { @@ -347,7 +347,7 @@ impl AsyncWriteRent for TcpStream { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; - async move { res } + std::future::ready(res) } } @@ -403,7 +403,7 @@ impl CancelableAsyncWriteRent for TcpStream { -1 => Err(io::Error::last_os_error()), _ => Ok(()), }; - async move { res } + std::future::ready(res) } }