Skip to content

Commit aceccf9

Browse files
committed
io: add AsyncReadExt::read_buf
Brings back `read_buf` from 0.2. This will be stabilized as part of 1.0.
1 parent 2696794 commit aceccf9

File tree

10 files changed

+389
-3
lines changed

10 files changed

+389
-3
lines changed

tokio/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ full = [
4545
]
4646

4747
fs = []
48-
io-util = ["memchr"]
48+
io-util = ["memchr", "bytes"]
4949
# stdin, stdout, stderr
5050
io-std = []
5151
macros = ["tokio-macros"]
@@ -88,10 +88,10 @@ time = []
8888
[dependencies]
8989
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
9090

91-
bytes = "0.5.0"
9291
pin-project-lite = "0.1.1"
9392

9493
# Everything else is optional...
94+
bytes = { git = "https://github.com/tokio-rs/bytes", optional = true }
9595
fnv = { version = "1.0.6", optional = true }
9696
futures-core = { version = "0.3.0", optional = true }
9797
lazy_static = { version = "1.0.2", optional = true }

tokio/src/io/util/async_read_ext.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::io::util::chain::{chain, Chain};
22
use crate::io::util::read::{read, Read};
3+
use crate::io::util::read_buf::{read_buf, ReadBuf};
34
use crate::io::util::read_exact::{read_exact, ReadExact};
45
use crate::io::util::read_int::{
56
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
@@ -12,6 +13,8 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
1213
use crate::io::util::take::{take, Take};
1314
use crate::io::AsyncRead;
1415

16+
use bytes::BufMut;
17+
1518
cfg_io_util! {
1619
/// Defines numeric reader
1720
macro_rules! read_impl {
@@ -163,6 +166,71 @@ cfg_io_util! {
163166
read(self, buf)
164167
}
165168

169+
/// Pulls some bytes from this source into the specified buffer,
170+
/// advancing the buffer's internal cursor.
171+
///
172+
/// Equivalent to:
173+
///
174+
/// ```ignore
175+
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
176+
/// ```
177+
///
178+
/// Usually, only a single `read` syscall is issued, even if there is
179+
/// more space in the supplied buffer.
180+
///
181+
/// This function does not provide any guarantees about whether it
182+
/// completes immediately or asynchronously
183+
///
184+
/// # Return
185+
///
186+
/// On a successful read, the number of read bytes is returned. If the
187+
/// supplied buffer is not empty and the function returns `Ok(0)` then
188+
/// the source as reached an "end-of-file" event.
189+
///
190+
/// # Errors
191+
///
192+
/// If this function encounters any form of I/O or other error, an error
193+
/// variant will be returned. If an error is returned then it must be
194+
/// guaranteed that no bytes were read.
195+
///
196+
/// # Examples
197+
///
198+
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
199+
///
200+
/// [`File`]: crate::fs::File
201+
/// [`BytesMut`]: bytes::BytesMut
202+
/// [`BufMut`]: bytes::BufMut
203+
///
204+
/// ```no_run
205+
/// use tokio::fs::File;
206+
/// use tokio::io::{self, AsyncReadExt};
207+
///
208+
/// use bytes::BytesMut;
209+
///
210+
/// #[tokio::main]
211+
/// async fn main() -> io::Result<()> {
212+
/// let mut f = File::open("foo.txt").await?;
213+
/// let mut buffer = BytesMut::with_capacity(10);
214+
///
215+
/// assert!(buffer.is_empty());
216+
///
217+
/// // read up to 10 bytes, note that the return value is not needed
218+
/// // to access the data that was read as `buffer`'s internal
219+
/// // cursor is updated.
220+
/// f.read_buf(&mut buffer).await?;
221+
///
222+
/// println!("The bytes: {:?}", &buffer[..]);
223+
/// Ok(())
224+
/// }
225+
/// ```
226+
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
227+
where
228+
Self: Sized + Unpin,
229+
B: BufMut,
230+
{
231+
read_buf(self, buf)
232+
}
233+
166234
/// Reads the exact number of bytes required to fill `buf`.
167235
///
168236
/// Equivalent to:

tokio/src/io/util/async_write_ext.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::io::util::flush::{flush, Flush};
22
use crate::io::util::shutdown::{shutdown, Shutdown};
33
use crate::io::util::write::{write, Write};
44
use crate::io::util::write_all::{write_all, WriteAll};
5+
use crate::io::util::write_buf::{write_buf, WriteBuf};
56
use crate::io::util::write_int::{
67
WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le,
78
WriteI8,
@@ -12,6 +13,8 @@ use crate::io::util::write_int::{
1213
};
1314
use crate::io::AsyncWrite;
1415

16+
use bytes::Buf;
17+
1518
cfg_io_util! {
1619
/// Defines numeric writer
1720
macro_rules! write_impl {
@@ -116,6 +119,80 @@ cfg_io_util! {
116119
write(self, src)
117120
}
118121

122+
123+
/// Writes a buffer into this writer, advancing the buffer's internal
124+
/// cursor.
125+
///
126+
/// Equivalent to:
127+
///
128+
/// ```ignore
129+
/// async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<usize>;
130+
/// ```
131+
///
132+
/// This function will attempt to write the entire contents of `buf`, but
133+
/// the entire write may not succeed, or the write may also generate an
134+
/// error. After the operation completes, the buffer's
135+
/// internal cursor is advanced by the number of bytes written. A
136+
/// subsequent call to `write_buf` using the **same** `buf` value will
137+
/// resume from the point that the first call to `write_buf` completed.
138+
/// A call to `write` represents *at most one* attempt to write to any
139+
/// wrapped object.
140+
///
141+
/// # Return
142+
///
143+
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
144+
/// buf.len()`. A return value of `0` typically means that the
145+
/// underlying object is no longer able to accept bytes and will likely
146+
/// not be able to in the future as well, or that the buffer provided is
147+
/// empty.
148+
///
149+
/// # Errors
150+
///
151+
/// Each call to `write` may generate an I/O error indicating that the
152+
/// operation could not be completed. If an error is returned then no bytes
153+
/// in the buffer were written to this writer.
154+
///
155+
/// It is **not** considered an error if the entire buffer could not be
156+
/// written to this writer.
157+
///
158+
/// # Examples
159+
///
160+
/// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]:
161+
///
162+
/// [`File`]: crate::fs::File
163+
/// [`Buf`]: bytes::Buf
164+
///
165+
/// ```no_run
166+
/// use tokio::io::{self, AsyncWriteExt};
167+
/// use tokio::fs::File;
168+
///
169+
/// use bytes::Buf;
170+
/// use std::io::Cursor;
171+
///
172+
/// #[tokio::main]
173+
/// async fn main() -> io::Result<()> {
174+
/// let mut file = File::create("foo.txt").await?;
175+
/// let mut buffer = Cursor::new(b"data to write");
176+
///
177+
/// // Loop until the entire contents of the buffer are written to
178+
/// // the file.
179+
/// while buffer.has_remaining() {
180+
/// // Writes some prefix of the byte string, not necessarily
181+
/// // all of it.
182+
/// file.write_buf(&mut buffer).await?;
183+
/// }
184+
///
185+
/// Ok(())
186+
/// }
187+
/// ```
188+
fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
189+
where
190+
Self: Sized + Unpin,
191+
B: Buf,
192+
{
193+
write_buf(self, src)
194+
}
195+
119196
/// Attempts to write an entire buffer into this writer.
120197
///
121198
/// Equivalent to:

tokio/src/io/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ cfg_io_util! {
4242
pub use mem::{duplex, DuplexStream};
4343

4444
mod read;
45+
mod read_buf;
4546
mod read_exact;
4647
mod read_int;
4748
mod read_line;
@@ -70,6 +71,7 @@ cfg_io_util! {
7071

7172
mod write;
7273
mod write_all;
74+
mod write_buf;
7375
mod write_int;
7476

7577

tokio/src/io/util/read_buf.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use crate::io::AsyncRead;
2+
3+
use bytes::BufMut;
4+
use pin_project_lite::pin_project;
5+
use std::future::Future;
6+
use std::io;
7+
use std::marker::PhantomPinned;
8+
use std::pin::Pin;
9+
use std::task::{Context, Poll};
10+
11+
pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B>
12+
where
13+
R: AsyncRead + Unpin,
14+
B: BufMut,
15+
{
16+
ReadBuf {
17+
reader,
18+
buf,
19+
_pin: PhantomPinned,
20+
}
21+
}
22+
23+
pin_project! {
24+
/// Future returned by [`read_buf`](crate::io::AsyncReadExt::read_buf).
25+
#[derive(Debug)]
26+
#[must_use = "futures do nothing unless you `.await` or poll them"]
27+
pub struct ReadBuf<'a, R, B> {
28+
reader: &'a mut R,
29+
buf: &'a mut B,
30+
_pin: PhantomPinned,
31+
}
32+
}
33+
34+
impl<R, B> Future for ReadBuf<'_, R, B>
35+
where
36+
R: AsyncRead + Unpin,
37+
B: BufMut,
38+
{
39+
type Output = io::Result<usize>;
40+
41+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
42+
use crate::io::ReadBuf;
43+
use std::mem::MaybeUninit;
44+
45+
let me = self.project();
46+
47+
if !me.buf.has_remaining_mut() {
48+
return Poll::Ready(Ok(0));
49+
}
50+
51+
let n = {
52+
let dst = me.buf.bytes_mut();
53+
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
54+
let mut buf = ReadBuf::uninit(dst);
55+
let ptr = buf.filled().as_ptr();
56+
ready!(Pin::new(me.reader).poll_read(cx, &mut buf)?);
57+
58+
// Ensure the pointer does not change from under us
59+
assert_eq!(ptr, buf.filled().as_ptr());
60+
buf.filled().len()
61+
};
62+
63+
// Safety: This is guaranteed to be the number of initialized (and read)
64+
// bytes due to the invariants provided by `ReadBuf::filled`.
65+
unsafe {
66+
me.buf.advance_mut(n);
67+
}
68+
69+
Poll::Ready(Ok(n))
70+
}
71+
}

tokio/src/io/util/read_to_end.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ fn reserve(buf: &mut Vec<u8>, bytes: usize) {
9898

9999
/// Returns the unused capacity of the provided vector.
100100
fn get_unused_capacity(buf: &mut Vec<u8>) -> &mut [MaybeUninit<u8>] {
101-
bytes::BufMut::bytes_mut(buf)
101+
let uninit = bytes::BufMut::bytes_mut(buf);
102+
unsafe { &mut *(uninit as *mut _ as *mut [MaybeUninit<u8>]) }
102103
}
103104

104105
impl<A> Future for ReadToEnd<'_, A>

tokio/src/io/util/write_buf.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use crate::io::AsyncWrite;
2+
3+
use bytes::Buf;
4+
use pin_project_lite::pin_project;
5+
use std::future::Future;
6+
use std::io;
7+
use std::marker::PhantomPinned;
8+
use std::pin::Pin;
9+
use std::task::{Context, Poll};
10+
11+
pin_project! {
12+
/// A future to write some of the buffer to an `AsyncWrite`.
13+
#[derive(Debug)]
14+
#[must_use = "futures do nothing unless you `.await` or poll them"]
15+
pub struct WriteBuf<'a, W, B> {
16+
writer: &'a mut W,
17+
buf: &'a mut B,
18+
_pin: PhantomPinned,
19+
}
20+
}
21+
22+
/// Tries to write some bytes from the given `buf` to the writer in an
23+
/// asynchronous manner, returning a future.
24+
pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B>
25+
where
26+
W: AsyncWrite + Unpin,
27+
B: Buf,
28+
{
29+
WriteBuf {
30+
writer,
31+
buf,
32+
_pin: PhantomPinned,
33+
}
34+
}
35+
36+
impl<W, B> Future for WriteBuf<'_, W, B>
37+
where
38+
W: AsyncWrite + Unpin,
39+
B: Buf,
40+
{
41+
type Output = io::Result<usize>;
42+
43+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
44+
let me = self.project();
45+
46+
if !me.buf.has_remaining() {
47+
return Poll::Ready(Ok(0));
48+
}
49+
50+
let n = ready!(Pin::new(me.writer).poll_write(cx, me.buf.bytes()))?;
51+
me.buf.advance(n);
52+
Poll::Ready(Ok(n))
53+
}
54+
}

tokio/tests/io_read.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,24 @@ async fn read() {
3636
assert_eq!(n, 11);
3737
assert_eq!(buf[..], b"hello world"[..]);
3838
}
39+
40+
struct BadAsyncRead;
41+
42+
impl AsyncRead for BadAsyncRead {
43+
fn poll_read(
44+
self: Pin<&mut Self>,
45+
_cx: &mut Context<'_>,
46+
buf: &mut ReadBuf<'_>,
47+
) -> Poll<io::Result<()>> {
48+
*buf = ReadBuf::new(Box::leak(vec![0; buf.capacity()].into_boxed_slice()));
49+
buf.advance(buf.capacity());
50+
Poll::Ready(Ok(()))
51+
}
52+
}
53+
54+
#[tokio::test]
55+
#[should_panic]
56+
async fn read_buf_bad_async_read() {
57+
let mut buf = Vec::with_capacity(10);
58+
BadAsyncRead.read_buf(&mut buf).await.unwrap();
59+
}

0 commit comments

Comments
 (0)