Skip to content

Commit 89e5d8b

Browse files
authored
channel: Add try_recv and deprecate try_next (#2944)
1 parent 6f9a15f commit 89e5d8b

File tree

5 files changed

+98
-28
lines changed

5 files changed

+98
-28
lines changed

Diff for: futures-channel/src/mpsc/mod.rs

+48-12
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,14 @@ enum SendErrorKind {
167167
Disconnected,
168168
}
169169

170-
/// The error type returned from [`try_next`](Receiver::try_next).
171-
pub struct TryRecvError {
172-
_priv: (),
170+
/// The error type returned from [`try_recv`](Receiver::try_recv).
171+
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
172+
pub enum TryRecvError {
173+
/// The channel is empty but not closed.
174+
Empty,
175+
176+
/// The channel is empty and closed.
177+
Closed,
173178
}
174179

175180
impl fmt::Display for SendError {
@@ -196,6 +201,18 @@ impl SendError {
196201
}
197202
}
198203

204+
impl TryRecvError {
205+
/// Returns `true` if the channel is empty but not closed.
206+
pub fn is_empty(&self) -> bool {
207+
matches!(self, TryRecvError::Empty)
208+
}
209+
210+
/// Returns `true` if the channel is empty and closed.
211+
pub fn is_closed(&self) -> bool {
212+
matches!(self, TryRecvError::Closed)
213+
}
214+
}
215+
199216
impl<T> fmt::Debug for TrySendError<T> {
200217
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201218
f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
@@ -236,15 +253,12 @@ impl<T> TrySendError<T> {
236253
}
237254
}
238255

239-
impl fmt::Debug for TryRecvError {
240-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241-
f.debug_tuple("TryRecvError").finish()
242-
}
243-
}
244-
245256
impl fmt::Display for TryRecvError {
246257
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247-
write!(f, "receiver channel is empty")
258+
match self {
259+
TryRecvError::Empty => write!(f, "receive failed because channel is empty"),
260+
TryRecvError::Closed => write!(f, "receive failed because channel is closed"),
261+
}
248262
}
249263
}
250264

@@ -991,10 +1005,21 @@ impl<T> Receiver<T> {
9911005
/// * `Ok(Some(t))` when message is fetched
9921006
/// * `Ok(None)` when channel is closed and no messages left in the queue
9931007
/// * `Err(e)` when there are no messages available, but channel is not yet closed
1008+
#[deprecated(note = "please use `try_recv` instead")]
9941009
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
9951010
match self.next_message() {
9961011
Poll::Ready(msg) => Ok(msg),
997-
Poll::Pending => Err(TryRecvError { _priv: () }),
1012+
Poll::Pending => Err(TryRecvError::Empty),
1013+
}
1014+
}
1015+
1016+
/// Tries to receive a message from the channel without blocking.
1017+
/// If the channel is empty, or empty and closed, this method returns an error.
1018+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1019+
match self.next_message() {
1020+
Poll::Ready(Some(msg)) => Ok(msg),
1021+
Poll::Ready(None) => Err(TryRecvError::Closed),
1022+
Poll::Pending => Err(TryRecvError::Empty),
9981023
}
9991024
}
10001025

@@ -1159,10 +1184,21 @@ impl<T> UnboundedReceiver<T> {
11591184
/// * `Ok(Some(t))` when message is fetched
11601185
/// * `Ok(None)` when channel is closed and no messages left in the queue
11611186
/// * `Err(e)` when there are no messages available, but channel is not yet closed
1187+
#[deprecated(note = "please use `try_recv` instead")]
11621188
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
11631189
match self.next_message() {
11641190
Poll::Ready(msg) => Ok(msg),
1165-
Poll::Pending => Err(TryRecvError { _priv: () }),
1191+
Poll::Pending => Err(TryRecvError::Empty),
1192+
}
1193+
}
1194+
1195+
/// Tries to receive a message from the channel without blocking.
1196+
/// If the channel is empty, or empty and closed, this method returns an error.
1197+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1198+
match self.next_message() {
1199+
Poll::Ready(Some(msg)) => Ok(msg),
1200+
Poll::Ready(None) => Err(TryRecvError::Closed),
1201+
Poll::Pending => Err(TryRecvError::Empty),
11661202
}
11671203
}
11681204

Diff for: futures-channel/tests/mpsc-close.rs

