Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,14 @@ enum SendErrorKind {
Disconnected,
}

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
_priv: (),
/// The error type returned from [`try_recv`](Receiver::try_recv).
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
/// The channel is empty but not closed.
Empty,

/// The channel is empty and closed.
Closed,
}

impl fmt::Display for SendError {
Expand All @@ -196,6 +201,18 @@ impl SendError {
}
}

impl TryRecvError {
/// Returns `true` if the channel is empty but not closed.
pub fn is_empty(&self) -> bool {
matches!(self, TryRecvError::Empty)
}

/// Returns `true` if the channel is empty and closed.
pub fn is_closed(&self) -> bool {
matches!(self, TryRecvError::Closed)
}
}

impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
Expand Down Expand Up @@ -236,15 +253,12 @@ impl<T> TrySendError<T> {
}
}

impl fmt::Debug for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("TryRecvError").finish()
}
}

impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "receiver channel is empty")
match self {
TryRecvError::Empty => write!(f, "receive failed because channel is empty"),
TryRecvError::Closed => write!(f, "receive failed because channel is closed"),
}
}
}

Expand Down Expand Up @@ -991,10 +1005,21 @@ impl<T> Receiver<T> {
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
#[deprecated(note = "please use `try_recv` instead")]
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
Poll::Pending => Err(TryRecvError::Empty),
}
}

/// Tries to receive a message from the channel without blocking.
/// If the channel is empty, or empty and closed, this method returns an error.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
match self.next_message() {
Poll::Ready(Some(msg)) => Ok(msg),
Poll::Ready(None) => Err(TryRecvError::Closed),
Poll::Pending => Err(TryRecvError::Empty),
}
}

Expand Down Expand Up @@ -1159,10 +1184,21 @@ impl<T> UnboundedReceiver<T> {
/// * `Ok(Some(t))` when message is fetched
/// * `Ok(None)` when channel is closed and no messages left in the queue
/// * `Err(e)` when there are no messages available, but channel is not yet closed
#[deprecated(note = "please use `try_recv` instead")]
pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
match self.next_message() {
Poll::Ready(msg) => Ok(msg),
Poll::Pending => Err(TryRecvError { _priv: () }),
Poll::Pending => Err(TryRecvError::Empty),
}
}

/// Tries to receive a message from the channel without blocking.
/// If the channel is empty, or empty and closed, this method returns an error.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
match self.next_message() {
Poll::Ready(Some(msg)) => Ok(msg),
Poll::Ready(None) => Err(TryRecvError::Closed),
Poll::Pending => Err(TryRecvError::Empty),
}
}

Expand Down
33 changes: 33 additions & 0 deletions futures-channel/tests/mpsc-close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::future::Future;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::{Context, Poll};
use futures_channel::mpsc::TryRecvError;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::thread;
Expand Down Expand Up @@ -278,6 +279,7 @@ fn stress_try_send_as_receiver_closes() {

#[test]
fn unbounded_try_next_after_none() {
#![allow(deprecated)]
let (tx, mut rx) = mpsc::unbounded::<String>();
// Drop the sender, close the channel.
drop(tx);
Expand All @@ -289,6 +291,7 @@ fn unbounded_try_next_after_none() {

#[test]
fn bounded_try_next_after_none() {
#![allow(deprecated)]
let (tx, mut rx) = mpsc::channel::<String>(17);
// Drop the sender, close the channel.
drop(tx);
Expand All @@ -297,3 +300,33 @@ fn bounded_try_next_after_none() {
// None received, check we can call `try_next` again.
assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
}

#[test]
fn unbounded_try_recv_after_none() {
let (tx, mut rx) = mpsc::unbounded::<String>();

// Channel is empty initially.
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
// Closed received, check we can call `try_next` again.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
}

#[test]
fn bounded_try_recv_after_none() {
let (tx, mut rx) = mpsc::channel::<String>(17);

// Channel is empty initially.
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

// Drop the sender, close the channel.
drop(tx);
// Receive the end of channel.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
// Closed received, check we can call `try_next` again.
assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
}
12 changes: 6 additions & 6 deletions futures-channel/tests/mpsc-size_hint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ fn unbounded_size_hint() {
assert_eq!((0, None), rx.size_hint());
tx.unbounded_send(1).unwrap();
assert_eq!((1, None), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((0, None), rx.size_hint());
tx.unbounded_send(2).unwrap();
tx.unbounded_send(3).unwrap();
assert_eq!((2, None), rx.size_hint());
drop(tx);
assert_eq!((2, Some(2)), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((1, Some(1)), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((0, Some(0)), rx.size_hint());
}

Expand All @@ -26,15 +26,15 @@ fn channel_size_hint() {
assert_eq!((0, None), rx.size_hint());
tx.try_send(1).unwrap();
assert_eq!((1, None), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((0, None), rx.size_hint());
tx.try_send(2).unwrap();
tx.try_send(3).unwrap();
assert_eq!((2, None), rx.size_hint());
drop(tx);
assert_eq!((2, Some(2)), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((1, Some(1)), rx.size_hint());
rx.try_next().unwrap().unwrap();
rx.try_recv().unwrap();
assert_eq!((0, Some(0)), rx.size_hint());
}
11 changes: 6 additions & 5 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::future::{poll_fn, FutureExt};
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures_channel::mpsc::TryRecvError;
use futures_test::task::{new_count_waker, noop_context};
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -503,12 +504,12 @@ fn try_send_recv() {
tx.try_send("hello").unwrap();
tx.try_send("hello").unwrap();
tx.try_send("hello").unwrap_err(); // should be full
rx.try_next().unwrap();
rx.try_next().unwrap();
rx.try_next().unwrap_err(); // should be empty
rx.try_recv().unwrap();
rx.try_recv().unwrap();
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
tx.try_send("hello").unwrap();
rx.try_next().unwrap();
rx.try_next().unwrap_err(); // should be empty
rx.try_recv().unwrap();
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions futures/tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,19 +512,19 @@ fn sink_unfold() {
let mut unfold = pin!(unfold);
assert_eq!(unfold.as_mut().start_send(1), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
assert_eq!(rx.try_next().unwrap(), Some(1));
assert_eq!(rx.try_recv().unwrap(), 1);

assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(2), Ok(()));
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(3), Ok(()));
assert_eq!(rx.try_next().unwrap(), Some(2));
assert!(rx.try_next().is_err());
assert_eq!(rx.try_recv().unwrap(), 2);
assert!(rx.try_recv().is_err());
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(4), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
assert_eq!(rx.try_next().unwrap(), Some(3));
assert_eq!(rx.try_next().unwrap(), Some(4));
assert_eq!(rx.try_recv().unwrap(), 3);
assert_eq!(rx.try_recv().unwrap(), 4);

Poll::Ready(())
}))
Expand Down