summaryrefslogtreecommitdiff
path: root/library/std
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2024-10-01 13:35:16 +0000
committerbors <bors@rust-lang.org>2024-10-01 13:35:16 +0000
commit8dd5cd0bc1d683c30805e1dc831cac546b621a75 (patch)
treee35889870b1424b48f22796b6797d1276b8bfa87 /library/std
parentc4f7176501a7d3c19c230b8c9111b2d39142f83a (diff)
parent041e76b7cd7c499837fcea63f7f780fdf774a1a5 (diff)
Auto merge of #126839 - obeis:mpmc, r=AmanieuHEADmaster
Add multi-producer, multi-consumer channel (mpmc) Closes #125712 Tracking issue: #126840 r? m-ou-se
Diffstat (limited to 'library/std')
-rw-r--r--library/std/src/lib.rs4
-rw-r--r--library/std/src/sync/mod.rs9
-rw-r--r--library/std/src/sync/mpmc/error.rs5
-rw-r--r--library/std/src/sync/mpmc/mod.rs1035
-rw-r--r--library/std/src/sync/mpmc/tests.rs728
5 files changed, 1732 insertions, 49 deletions
diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs
index b81e7c18abb..65a9aa66c7c 100644
--- a/library/std/src/lib.rs
+++ b/library/std/src/lib.rs
@@ -153,7 +153,7 @@
//! the [`io`], [`fs`], and [`net`] modules.
//!
//! The [`thread`] module contains Rust's threading abstractions. [`sync`]
-//! contains further primitive shared memory types, including [`atomic`] and
+//! contains further primitive shared memory types, including [`atomic`], [`mpmc`] and
//! [`mpsc`], which contains the channel types for message passing.
//!
//! # Use before and after `main()`
@@ -177,6 +177,7 @@
//! - after-main use of thread-locals, which also affects additional features:
//! - [`thread::current()`]
//! - [`thread::scope()`]
+//! - [`sync::mpmc`]
//! - [`sync::mpsc`]
//! - before-main stdio file descriptors are not guaranteed to be open on unix platforms
//!
@@ -202,6 +203,7 @@
//! [`atomic`]: sync::atomic
//! [`for`]: ../book/ch03-05-control-flow.html#looping-through-a-collection-with-for
//! [`str`]: prim@str
+//! [`mpmc`]: sync::mpmc
//! [`mpsc`]: sync::mpsc
//! [`std::cmp`]: cmp
//! [`std::slice`]: mod@slice
diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs
index 0fb8e669bf8..0fb77331293 100644
--- a/library/std/src/sync/mod.rs
+++ b/library/std/src/sync/mod.rs
@@ -133,6 +133,11 @@
//! inter-thread synchronisation mechanism, at the cost of some
//! extra memory.
//!
+//! - [`mpmc`]: Multi-producer, multi-consumer queues, used for
+//! message-based communication. Can provide a lightweight
+//! inter-thread synchronisation mechanism, at the cost of some
+//! extra memory.
+//!
//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at
//! most one thread at a time is able to access some data.
//!
@@ -153,6 +158,7 @@
//! [`Arc`]: crate::sync::Arc
//! [`Barrier`]: crate::sync::Barrier
//! [`Condvar`]: crate::sync::Condvar
+//! [`mpmc`]: crate::sync::mpmc
//! [`mpsc`]: crate::sync::mpsc
//! [`Mutex`]: crate::sync::Mutex
//! [`Once`]: crate::sync::Once
@@ -193,12 +199,13 @@ pub use self::rwlock::{MappedRwLockReadGuard, MappedRwLockWriteGuard};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+pub mod mpmc;
pub mod mpsc;
mod barrier;
mod condvar;
mod lazy_lock;
-mod mpmc;
mod mutex;
pub(crate) mod once;
mod once_lock;
diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs
index e3aec7e7623..e34b56d0831 100644
--- a/library/std/src/sync/mpmc/error.rs
+++ b/library/std/src/sync/mpmc/error.rs
@@ -7,6 +7,7 @@ use crate::{error, fmt};
///
/// [`send_timeout`]: super::Sender::send_timeout
#[derive(PartialEq, Eq, Clone, Copy)]
+#[unstable(feature = "mpmc_channel", issue = "126840")]
pub enum SendTimeoutError<T> {
/// The message could not be sent because the channel is full and the operation timed out.
///
@@ -18,12 +19,14 @@ pub enum SendTimeoutError<T> {
Disconnected(T),
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
@@ -33,8 +36,10 @@ impl<T> fmt::Display for SendTimeoutError<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> error::Error for SendTimeoutError<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> From<SendError<T>> for SendTimeoutError<T> {
fn from(err: SendError<T>) -> SendTimeoutError<T> {
match err {
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
index c640e07348e..77a67f4fd38 100644
--- a/library/std/src/sync/mpmc/mod.rs
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -1,8 +1,112 @@
-//! Multi-producer multi-consumer channels.
+//! Multi-producer, multi-consumer FIFO queue communication primitives.
+//!
+//! This module provides message-based communication over channels, concretely
+//! defined by two types:
+//!
+//! * [`Sender`]
+//! * [`Receiver`]
+//!
+//! [`Sender`]s are used to send data to a set of [`Receiver`]s. Both
+//! sender and receiver are cloneable (multi-producer) such that many threads can send
+//! simultaneously to receivers (multi-consumer).
+//!
+//! These channels come in two flavors:
+//!
+//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
+//! will return a `(Sender, Receiver)` tuple where all sends will be
+//! **asynchronous** (they never block). The channel conceptually has an
+//! infinite buffer.
+//!
+//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
+//! return a `(SyncSender, Receiver)` tuple where the storage for pending
+//! messages is a pre-allocated buffer of a fixed size. All sends will be
+//! **synchronous** by blocking until there is buffer space available. Note
+//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
+//! channel where each sender atomically hands off a message to a receiver.
+//!
+//! [`send`]: Sender::send
+//!
+//! ## Disconnection
+//!
+//! The send and receive operations on channels will all return a [`Result`]
+//! indicating whether the operation succeeded or not. An unsuccessful operation
+//! is normally indicative of the other half of a channel having "hung up" by
+//! being dropped in its corresponding thread.
+//!
+//! Once half of a channel has been deallocated, most operations can no longer
+//! continue to make progress, so [`Err`] will be returned. Many applications
+//! will continue to [`unwrap`] the results returned from this module,
+//! instigating a propagation of failure among threads if one unexpectedly dies.
+//!
+//! [`unwrap`]: Result::unwrap
+//!
+//! # Examples
+//!
+//! Simple usage:
+//!
+//! ```
+//! #![feature(mpmc_channel)]
+//!
+//! use std::thread;
+//! use std::sync::mpmc::channel;
+//!
+//! // Create a simple streaming channel
+//! let (tx, rx) = channel();
+//! thread::spawn(move || {
+//! tx.send(10).unwrap();
+//! });
+//! assert_eq!(rx.recv().unwrap(), 10);
+//! ```
+//!
+//! Shared usage:
+//!
+//! ```
+//! #![feature(mpmc_channel)]
+//!
+//! use std::thread;
+//! use std::sync::mpmc::channel;
+//!
+//! // Create a shared channel that can be sent along from many threads
+//! // where tx is the sending half (tx for transmission), and rx is the receiving
+//! // half (rx for receiving).
+//! let (tx, rx) = channel();
+//! for i in 0..10 {
+//! let tx = tx.clone();
+//! thread::spawn(move || {
+//! tx.send(i).unwrap();
+//! });
+//! }
+//!
+//! for _ in 0..5 {
+//! let rx1 = rx.clone();
+//! let rx2 = rx.clone();
+//! thread::spawn(move || {
+//! let j = rx1.recv().unwrap();
+//! assert!(0 <= j && j < 10);
+//! });
+//! thread::spawn(move || {
+//! let j = rx2.recv().unwrap();
+//! assert!(0 <= j && j < 10);
+//! });
+//! }
+//! ```
+//!
+//! Propagating panics:
+//!
+//! ```
+//! #![feature(mpmc_channel)]
+//!
+//! use std::sync::mpmc::channel;
+//!
+//! // The call to recv() will return an error because the channel has already
+//! // hung up (or been deallocated)
+//! let (tx, rx) = channel::<i32>();
+//! drop(tx);
+//! assert!(rx.recv().is_err());
+//! ```
-// This module is not currently exposed publicly, but is used
-// as the implementation for the channels in `sync::mpsc`. The
-// implementation comes from the crossbeam-channel crate:
+// This module is used as the implementation for the channels in `sync::mpsc`.
+// The implementation comes from the crossbeam-channel crate:
//
// Copyright (c) 2019 The Crossbeam Project Developers
//
@@ -46,9 +150,47 @@ use crate::fmt;
use crate::panic::{RefUnwindSafe, UnwindSafe};
use crate::time::{Duration, Instant};
-/// Creates a channel of unbounded capacity.
+/// Creates a new asynchronous channel, returning the sender/receiver halves.
+/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
+/// the same order as it was sent, and no [`send`] will block the calling thread
+/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
+/// block after its buffer limit is reached). [`recv`] will block until a message
+/// is available while there is at least one [`Sender`] alive (including clones).
///
-/// This channel has a growable buffer that can hold any number of messages at a time.
+/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
+/// The [`Receiver`] also can be cloned to have multi receivers.
+///
+/// If the [`Receiver`] is disconnected while trying to [`send`] with the
+/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
+/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
+/// return a [`RecvError`].
+///
+/// [`send`]: Sender::send
+/// [`recv`]: Receiver::recv
+///
+/// # Examples
+///
+/// ```
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+///
+/// let (sender, receiver) = channel();
+///
+/// // Spawn off an expensive computation
+/// thread::spawn(move || {
+/// # fn expensive_computation() {}
+/// sender.send(expensive_computation()).unwrap();
+/// });
+///
+/// // Do some useful work for awhile
+///
+/// // Let's see what that answer was
+/// println!("{:?}", receiver.recv().unwrap());
+/// ```
+#[must_use]
+#[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = counter::new(list::Channel::new());
let s = Sender { flavor: SenderFlavor::List(s) };
@@ -56,12 +198,50 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
(s, r)
}
-/// Creates a channel of bounded capacity.
+/// Creates a new synchronous, bounded channel.
+/// All data sent on the [`Sender`] will become available on the [`Receiver`]
+/// in the same order as it was sent. Like asynchronous [`channel`]s, the
+/// [`Receiver`] will block until a message becomes available. `sync_channel`
+/// differs greatly in the semantics of the sender, however.
+///
+/// This channel has an internal buffer on which messages will be queued.
+/// `bound` specifies the buffer size. When the internal buffer becomes full,
+/// future sends will *block* waiting for the buffer to open up. Note that a
+/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
+/// where each [`send`] will not return until a [`recv`] is paired with it.
+///
+/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
+/// times. The [`Receiver`] also can be cloned to have multi receivers.
+///
+/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
+/// to [`send`] with the [`Sender`], the [`send`] method will return a
+/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
+/// to [`recv`], the [`recv`] method will return a [`RecvError`].
+///
+/// [`send`]: Sender::send
+/// [`recv`]: Receiver::recv
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::mpsc::sync_channel;
+/// use std::thread;
+///
+/// let (sender, receiver) = sync_channel(1);
///
-/// This channel has a buffer that can hold at most `cap` messages at a time.
+/// // this returns immediately
+/// sender.send(1).unwrap();
///
-/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
-/// receive operations must appear at the same time in order to pair up and pass the message over.
+/// thread::spawn(move || {
+/// // this will block until the previous message has been received
+/// sender.send(2).unwrap();
+/// });
+///
+/// assert_eq!(receiver.recv().unwrap(), 1);
+/// assert_eq!(receiver.recv().unwrap(), 2);
+/// ```
+#[must_use]
+#[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
if cap == 0 {
let (s, r) = counter::new(zero::Channel::new());
@@ -76,7 +256,42 @@ pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
}
}
-/// The sending side of a channel.
+/// The sending-half of Rust's synchronous [`channel`] type.
+///
+/// Messages can be sent through this channel with [`send`].
+///
+/// Note: all senders (the original and its clones) need to be dropped for the receiver
+/// to stop blocking to receive messages with [`Receiver::recv`].
+///
+/// [`send`]: Sender::send
+///
+/// # Examples
+///
+/// ```rust
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+///
+/// let (sender, receiver) = channel();
+/// let sender2 = sender.clone();
+///
+/// // First thread owns sender
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// });
+///
+/// // Second thread owns sender2
+/// thread::spawn(move || {
+/// sender2.send(2).unwrap();
+/// });
+///
+/// let msg = receiver.recv().unwrap();
+/// let msg2 = receiver.recv().unwrap();
+///
+/// assert_eq!(3, msg + msg2);
+/// ```
+#[unstable(feature = "mpmc_channel", issue = "126840")]
pub struct Sender<T> {
flavor: SenderFlavor<T>,
}
@@ -93,10 +308,14 @@ enum SenderFlavor<T> {
Zero(counter::Sender<zero::Channel<T>>),
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Send for Sender<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Sync for Sender<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> UnwindSafe for Sender<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> RefUnwindSafe for Sender<T> {}
impl<T> Sender<T> {
@@ -107,6 +326,19 @@ impl<T> Sender<T> {
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::{channel, Receiver, Sender};
+ ///
+ /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
+ ///
+ /// assert!(sender.try_send(1).is_ok());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.try_send(msg),
@@ -115,14 +347,36 @@ impl<T> Sender<T> {
}
}
- /// Blocks the current thread until a message is sent or the channel is disconnected.
+ /// Attempts to send a value on this channel, returning it back if it could
+ /// not be sent.
///
- /// If the channel is full and not disconnected, this call will block until the send operation
- /// can proceed. If the channel becomes disconnected, this call will wake up and return an
- /// error. The returned error contains the original message.
+ /// A successful send occurs when it is determined that the other end of
+ /// the channel has not hung up already. An unsuccessful send would be one
+ /// where the corresponding receiver has already been deallocated. Note
+ /// that a return value of [`Err`] means that the data will never be
+ /// received, but a return value of [`Ok`] does *not* mean that the data
+ /// will be received. It is possible for the corresponding receiver to
+ /// hang up immediately after this function returns [`Ok`].
///
- /// If called on a zero-capacity channel, this method will wait for a receive operation to
- /// appear on the other side of the channel.
+ /// This method will never block the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::channel;
+ ///
+ /// let (tx, rx) = channel();
+ ///
+ /// // This send is always successful
+ /// tx.send(1).unwrap();
+ ///
+ /// // This send will fail because the receiver is gone
+ /// drop(rx);
+ /// assert!(tx.send(1).is_err());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, None),
@@ -136,10 +390,6 @@ impl<T> Sender<T> {
}
}
-// The methods below are not used by `sync::mpsc`, but
-// are useful and we'll likely want to expose them
-// eventually
-#[allow(unused)]
impl<T> Sender<T> {
/// Waits for a message to be sent into the channel, but only for a limited time.
///
@@ -149,6 +399,20 @@ impl<T> Sender<T> {
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::channel;
+ /// use std::time::Duration;
+ ///
+ /// let (tx, rx) = channel();
+ ///
+ /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.send_deadline(msg, deadline),
@@ -165,6 +429,21 @@ impl<T> Sender<T> {
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::channel;
+ /// use std::time::{Duration, Instant};
+ ///
+ /// let (tx, rx) = channel();
+ ///
+ /// let t = Instant::now() + Duration::from_millis(400);
+ /// tx.send_deadline(1, t).unwrap();
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
@@ -176,6 +455,31 @@ impl<T> Sender<T> {
/// Returns `true` if the channel is empty.
///
/// Note: Zero-capacity channels are always empty.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, _recv) = mpmc::channel();
+ ///
+ /// let tx1 = send.clone();
+ /// let tx2 = send.clone();
+ ///
+ /// assert!(tx1.is_empty());
+ ///
+ /// let handle = thread::spawn(move || {
+ /// tx2.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert!(!tx1.is_empty());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn is_empty(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_empty(),
@@ -187,6 +491,29 @@ impl<T> Sender<T> {
/// Returns `true` if the channel is full.
///
/// Note: Zero-capacity channels are always full.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, _recv) = mpmc::sync_channel(1);
+ ///
+ /// let (tx1, tx2) = (send.clone(), send.clone());
+ /// assert!(!tx1.is_full());
+ ///
+ /// let handle = thread::spawn(move || {
+ /// tx2.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert!(tx1.is_full());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn is_full(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_full(),
@@ -196,6 +523,29 @@ impl<T> Sender<T> {
}
/// Returns the number of messages in the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, _recv) = mpmc::channel();
+ /// let (tx1, tx2) = (send.clone(), send.clone());
+ ///
+ /// assert_eq!(tx1.len(), 0);
+ ///
+ /// let handle = thread::spawn(move || {
+ /// tx2.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(tx1.len(), 1);
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn len(&self) -> usize {
match &self.flavor {
SenderFlavor::Array(chan) => chan.len(),
@@ -205,6 +555,29 @@ impl<T> Sender<T> {
}
/// If the channel is bounded, returns its capacity.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, _recv) = mpmc::sync_channel(3);
+ /// let (tx1, tx2) = (send.clone(), send.clone());
+ ///
+ /// assert_eq!(tx1.capacity(), Some(3));
+ ///
+ /// let handle = thread::spawn(move || {
+ /// tx2.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(tx1.capacity(), Some(3));
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.capacity(),
@@ -214,6 +587,21 @@ impl<T> Sender<T> {
}
/// Returns `true` if senders belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ ///
+ /// let (tx1, _) = mpmc::channel::<i32>();
+ /// let (tx2, _) = mpmc::channel::<i32>();
+ ///
+ /// assert!(tx1.same_channel(&tx1));
+ /// assert!(!tx1.same_channel(&tx2));
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn same_channel(&self, other: &Sender<T>) -> bool {
match (&self.flavor, &other.flavor) {
(SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
@@ -224,6 +612,7 @@ impl<T> Sender<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
@@ -236,6 +625,7 @@ impl<T> Drop for Sender<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
@@ -248,17 +638,216 @@ impl<T> Clone for Sender<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sender { .. }")
}
}
-/// The receiving side of a channel.
+/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
+/// Different threads can share this [`Sender`] by cloning it.
+///
+/// Messages sent to the channel can be retrieved using [`recv`].
+///
+/// [`recv`]: Receiver::recv
+///
+/// # Examples
+///
+/// ```rust
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (send, recv) = channel();
+///
+/// let tx_thread = thread::spawn(move || {
+/// send.send("Hello world!").unwrap();
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+/// send.send("Delayed for 2 seconds").unwrap();
+/// });
+///
+/// let (rx1, rx2) = (recv.clone(), recv.clone());
+/// let rx_thread_1 = thread::spawn(move || {
+/// println!("{}", rx1.recv().unwrap()); // Received immediately
+/// });
+/// let rx_thread_2 = thread::spawn(move || {
+/// println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
+/// });
+///
+/// tx_thread.join().unwrap();
+/// rx_thread_1.join().unwrap();
+/// rx_thread_2.join().unwrap();
+/// ```
+#[unstable(feature = "mpmc_channel", issue = "126840")]
pub struct Receiver<T> {
flavor: ReceiverFlavor<T>,
}
+/// An iterator over messages on a [`Receiver`], created by [`iter`].
+///
+/// This iterator will block whenever [`next`] is called,
+/// waiting for a new message, and [`None`] will be returned
+/// when the corresponding channel has hung up.
+///
+/// [`iter`]: Receiver::iter
+/// [`next`]: Iterator::next
+///
+/// # Examples
+///
+/// ```rust
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.iter() {
+/// println!("Got: {x}");
+/// }
+/// ```
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+#[derive(Debug)]
+pub struct Iter<'a, T: 'a> {
+ rx: &'a Receiver<T>,
+}
+
+/// An iterator that attempts to yield all pending values for a [`Receiver`],
+/// created by [`try_iter`].
+///
+/// [`None`] will be returned when there are no pending values remaining or
+/// if the corresponding channel has hung up.
+///
+/// This iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return [`None`].
+///
+/// [`try_iter`]: Receiver::try_iter
+///
+/// # Examples
+///
+/// ```rust
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (sender, receiver) = channel();
+///
+/// // Nothing is in the buffer yet
+/// assert!(receiver.try_iter().next().is_none());
+/// println!("Nothing in the buffer...");
+///
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// sender.send(2).unwrap();
+/// sender.send(3).unwrap();
+/// });
+///
+/// println!("Going to sleep...");
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+///
+/// for x in receiver.try_iter() {
+/// println!("Got: {x}");
+/// }
+/// ```
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+#[derive(Debug)]
+pub struct TryIter<'a, T: 'a> {
+ rx: &'a Receiver<T>,
+}
+
+/// An owning iterator over messages on a [`Receiver`],
+/// created by [`into_iter`].
+///
+/// This iterator will block whenever [`next`]
+/// is called, waiting for a new message, and [`None`] will be
+/// returned if the corresponding channel has hung up.
+///
+/// [`into_iter`]: Receiver::into_iter
+/// [`next`]: Iterator::next
+///
+/// # Examples
+///
+/// ```rust
+/// #![feature(mpmc_channel)]
+///
+/// use std::sync::mpmc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.into_iter() {
+/// println!("Got: {x}");
+/// }
+/// ```
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+#[derive(Debug)]
+pub struct IntoIter<T> {
+ rx: Receiver<T>,
+}
+
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+impl<'a, T> Iterator for Iter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<T> {
+ self.rx.recv().ok()
+ }
+}
+
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+impl<'a, T> Iterator for TryIter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<T> {
+ self.rx.try_recv().ok()
+ }
+}
+
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+impl<'a, T> IntoIterator for &'a Receiver<T> {
+ type Item = T;
+ type IntoIter = Iter<'a, T>;
+
+ fn into_iter(self) -> Iter<'a, T> {
+ self.iter()
+ }
+}
+
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+impl<T> Iterator for IntoIter<T> {
+ type Item = T;
+ fn next(&mut self) -> Option<T> {
+ self.rx.recv().ok()
+ }
+}
+
+#[unstable(feature = "mpmc_channel", issue = "126840")]
+impl<T> IntoIterator for Receiver<T> {
+ type Item = T;
+ type IntoIter = IntoIter<T>;
+
+ fn into_iter(self) -> IntoIter<T> {
+ IntoIter { rx: self }
+ }
+}
+
/// Receiver flavors.
enum ReceiverFlavor<T> {
/// Bounded channel based on a preallocated array.
@@ -271,20 +860,46 @@ enum ReceiverFlavor<T> {
Zero(counter::Receiver<zero::Channel<T>>),
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Send for Receiver<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
unsafe impl<T: Send> Sync for Receiver<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> UnwindSafe for Receiver<T> {}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> RefUnwindSafe for Receiver<T> {}
impl<T> Receiver<T> {
/// Attempts to receive a message from the channel without blocking.
///
- /// This method will either receive a message from the channel immediately or return an error
- /// if the channel is empty.
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with a
+ /// possible option of pending data on the channel.
///
/// If called on a zero-capacity channel, this method will receive a message only if there
/// happens to be a send operation on the other side of the channel at the same time.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ ///
+ /// Compared with [`recv`], this function has two failure cases instead of one
+ /// (one for disconnection, one for an empty buffer).
+ ///
+ /// [`recv`]: Self::recv
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::{Receiver, channel};
+ ///
+ /// let (_, receiver): (_, Receiver<i32>) = channel();
+ ///
+ /// assert!(receiver.try_recv().is_err());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.try_recv(),
@@ -293,15 +908,64 @@ impl<T> Receiver<T> {
}
}
- /// Blocks the current thread until a message is received or the channel is empty and
- /// disconnected.
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up.
///
- /// If the channel is empty and not disconnected, this call will block until the receive
- /// operation can proceed. If the channel is empty and becomes disconnected, this call will
- /// wake up and return an error.
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent (at least one sender
+ /// still exists). Once a message is sent to the corresponding [`Sender`],
+ /// this receiver will wake up and return that message.
///
- /// If called on a zero-capacity channel, this method will wait for a send operation to appear
- /// on the other side of the channel.
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(Ok(1), recv.recv());
+ /// ```
+ ///
+ /// Buffering behavior:
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ /// use std::sync::mpmc::RecvError;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// send.send(2).unwrap();
+ /// send.send(3).unwrap();
+ /// drop(send);
+ /// });
+ ///
+ /// // wait for the thread to join so we ensure the sender is dropped
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(Ok(1), recv.recv());
+ /// assert_eq!(Ok(2), recv.recv());
+ /// assert_eq!(Ok(3), recv.recv());
+ /// assert_eq!(Err(RecvError), recv.recv());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn recv(&self) -> Result<T, RecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(None),
@@ -311,14 +975,65 @@ impl<T> Receiver<T> {
.map_err(|_| RecvError)
}
- /// Waits for a message to be received from the channel, but only for a limited time.
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up, or if it waits more than `timeout`.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent (at least one sender
+ /// still exists). Once a message is sent to the corresponding [`Sender`],
+ /// this receiver will wake up and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// # Examples
+ ///
+ /// Successfully receiving value before encountering timeout:
///
- /// If the channel is empty and not disconnected, this call will block until the receive
- /// operation can proceed or the operation times out. If the channel is empty and becomes
- /// disconnected, this call will wake up and return an error.
+ /// ```no_run
+ /// #![feature(mpmc_channel)]
///
- /// If called on a zero-capacity channel, this method will wait for a send operation to appear
- /// on the other side of the channel.
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use std::sync::mpmc;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching timeout:
+ ///
+ /// ```no_run
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use std::sync::mpmc;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Err(mpmc::RecvTimeoutError::Timeout)
+ /// );
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.recv_deadline(deadline),
@@ -327,14 +1042,65 @@ impl<T> Receiver<T> {
}
}
- /// Waits for a message to be received from the channel, but only for a limited time.
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up, or if `deadline` is reached.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent. Once a message is
+ /// sent to the corresponding [`Sender`], then this receiver will wake up
+ /// and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// # Examples
+ ///
+ /// Successfully receiving value before reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpmc;
///
- /// If the channel is empty and not disconnected, this call will block until the receive
- /// operation can proceed or the operation times out. If the channel is empty and becomes
- /// disconnected, this call will wake up and return an error.
+ /// let (send, recv) = mpmc::channel();
///
- /// If called on a zero-capacity channel, this method will wait for a send operation to appear
- /// on the other side of the channel.
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpmc;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Err(mpmc::RecvTimeoutError::Timeout)
+ /// );
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
@@ -342,16 +1108,77 @@ impl<T> Receiver<T> {
ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
}
}
+
+ /// Returns an iterator that will attempt to yield all pending values.
+ /// It will return `None` if there are no more pending values or if the
+ /// channel has hung up. The iterator will never [`panic!`] or block the
+ /// user by waiting for values.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::channel;
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// let (sender, receiver) = channel();
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// sender.send(1).unwrap();
+ /// sender.send(2).unwrap();
+ /// sender.send(3).unwrap();
+ /// });
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// // block for two seconds
+ /// thread::sleep(Duration::from_secs(2));
+ ///
+ /// let mut iter = receiver.try_iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
+ pub fn try_iter(&self) -> TryIter<'_, T> {
+ TryIter { rx: self }
+ }
}
-// The methods below are not used by `sync::mpsc`, but
-// are useful and we'll likely want to expose them
-// eventually
-#[allow(unused)]
impl<T> Receiver<T> {
/// Returns `true` if the channel is empty.
///
/// Note: Zero-capacity channels are always empty.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ ///
+ /// assert!(recv.is_empty());
+ ///
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert!(!recv.is_empty());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn is_empty(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_empty(),
@@ -363,6 +1190,28 @@ impl<T> Receiver<T> {
/// Returns `true` if the channel is full.
///
/// Note: Zero-capacity channels are always full.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpmc::sync_channel(1);
+ ///
+ /// assert!(!recv.is_full());
+ ///
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert!(recv.is_full());
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn is_full(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_full(),
@@ -372,6 +1221,28 @@ impl<T> Receiver<T> {
}
/// Returns the number of messages in the channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpmc::channel();
+ ///
+ /// assert_eq!(recv.len(), 0);
+ ///
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(recv.len(), 1);
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn len(&self) -> usize {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.len(),
@@ -381,6 +1252,28 @@ impl<T> Receiver<T> {
}
/// If the channel is bounded, returns its capacity.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpmc::sync_channel(3);
+ ///
+ /// assert_eq!(recv.capacity(), Some(3));
+ ///
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(recv.capacity(), Some(3));
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.capacity(),
@@ -390,6 +1283,21 @@ impl<T> Receiver<T> {
}
/// Returns `true` if receivers belong to the same channel.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc;
+ ///
+ /// let (_, rx1) = mpmc::channel::<i32>();
+ /// let (_, rx2) = mpmc::channel::<i32>();
+ ///
+ /// assert!(rx1.same_channel(&rx1));
+ /// assert!(!rx1.same_channel(&rx2));
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
match (&self.flavor, &other.flavor) {
(ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
@@ -398,8 +1306,39 @@ impl<T> Receiver<T> {
_ => false,
}
}
+
+ /// Returns an iterator that will block waiting for messages, but never
+ /// [`panic!`]. It will return [`None`] when the channel has hung up.
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// #![feature(mpmc_channel)]
+ ///
+ /// use std::sync::mpmc::channel;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send(1).unwrap();
+ /// send.send(2).unwrap();
+ /// send.send(3).unwrap();
+ /// });
+ ///
+ /// let mut iter = recv.iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
+ #[unstable(feature = "mpmc_channel", issue = "126840")]
+ pub fn iter(&self) -> Iter<'_, T> {
+ Iter { rx: self }
+ }
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
@@ -412,6 +1351,7 @@ impl<T> Drop for Receiver<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
@@ -424,6 +1364,7 @@ impl<T> Clone for Receiver<T> {
}
}
+#[unstable(feature = "mpmc_channel", issue = "126840")]
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Receiver { .. }")
diff --git a/library/std/src/sync/mpmc/tests.rs b/library/std/src/sync/mpmc/tests.rs
new file mode 100644
index 00000000000..ab14050df6c
--- /dev/null
+++ b/library/std/src/sync/mpmc/tests.rs
@@ -0,0 +1,728 @@
+use super::*;
+use crate::{env, thread};
+
+pub fn stress_factor() -> usize {
+ match env::var("RUST_TEST_STRESS") {
+ Ok(val) => val.parse().unwrap(),
+ Err(..) => 1,
+ }
+}
+
+#[test]
+fn smoke() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+}
+
+#[test]
+fn drop_full() {
+ let (tx, _rx) = channel::<Box<isize>>();
+ tx.send(Box::new(1)).unwrap();
+}
+
+#[test]
+fn drop_full_shared() {
+ let (tx, _rx) = channel::<Box<isize>>();
+ drop(tx.clone());
+ drop(tx.clone());
+ tx.send(Box::new(1)).unwrap();
+}
+
+#[test]
+fn smoke_shared() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ let tx = tx.clone();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+}
+
+#[test]
+fn smoke_threads() {
+ let (tx, rx) = channel::<i32>();
+ let t1 = thread::spawn(move || {
+ for i in 0..2 {
+ tx.send(i).unwrap();
+ }
+ });
+ let t2 = thread::spawn(move || {
+ assert_eq!(rx.recv().unwrap(), 0);
+ assert_eq!(rx.recv().unwrap(), 1);
+ });
+ t1.join().unwrap();
+ t2.join().unwrap();
+}
+
+#[test]
+fn smoke_port_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(1).is_err());
+}
+
+#[test]
+fn smoke_shared_port_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(1).is_err())
+}
+
+#[test]
+fn smoke_shared_port_gone2() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ let tx2 = tx.clone();
+ drop(tx);
+ assert!(tx2.send(1).is_err());
+}
+
+#[test]
+fn port_gone_concurrent() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move || {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() {}
+}
+
+#[test]
+fn port_gone_concurrent_shared() {
+ let (tx, rx) = channel::<i32>();
+ let tx2 = tx.clone();
+ let _t = thread::spawn(move || {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
+}
+
+#[test]
+fn smoke_chan_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn smoke_chan_gone_shared() {
+ let (tx, rx) = channel::<()>();
+ let tx2 = tx.clone();
+ drop(tx);
+ drop(tx2);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn chan_gone_concurrent() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move || {
+ tx.send(1).unwrap();
+ tx.send(1).unwrap();
+ });
+ while rx.recv().is_ok() {}
+}
+
+#[test]
+fn stress() {
+ let count = if cfg!(miri) { 100 } else { 10000 };
+ let (tx, rx) = channel::<i32>();
+ let t = thread::spawn(move || {
+ for _ in 0..count {
+ tx.send(1).unwrap();
+ }
+ });
+ for _ in 0..count {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ t.join().ok().expect("thread panicked");
+}
+
+#[test]
+fn stress_shared() {
+ const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = channel::<i32>();
+
+ let t = thread::spawn(move || {
+ for _ in 0..AMT * NTHREADS {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ match rx.try_recv() {
+ Ok(..) => panic!(),
+ _ => {}
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ tx.send(1).unwrap();
+ }
+ });
+ }
+ drop(tx);
+ t.join().ok().expect("thread panicked");
+}
+
+#[test]
+fn send_from_outside_runtime() {
+ let (tx1, rx1) = channel::<()>();
+ let (tx2, rx2) = channel::<i32>();
+ let t1 = thread::spawn(move || {
+ tx1.send(()).unwrap();
+ for _ in 0..40 {
+ assert_eq!(rx2.recv().unwrap(), 1);
+ }
+ });
+ rx1.recv().unwrap();
+ let t2 = thread::spawn(move || {
+ for _ in 0..40 {
+ tx2.send(1).unwrap();
+ }
+ });
+ t1.join().ok().expect("thread panicked");
+ t2.join().ok().expect("thread panicked");
+}
+
+#[test]
+fn recv_from_outside_runtime() {
+ let (tx, rx) = channel::<i32>();
+ let t = thread::spawn(move || {
+ for _ in 0..40 {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ });
+ for _ in 0..40 {
+ tx.send(1).unwrap();
+ }
+ t.join().ok().expect("thread panicked");
+}
+
+#[test]
+fn no_runtime() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let t1 = thread::spawn(move || {
+ assert_eq!(rx1.recv().unwrap(), 1);
+ tx2.send(2).unwrap();
+ });
+ let t2 = thread::spawn(move || {
+ tx1.send(1).unwrap();
+ assert_eq!(rx2.recv().unwrap(), 2);
+ });
+ t1.join().ok().expect("thread panicked");
+ t2.join().ok().expect("thread panicked");
+}
+
+#[test]
+fn oneshot_single_thread_close_port_first() {
+ // Simple test of closing without sending
+ let (_tx, rx) = channel::<i32>();
+ drop(rx);
+}
+
+#[test]
+fn oneshot_single_thread_close_chan_first() {
+ // Simple test of closing without sending
+ let (tx, _rx) = channel::<i32>();
+ drop(tx);
+}
+
+#[test]
+fn oneshot_single_thread_send_port_close() {
+ // Testing that the sender cleans up the payload if receiver is closed
+ let (tx, rx) = channel::<Box<i32>>();
+ drop(rx);
+ assert!(tx.send(Box::new(0)).is_err());
+}
+
+#[test]
+fn oneshot_single_thread_recv_chan_close() {
+ // Receiving on a closed chan will panic
+ let res = thread::spawn(move || {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ rx.recv().unwrap();
+ })
+ .join();
+ // What is our res?
+ assert!(res.is_err());
+}
+
+#[test]
+fn oneshot_single_thread_send_then_recv() {
+ let (tx, rx) = channel::<Box<i32>>();
+ tx.send(Box::new(10)).unwrap();
+ assert!(*rx.recv().unwrap() == 10);
+}
+
+#[test]
+fn oneshot_single_thread_try_send_open() {
+ let (tx, rx) = channel::<i32>();
+ assert!(tx.send(10).is_ok());
+ assert!(rx.recv().unwrap() == 10);
+}
+
+#[test]
+fn oneshot_single_thread_try_send_closed() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(10).is_err());
+}
+
+#[test]
+fn oneshot_single_thread_try_recv_open() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(10).unwrap();
+ assert!(rx.recv() == Ok(10));
+}
+
+#[test]
+fn oneshot_single_thread_try_recv_closed() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn oneshot_single_thread_peek_data() {
+ let (tx, rx) = channel::<i32>();
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ tx.send(10).unwrap();
+ assert_eq!(rx.try_recv(), Ok(10));
+}
+
+#[test]
+fn oneshot_single_thread_peek_close() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+}
+
+#[test]
+fn oneshot_single_thread_peek_open() {
+ let (_tx, rx) = channel::<i32>();
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+}
+
+#[test]
+fn oneshot_multi_task_recv_then_send() {
+ let (tx, rx) = channel::<Box<i32>>();
+ let _t = thread::spawn(move || {
+ assert!(*rx.recv().unwrap() == 10);
+ });
+
+ tx.send(Box::new(10)).unwrap();
+}
+
+#[test]
+fn oneshot_multi_task_recv_then_close() {
+ let (tx, rx) = channel::<Box<i32>>();
+ let _t = thread::spawn(move || {
+ drop(tx);
+ });
+ let res = thread::spawn(move || {
+ assert!(*rx.recv().unwrap() == 10);
+ })
+ .join();
+ assert!(res.is_err());
+}
+
+#[test]
+fn oneshot_multi_thread_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move || {
+ drop(rx);
+ });
+ drop(tx);
+ }
+}
+
+#[test]
+fn oneshot_multi_thread_send_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move || {
+ drop(rx);
+ });
+ let _ = thread::spawn(move || {
+ tx.send(1).unwrap();
+ })
+ .join();
+ }
+}
+
+#[test]
+fn oneshot_multi_thread_recv_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ thread::spawn(move || {
+ let res = thread::spawn(move || {
+ rx.recv().unwrap();
+ })
+ .join();
+ assert!(res.is_err());
+ });
+ let _t = thread::spawn(move || {
+ thread::spawn(move || {
+ drop(tx);
+ });
+ });
+ }
+}
+
+#[test]
+fn oneshot_multi_thread_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<Box<isize>>();
+ let _t = thread::spawn(move || {
+ tx.send(Box::new(10)).unwrap();
+ });
+ assert!(*rx.recv().unwrap() == 10);
+ }
+}
+
+#[test]
+fn stream_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel();
+
+ send(tx, 0);
+ recv(rx, 0);
+
+ fn send(tx: Sender<Box<i32>>, i: i32) {
+ if i == 10 {
+ return;
+ }
+
+ thread::spawn(move || {
+ tx.send(Box::new(i)).unwrap();
+ send(tx, i + 1);
+ });
+ }
+
+ fn recv(rx: Receiver<Box<i32>>, i: i32) {
+ if i == 10 {
+ return;
+ }
+
+ thread::spawn(move || {
+ assert!(*rx.recv().unwrap() == i);
+ recv(rx, i + 1);
+ });
+ }
+ }
+}
+
+#[test]
+fn oneshot_single_thread_recv_timeout() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+}
+
+#[test]
+fn stress_recv_timeout_two_threads() {
+ let (tx, rx) = channel();
+ let stress = stress_factor() + 100;
+ let timeout = Duration::from_millis(100);
+
+ thread::spawn(move || {
+ for i in 0..stress {
+ if i % 2 == 0 {
+ thread::sleep(timeout * 2);
+ }
+ tx.send(1usize).unwrap();
+ }
+ });
+
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(timeout) {
+ Ok(n) => {
+ assert_eq!(n, 1usize);
+ recv_count += 1;
+ }
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, stress);
+}
+
+#[test]
+fn recv_timeout_upgrade() {
+ let (tx, rx) = channel::<()>();
+ let timeout = Duration::from_millis(1);
+ let _tx_clone = tx.clone();
+
+ let start = Instant::now();
+ assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
+ assert!(Instant::now() >= start + timeout);
+}
+
+#[test]
+fn stress_recv_timeout_shared() {
+ let (tx, rx) = channel();
+ let stress = stress_factor() + 100;
+
+ for i in 0..stress {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(i as u64 * 10));
+ tx.send(1usize).unwrap();
+ });
+ }
+
+ drop(tx);
+
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(Duration::from_millis(10)) {
+ Ok(n) => {
+ assert_eq!(n, 1usize);
+ recv_count += 1;
+ }
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, stress);
+}
+
+#[test]
+fn very_long_recv_timeout_wont_panic() {
+ let (tx, rx) = channel::<()>();
+ let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
+ thread::sleep(Duration::from_secs(1));
+ assert!(tx.send(()).is_ok());
+ assert_eq!(join_handle.join().unwrap(), Ok(()));
+}
+
+#[test]
+fn recv_a_lot() {
+ let count = if cfg!(miri) { 1000 } else { 10000 };
+ // Regression test that we don't run out of stack in scheduler context
+ let (tx, rx) = channel();
+ for _ in 0..count {
+ tx.send(()).unwrap();
+ }
+ for _ in 0..count {
+ rx.recv().unwrap();
+ }
+}
+
+#[test]
+fn shared_recv_timeout() {
+ let (tx, rx) = channel();
+ let total = 5;
+ for _ in 0..total {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..total {
+ rx.recv().unwrap();
+ }
+
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+}
+
+#[test]
+fn shared_chan_stress() {
+ let (tx, rx) = channel();
+ let total = stress_factor() + 100;
+ for _ in 0..total {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..total {
+ rx.recv().unwrap();
+ }
+}
+
+#[test]
+fn test_nested_recv_iter() {
+ let (tx, rx) = channel::<i32>();
+ let (total_tx, total_rx) = channel::<i32>();
+
+ let _t = thread::spawn(move || {
+ let mut acc = 0;
+ for x in rx.iter() {
+ acc += x;
+ }
+ total_tx.send(acc).unwrap();
+ });
+
+ tx.send(3).unwrap();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ drop(tx);
+ assert_eq!(total_rx.recv().unwrap(), 6);
+}
+
+#[test]
+fn test_recv_iter_break() {
+ let (tx, rx) = channel::<i32>();
+ let (count_tx, count_rx) = channel();
+
+ let _t = thread::spawn(move || {
+ let mut count = 0;
+ for x in rx.iter() {
+ if count >= 3 {
+ break;
+ } else {
+ count += x;
+ }
+ }
+ count_tx.send(count).unwrap();
+ });
+
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ let _ = tx.send(2);
+ drop(tx);
+ assert_eq!(count_rx.recv().unwrap(), 4);
+}
+
+#[test]
+fn test_recv_try_iter() {
+ let (request_tx, request_rx) = channel();
+ let (response_tx, response_rx) = channel();
+
+ // Request `x`s until we have `6`.
+ let t = thread::spawn(move || {
+ let mut count = 0;
+ loop {
+ for x in response_rx.try_iter() {
+ count += x;
+ if count == 6 {
+ return count;
+ }
+ }
+ request_tx.send(()).unwrap();
+ }
+ });
+
+ for _ in request_rx.iter() {
+ if response_tx.send(2).is_err() {
+ break;
+ }
+ }
+
+ assert_eq!(t.join().unwrap(), 6);
+}
+
+#[test]
+fn test_recv_into_iter_owned() {
+ let mut iter = {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+
+ rx.into_iter()
+ };
+ assert_eq!(iter.next().unwrap(), 1);
+ assert_eq!(iter.next().unwrap(), 2);
+ assert_eq!(iter.next().is_none(), true);
+}
+
+#[test]
+fn test_recv_into_iter_borrowed() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ drop(tx);
+ let mut iter = (&rx).into_iter();
+ assert_eq!(iter.next().unwrap(), 1);
+ assert_eq!(iter.next().unwrap(), 2);
+ assert_eq!(iter.next().is_none(), true);
+}
+
+#[test]
+fn try_recv_states() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<()>();
+ let (tx3, rx3) = channel::<()>();
+ let _t = thread::spawn(move || {
+ rx2.recv().unwrap();
+ tx1.send(1).unwrap();
+ tx3.send(()).unwrap();
+ rx2.recv().unwrap();
+ drop(tx1);
+ tx3.send(()).unwrap();
+ });
+
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Ok(1));
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
+}
+
+// This bug used to end up in a livelock inside of the Receiver destructor
+// because the internal state of the Shared packet was corrupted
+#[test]
+fn destroy_upgraded_shared_port_when_sender_still_active() {
+ let (tx, rx) = channel();
+ let (tx2, rx2) = channel();
+ let _t = thread::spawn(move || {
+ rx.recv().unwrap(); // wait on a oneshot
+ drop(rx); // destroy a shared
+ tx2.send(()).unwrap();
+ });
+ // make sure the other thread has gone to sleep
+ for _ in 0..5000 {
+ thread::yield_now();
+ }
+
+ // upgrade to a shared chan and send a message
+ let t = tx.clone();
+ drop(tx);
+ t.send(()).unwrap();
+
+ // wait for the child thread to exit before we exit
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn issue_32114() {
+ let (tx, _) = channel();
+ let _ = tx.send(123);
+ assert_eq!(tx.send(123), Err(SendError(123)));
+}
+
+#[test]
+fn issue_39364() {
+ let (tx, rx) = channel::<()>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(300));
+ let _ = tx.clone();
+ // Don't drop; hand back to caller.
+ tx
+ });
+
+ let _ = rx.recv_timeout(Duration::from_millis(500));
+ let _tx = t.join().unwrap(); // delay dropping until end of test
+ let _ = rx.recv_timeout(Duration::from_millis(500));
+}