+33
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures::future::Future;
44
use futures::sink::SinkExt;
55
use futures::stream::StreamExt;
66
use futures::task::{Context, Poll};
7+
use futures_channel::mpsc::TryRecvError;
78
use std::pin::Pin;
89
use std::sync::{Arc, Weak};
910
use std::thread;
@@ -278,6 +279,7 @@ fn stress_try_send_as_receiver_closes() {
278279

279280
#[test]
280281
fn unbounded_try_next_after_none() {
282+
#![allow(deprecated)]
281283
let (tx, mut rx) = mpsc::unbounded::<String>();
282284
// Drop the sender, close the channel.
283285
drop(tx);
@@ -289,6 +291,7 @@ fn unbounded_try_next_after_none() {
289291

290292
#[test]
291293
fn bounded_try_next_after_none() {
294+
#![allow(deprecated)]
292295
let (tx, mut rx) = mpsc::channel::<String>(17);
293296
// Drop the sender, close the channel.
294297
drop(tx);
@@ -297,3 +300,33 @@ fn bounded_try_next_after_none() {
297300
// None received, check we can call `try_next` again.
298301
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
299302
}
303+
304+
#[test]
305+
fn unbounded_try_recv_after_none() {
306+
let (tx, mut rx) = mpsc::unbounded::<String>();
307+
308+
// Channel is empty initially.
309+
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
310+
311+
// Drop the sender, close the channel.
312+
drop(tx);
313+
// Receive the end of channel.
314+
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
315+
// Closed received, check we can call `try_next` again.
316+
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
317+
}
318+
319+
#[test]
320+
fn bounded_try_recv_after_none() {
321+
let (tx, mut rx) = mpsc::channel::<String>(17);
322+
323+
// Channel is empty initially.
324+
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
325+
326+
// Drop the sender, close the channel.
327+
drop(tx);
328+
// Receive the end of channel.
329+
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
330+
// Closed received, check we can call `try_next` again.
331+
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
332+
}

Diff for: futures-channel/tests/mpsc-size_hint.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ fn unbounded_size_hint() {
77
assert_eq!((0, None), rx.size_hint());
88
tx.unbounded_send(1).unwrap();
99
assert_eq!((1, None), rx.size_hint());
10-
rx.try_next().unwrap().unwrap();
10+
rx.try_recv().unwrap();
1111
assert_eq!((0, None), rx.size_hint());
1212
tx.unbounded_send(2).unwrap();
1313
tx.unbounded_send(3).unwrap();
1414
assert_eq!((2, None), rx.size_hint());
1515
drop(tx);
1616
assert_eq!((2, Some(2)), rx.size_hint());
17-
rx.try_next().unwrap().unwrap();
17+
rx.try_recv().unwrap();
1818
assert_eq!((1, Some(1)), rx.size_hint());
19-
rx.try_next().unwrap().unwrap();
19+
rx.try_recv().unwrap();
2020
assert_eq!((0, Some(0)), rx.size_hint());
2121
}
2222

@@ -26,15 +26,15 @@ fn channel_size_hint() {
2626
assert_eq!((0, None), rx.size_hint());
2727
tx.try_send(1).unwrap();
2828
assert_eq!((1, None), rx.size_hint());
29-
rx.try_next().unwrap().unwrap();
29+
rx.try_recv().unwrap();
3030
assert_eq!((0, None), rx.size_hint());
3131
tx.try_send(2).unwrap();
3232
tx.try_send(3).unwrap();
3333
assert_eq!((2, None), rx.size_hint());
3434
drop(tx);
3535
assert_eq!((2, Some(2)), rx.size_hint());
36-
rx.try_next().unwrap().unwrap();
36+
rx.try_recv().unwrap();
3737
assert_eq!((1, Some(1)), rx.size_hint());
38-
rx.try_next().unwrap().unwrap();
38+
rx.try_recv().unwrap();
3939
assert_eq!((0, Some(0)), rx.size_hint());
4040
}

Diff for: futures-channel/tests/mpsc.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures::future::{poll_fn, FutureExt};
44
use futures::sink::{Sink, SinkExt};
55
use futures::stream::{Stream, StreamExt};
66
use futures::task::{Context, Poll};
7+
use futures_channel::mpsc::TryRecvError;
78
use futures_test::task::{new_count_waker, noop_context};
89
use std::pin::pin;
910
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -503,12 +504,12 @@ fn try_send_recv() {
503504
tx.try_send("hello").unwrap();
504505
tx.try_send("hello").unwrap();
505506
tx.try_send("hello").unwrap_err(); // should be full
506-
rx.try_next().unwrap();
507-
rx.try_next().unwrap();
508-
rx.try_next().unwrap_err(); // should be empty
507+
rx.try_recv().unwrap();
508+
rx.try_recv().unwrap();
509+
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
509510
tx.try_send("hello").unwrap();
510-
rx.try_next().unwrap();
511-
rx.try_next().unwrap_err(); // should be empty
511+
rx.try_recv().unwrap();
512+
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
512513
}
513514

514515
#[test]

Diff for: futures/tests/sink.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -512,19 +512,19 @@ fn sink_unfold() {
512512
let mut unfold = pin!(unfold);
513513
assert_eq!(unfold.as_mut().start_send(1), Ok(()));
514514
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
515-
assert_eq!(rx.try_next().unwrap(), Some(1));
515+
assert_eq!(rx.try_recv().unwrap(), 1);
516516

517517
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
518518
assert_eq!(unfold.as_mut().start_send(2), Ok(()));
519519
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
520520
assert_eq!(unfold.as_mut().start_send(3), Ok(()));
521-
assert_eq!(rx.try_next().unwrap(), Some(2));
522-
assert!(rx.try_next().is_err());
521+
assert_eq!(rx.try_recv().unwrap(), 2);
522+
assert!(rx.try_recv().is_err());
523523
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
524524
assert_eq!(unfold.as_mut().start_send(4), Ok(()));
525525
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
526-
assert_eq!(rx.try_next().unwrap(), Some(3));
527-
assert_eq!(rx.try_next().unwrap(), Some(4));
526+
assert_eq!(rx.try_recv().unwrap(), 3);
527+
assert_eq!(rx.try_recv().unwrap(), 4);
528528

529529
Poll::Ready(())
530530
}))

0 commit comments

Comments
 (0)