[go: up one dir, main page]

std/sync/mpmc/
mod.rs

1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! [`Sender`]s are used to send data to a set of [`Receiver`]s where each item
10//! sent is delivered to (at most) one receiver. Both sender and receiver are
11//! cloneable (multi-producer) such that many threads can send simultaneously
12//! to receivers (multi-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//!    will return a `(Sender, Receiver)` tuple where all sends will be
18//!    **asynchronous** (they never block). The channel conceptually has an
19//!    infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//!    return a `(Sender, Receiver)` tuple where the storage for pending
23//!    messages is a pre-allocated buffer of a fixed size. All sends will be
24//!    **synchronous** by blocking until there is buffer space available. Note
25//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//!    channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! #![feature(mpmc_channel)]
50//!
51//! use std::thread;
52//! use std::sync::mpmc::channel;
53//!
54//! // Create a simple streaming channel
55//! let (tx, rx) = channel();
56//! thread::spawn(move || {
57//!     tx.send(10).unwrap();
58//! });
59//! assert_eq!(rx.recv().unwrap(), 10);
60//! ```
61//!
62//! Shared usage:
63//!
64//! ```
65//! #![feature(mpmc_channel)]
66//!
67//! use std::thread;
68//! use std::sync::mpmc::channel;
69//!
70//! thread::scope(|s| {
71//!     // Create a shared channel that can be sent along from many threads
72//!     // where tx is the sending half (tx for transmission), and rx is the receiving
73//!     // half (rx for receiving).
74//!     let (tx, rx) = channel();
75//!     for i in 0..10 {
76//!         let tx = tx.clone();
77//!         s.spawn(move || {
78//!             tx.send(i).unwrap();
79//!         });
80//!     }
81//!
82//!     for _ in 0..5 {
83//!         let rx1 = rx.clone();
84//!         let rx2 = rx.clone();
85//!         s.spawn(move || {
86//!             let j = rx1.recv().unwrap();
87//!             assert!(0 <= j && j < 10);
88//!         });
89//!         s.spawn(move || {
90//!             let j = rx2.recv().unwrap();
91//!             assert!(0 <= j && j < 10);
92//!         });
93//!     }
94//! })
95//! ```
96//!
97//! Propagating panics:
98//!
99//! ```
100//! #![feature(mpmc_channel)]
101//!
102//! use std::sync::mpmc::channel;
103//!
104//! // The call to recv() will return an error because the channel has already
105//! // hung up (or been deallocated)
106//! let (tx, rx) = channel::<i32>();
107//! drop(tx);
108//! assert!(rx.recv().is_err());
109//! ```
110
111// This module is used as the implementation for the channels in `sync::mpsc`.
112// The implementation comes from the crossbeam-channel crate:
113//
114// Copyright (c) 2019 The Crossbeam Project Developers
115//
116// Permission is hereby granted, free of charge, to any
117// person obtaining a copy of this software and associated
118// documentation files (the "Software"), to deal in the
119// Software without restriction, including without
120// limitation the rights to use, copy, modify, merge,
121// publish, distribute, sublicense, and/or sell copies of
122// the Software, and to permit persons to whom the Software
123// is furnished to do so, subject to the following
124// conditions:
125//
126// The above copyright notice and this permission notice
127// shall be included in all copies or substantial portions
128// of the Software.
129//
130// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
131// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
132// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
133// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
134// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
135// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
136// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
137// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
138// DEALINGS IN THE SOFTWARE.
139
140mod array;
141mod context;
142mod counter;
143mod error;
144mod list;
145mod select;
146mod utils;
147mod waker;
148mod zero;
149
150pub use error::*;
151
152use crate::fmt;
153use crate::panic::{RefUnwindSafe, UnwindSafe};
154use crate::time::{Duration, Instant};
155
156/// Creates a new asynchronous channel, returning the sender/receiver halves.
157///
158/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
159/// the same order as it was sent, and no [`send`] will block the calling thread
160/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
161/// block after its buffer limit is reached). [`recv`] will block until a message
162/// is available while there is at least one [`Sender`] alive (including clones).
163///
164/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
165/// The [`Receiver`] also can be cloned to have multi receivers.
166///
167/// If the [`Receiver`] is disconnected while trying to [`send`] with the
168/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
169/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
170/// return a [`RecvError`].
171///
172/// [`send`]: Sender::send
173/// [`recv`]: Receiver::recv
174///
175/// # Examples
176///
177/// ```
178/// #![feature(mpmc_channel)]
179///
180/// use std::sync::mpmc::channel;
181/// use std::thread;
182///
183/// let (sender, receiver) = channel();
184///
185/// // Spawn off an expensive computation
186/// thread::spawn(move || {
187/// #   fn expensive_computation() {}
188///     sender.send(expensive_computation()).unwrap();
189/// });
190///
191/// // Do some useful work for a while
192///
193/// // Let's see what that answer was
194/// println!("{:?}", receiver.recv().unwrap());
195/// ```
196#[must_use]
197#[unstable(feature = "mpmc_channel", issue = "126840")]
198pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
199    let (s, r) = counter::new(list::Channel::new());
200    let s = Sender { flavor: SenderFlavor::List(s) };
201    let r = Receiver { flavor: ReceiverFlavor::List(r) };
202    (s, r)
203}
204
205/// Creates a new synchronous, bounded channel.
206///
207/// All data sent on the [`Sender`] will become available on the [`Receiver`]
208/// in the same order as it was sent. Like asynchronous [`channel`]s, the
209/// [`Receiver`] will block until a message becomes available. `sync_channel`
210/// differs greatly in the semantics of the sender, however.
211///
212/// This channel has an internal buffer on which messages will be queued.
213/// `bound` specifies the buffer size. When the internal buffer becomes full,
214/// future sends will *block* waiting for the buffer to open up. Note that a
215/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
216/// where each [`send`] will not return until a [`recv`] is paired with it.
217///
218/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
219/// times. The [`Receiver`] also can be cloned to have multi receivers.
220///
221/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
222/// to [`send`] with the [`Sender`], the [`send`] method will return a
223/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
224/// to [`recv`], the [`recv`] method will return a [`RecvError`].
225///
226/// [`send`]: Sender::send
227/// [`recv`]: Receiver::recv
228///
229/// # Examples
230///
231/// ```
232/// use std::sync::mpsc::sync_channel;
233/// use std::thread;
234///
235/// let (sender, receiver) = sync_channel(1);
236///
237/// // this returns immediately
238/// sender.send(1).unwrap();
239///
240/// thread::spawn(move || {
241///     // this will block until the previous message has been received
242///     sender.send(2).unwrap();
243/// });
244///
245/// assert_eq!(receiver.recv().unwrap(), 1);
246/// assert_eq!(receiver.recv().unwrap(), 2);
247/// ```
248#[must_use]
249#[unstable(feature = "mpmc_channel", issue = "126840")]
250pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
251    if cap == 0 {
252        let (s, r) = counter::new(zero::Channel::new());
253        let s = Sender { flavor: SenderFlavor::Zero(s) };
254        let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
255        (s, r)
256    } else {
257        let (s, r) = counter::new(array::Channel::with_capacity(cap));
258        let s = Sender { flavor: SenderFlavor::Array(s) };
259        let r = Receiver { flavor: ReceiverFlavor::Array(r) };
260        (s, r)
261    }
262}
263
264/// The sending-half of Rust's synchronous [`channel`] type.
265///
266/// Messages can be sent through this channel with [`send`].
267///
268/// Note: all senders (the original and its clones) need to be dropped for the receiver
269/// to stop blocking to receive messages with [`Receiver::recv`].
270///
271/// [`send`]: Sender::send
272///
273/// # Examples
274///
275/// ```rust
276/// #![feature(mpmc_channel)]
277///
278/// use std::sync::mpmc::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282/// let sender2 = sender.clone();
283///
284/// // First thread owns sender
285/// thread::spawn(move || {
286///     sender.send(1).unwrap();
287/// });
288///
289/// // Second thread owns sender2
290/// thread::spawn(move || {
291///     sender2.send(2).unwrap();
292/// });
293///
294/// let msg = receiver.recv().unwrap();
295/// let msg2 = receiver.recv().unwrap();
296///
297/// assert_eq!(3, msg + msg2);
298/// ```
299#[unstable(feature = "mpmc_channel", issue = "126840")]
300pub struct Sender<T> {
301    flavor: SenderFlavor<T>,
302}
303
304/// Sender flavors.
305enum SenderFlavor<T> {
306    /// Bounded channel based on a preallocated array.
307    Array(counter::Sender<array::Channel<T>>),
308
309    /// Unbounded channel implemented as a linked list.
310    List(counter::Sender<list::Channel<T>>),
311
312    /// Zero-capacity channel.
313    Zero(counter::Sender<zero::Channel<T>>),
314}
315
316#[unstable(feature = "mpmc_channel", issue = "126840")]
317unsafe impl<T: Send> Send for Sender<T> {}
318#[unstable(feature = "mpmc_channel", issue = "126840")]
319unsafe impl<T: Send> Sync for Sender<T> {}
320
321#[unstable(feature = "mpmc_channel", issue = "126840")]
322impl<T> UnwindSafe for Sender<T> {}
323#[unstable(feature = "mpmc_channel", issue = "126840")]
324impl<T> RefUnwindSafe for Sender<T> {}
325
326impl<T> Sender<T> {
327    /// Attempts to send a message into the channel without blocking.
328    ///
329    /// This method will either send a message into the channel immediately or return an error if
330    /// the channel is full or disconnected. The returned error contains the original message.
331    ///
332    /// If called on a zero-capacity channel, this method will send the message only if there
333    /// happens to be a receive operation on the other side of the channel at the same time.
334    ///
335    /// # Examples
336    ///
337    /// ```rust
338    /// #![feature(mpmc_channel)]
339    ///
340    /// use std::sync::mpmc::{channel, Receiver, Sender};
341    ///
342    /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
343    ///
344    /// assert!(sender.try_send(1).is_ok());
345    /// ```
346    #[unstable(feature = "mpmc_channel", issue = "126840")]
347    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
348        match &self.flavor {
349            SenderFlavor::Array(chan) => chan.try_send(msg),
350            SenderFlavor::List(chan) => chan.try_send(msg),
351            SenderFlavor::Zero(chan) => chan.try_send(msg),
352        }
353    }
354
355    /// Attempts to send a value on this channel, returning it back if it could
356    /// not be sent.
357    ///
358    /// A successful send occurs when it is determined that the other end of
359    /// the channel has not hung up already. An unsuccessful send would be one
360    /// where the corresponding receiver has already been deallocated. Note
361    /// that a return value of [`Err`] means that the data will never be
362    /// received, but a return value of [`Ok`] does *not* mean that the data
363    /// will be received. It is possible for the corresponding receiver to
364    /// hang up immediately after this function returns [`Ok`]. However, if
365    /// the channel is zero-capacity, it acts as a rendezvous channel and a
366    /// return value of [`Ok`] means that the data has been received.
367    ///
368    /// If the channel is full and not disconnected, this call will block until
369    /// the send operation can proceed. If the channel becomes disconnected,
370    /// this call will wake up and return an error. The returned error contains
371    /// the original message.
372    ///
373    /// If called on a zero-capacity channel, this method will wait for a receive
374    /// operation to appear on the other side of the channel.
375    ///
376    /// # Examples
377    ///
378    /// ```
379    /// #![feature(mpmc_channel)]
380    ///
381    /// use std::sync::mpmc::channel;
382    ///
383    /// let (tx, rx) = channel();
384    ///
385    /// // This send is always successful
386    /// tx.send(1).unwrap();
387    ///
388    /// // This send will fail because the receiver is gone
389    /// drop(rx);
390    /// assert!(tx.send(1).is_err());
391    /// ```
392    #[unstable(feature = "mpmc_channel", issue = "126840")]
393    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
394        match &self.flavor {
395            SenderFlavor::Array(chan) => chan.send(msg, None),
396            SenderFlavor::List(chan) => chan.send(msg, None),
397            SenderFlavor::Zero(chan) => chan.send(msg, None),
398        }
399        .map_err(|err| match err {
400            SendTimeoutError::Disconnected(msg) => SendError(msg),
401            SendTimeoutError::Timeout(_) => unreachable!(),
402        })
403    }
404}
405
406impl<T> Sender<T> {
407    /// Waits for a message to be sent into the channel, but only for a limited time.
408    ///
409    /// If the channel is full and not disconnected, this call will block until the send operation
410    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
411    /// wake up and return an error. The returned error contains the original message.
412    ///
413    /// If called on a zero-capacity channel, this method will wait for a receive operation to
414    /// appear on the other side of the channel.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// #![feature(mpmc_channel)]
420    ///
421    /// use std::sync::mpmc::channel;
422    /// use std::time::Duration;
423    ///
424    /// let (tx, rx) = channel();
425    ///
426    /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
427    /// ```
428    #[unstable(feature = "mpmc_channel", issue = "126840")]
429    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
430        match Instant::now().checked_add(timeout) {
431            Some(deadline) => self.send_deadline(msg, deadline),
432            // So far in the future that it's practically the same as waiting indefinitely.
433            None => self.send(msg).map_err(SendTimeoutError::from),
434        }
435    }
436
437    /// Waits for a message to be sent into the channel, but only until a given deadline.
438    ///
439    /// If the channel is full and not disconnected, this call will block until the send operation
440    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
441    /// wake up and return an error. The returned error contains the original message.
442    ///
443    /// If called on a zero-capacity channel, this method will wait for a receive operation to
444    /// appear on the other side of the channel.
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// #![feature(mpmc_channel)]
450    ///
451    /// use std::sync::mpmc::channel;
452    /// use std::time::{Duration, Instant};
453    ///
454    /// let (tx, rx) = channel();
455    ///
456    /// let t = Instant::now() + Duration::from_millis(400);
457    /// tx.send_deadline(1, t).unwrap();
458    /// ```
459    #[unstable(feature = "mpmc_channel", issue = "126840")]
460    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
461        match &self.flavor {
462            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
463            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
464            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
465        }
466    }
467
468    /// Returns `true` if the channel is empty.
469    ///
470    /// Note: Zero-capacity channels are always empty.
471    ///
472    /// # Examples
473    ///
474    /// ```
475    /// #![feature(mpmc_channel)]
476    ///
477    /// use std::sync::mpmc;
478    /// use std::thread;
479    ///
480    /// let (send, _recv) = mpmc::channel();
481    ///
482    /// let tx1 = send.clone();
483    /// let tx2 = send.clone();
484    ///
485    /// assert!(tx1.is_empty());
486    ///
487    /// let handle = thread::spawn(move || {
488    ///     tx2.send(1u8).unwrap();
489    /// });
490    ///
491    /// handle.join().unwrap();
492    ///
493    /// assert!(!tx1.is_empty());
494    /// ```
495    #[unstable(feature = "mpmc_channel", issue = "126840")]
496    pub fn is_empty(&self) -> bool {
497        match &self.flavor {
498            SenderFlavor::Array(chan) => chan.is_empty(),
499            SenderFlavor::List(chan) => chan.is_empty(),
500            SenderFlavor::Zero(chan) => chan.is_empty(),
501        }
502    }
503
504    /// Returns `true` if the channel is full.
505    ///
506    /// Note: Zero-capacity channels are always full.
507    ///
508    /// # Examples
509    ///
510    /// ```
511    /// #![feature(mpmc_channel)]
512    ///
513    /// use std::sync::mpmc;
514    /// use std::thread;
515    ///
516    /// let (send, _recv) = mpmc::sync_channel(1);
517    ///
518    /// let (tx1, tx2) = (send.clone(), send.clone());
519    /// assert!(!tx1.is_full());
520    ///
521    /// let handle = thread::spawn(move || {
522    ///     tx2.send(1u8).unwrap();
523    /// });
524    ///
525    /// handle.join().unwrap();
526    ///
527    /// assert!(tx1.is_full());
528    /// ```
529    #[unstable(feature = "mpmc_channel", issue = "126840")]
530    pub fn is_full(&self) -> bool {
531        match &self.flavor {
532            SenderFlavor::Array(chan) => chan.is_full(),
533            SenderFlavor::List(chan) => chan.is_full(),
534            SenderFlavor::Zero(chan) => chan.is_full(),
535        }
536    }
537
538    /// Returns the number of messages in the channel.
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// #![feature(mpmc_channel)]
544    ///
545    /// use std::sync::mpmc;
546    /// use std::thread;
547    ///
548    /// let (send, _recv) = mpmc::channel();
549    /// let (tx1, tx2) = (send.clone(), send.clone());
550    ///
551    /// assert_eq!(tx1.len(), 0);
552    ///
553    /// let handle = thread::spawn(move || {
554    ///     tx2.send(1u8).unwrap();
555    /// });
556    ///
557    /// handle.join().unwrap();
558    ///
559    /// assert_eq!(tx1.len(), 1);
560    /// ```
561    #[unstable(feature = "mpmc_channel", issue = "126840")]
562    pub fn len(&self) -> usize {
563        match &self.flavor {
564            SenderFlavor::Array(chan) => chan.len(),
565            SenderFlavor::List(chan) => chan.len(),
566            SenderFlavor::Zero(chan) => chan.len(),
567        }
568    }
569
570    /// If the channel is bounded, returns its capacity.
571    ///
572    /// # Examples
573    ///
574    /// ```
575    /// #![feature(mpmc_channel)]
576    ///
577    /// use std::sync::mpmc;
578    /// use std::thread;
579    ///
580    /// let (send, _recv) = mpmc::sync_channel(3);
581    /// let (tx1, tx2) = (send.clone(), send.clone());
582    ///
583    /// assert_eq!(tx1.capacity(), Some(3));
584    ///
585    /// let handle = thread::spawn(move || {
586    ///     tx2.send(1u8).unwrap();
587    /// });
588    ///
589    /// handle.join().unwrap();
590    ///
591    /// assert_eq!(tx1.capacity(), Some(3));
592    /// ```
593    #[unstable(feature = "mpmc_channel", issue = "126840")]
594    pub fn capacity(&self) -> Option<usize> {
595        match &self.flavor {
596            SenderFlavor::Array(chan) => chan.capacity(),
597            SenderFlavor::List(chan) => chan.capacity(),
598            SenderFlavor::Zero(chan) => chan.capacity(),
599        }
600    }
601
602    /// Returns `true` if senders belong to the same channel.
603    ///
604    /// # Examples
605    ///
606    /// ```
607    /// #![feature(mpmc_channel)]
608    ///
609    /// use std::sync::mpmc;
610    ///
611    /// let (tx1, _) = mpmc::channel::<i32>();
612    /// let (tx2, _) = mpmc::channel::<i32>();
613    ///
614    /// assert!(tx1.same_channel(&tx1));
615    /// assert!(!tx1.same_channel(&tx2));
616    /// ```
617    #[unstable(feature = "mpmc_channel", issue = "126840")]
618    pub fn same_channel(&self, other: &Sender<T>) -> bool {
619        match (&self.flavor, &other.flavor) {
620            (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
621            (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
622            (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
623            _ => false,
624        }
625    }
626}
627
628#[unstable(feature = "mpmc_channel", issue = "126840")]
629impl<T> Drop for Sender<T> {
630    fn drop(&mut self) {
631        unsafe {
632            match &self.flavor {
633                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
634                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
635                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
636            }
637        }
638    }
639}
640
641#[unstable(feature = "mpmc_channel", issue = "126840")]
642impl<T> Clone for Sender<T> {
643    fn clone(&self) -> Self {
644        let flavor = match &self.flavor {
645            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
646            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
647            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
648        };
649
650        Sender { flavor }
651    }
652}
653
654#[unstable(feature = "mpmc_channel", issue = "126840")]
655impl<T> fmt::Debug for Sender<T> {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        f.pad("Sender { .. }")
658    }
659}
660
661/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
662/// Different threads can share this [`Receiver`] by cloning it.
663///
664/// Messages sent to the channel can be retrieved using [`recv`].
665///
666/// [`recv`]: Receiver::recv
667///
668/// # Examples
669///
670/// ```rust
671/// #![feature(mpmc_channel)]
672///
673/// use std::sync::mpmc::channel;
674/// use std::thread;
675/// use std::time::Duration;
676///
677/// let (send, recv) = channel();
678///
679/// let tx_thread = thread::spawn(move || {
680///     send.send("Hello world!").unwrap();
681///     thread::sleep(Duration::from_secs(2)); // block for two seconds
682///     send.send("Delayed for 2 seconds").unwrap();
683/// });
684///
685/// let (rx1, rx2) = (recv.clone(), recv.clone());
686/// let rx_thread_1 = thread::spawn(move || {
687///     println!("{}", rx1.recv().unwrap()); // Received immediately
688/// });
689/// let rx_thread_2 = thread::spawn(move || {
690///     println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
691/// });
692///
693/// tx_thread.join().unwrap();
694/// rx_thread_1.join().unwrap();
695/// rx_thread_2.join().unwrap();
696/// ```
697#[unstable(feature = "mpmc_channel", issue = "126840")]
698pub struct Receiver<T> {
699    flavor: ReceiverFlavor<T>,
700}
701
702/// An iterator over messages on a [`Receiver`], created by [`iter`].
703///
704/// This iterator will block whenever [`next`] is called,
705/// waiting for a new message, and [`None`] will be returned
706/// when the corresponding channel has hung up.
707///
708/// [`iter`]: Receiver::iter
709/// [`next`]: Iterator::next
710///
711/// # Examples
712///
713/// ```rust
714/// #![feature(mpmc_channel)]
715///
716/// use std::sync::mpmc::channel;
717/// use std::thread;
718///
719/// let (send, recv) = channel();
720///
721/// thread::spawn(move || {
722///     send.send(1u8).unwrap();
723///     send.send(2u8).unwrap();
724///     send.send(3u8).unwrap();
725/// });
726///
727/// for x in recv.iter() {
728///     println!("Got: {x}");
729/// }
730/// ```
731#[unstable(feature = "mpmc_channel", issue = "126840")]
732#[derive(Debug)]
733pub struct Iter<'a, T: 'a> {
734    rx: &'a Receiver<T>,
735}
736
737/// An iterator that attempts to yield all pending values for a [`Receiver`],
738/// created by [`try_iter`].
739///
740/// [`None`] will be returned when there are no pending values remaining or
741/// if the corresponding channel has hung up.
742///
743/// This iterator will never block the caller in order to wait for data to
744/// become available. Instead, it will return [`None`].
745///
746/// [`try_iter`]: Receiver::try_iter
747///
748/// # Examples
749///
750/// ```rust
751/// #![feature(mpmc_channel)]
752///
753/// use std::sync::mpmc::channel;
754/// use std::thread;
755/// use std::time::Duration;
756///
757/// let (sender, receiver) = channel();
758///
759/// // Nothing is in the buffer yet
760/// assert!(receiver.try_iter().next().is_none());
761/// println!("Nothing in the buffer...");
762///
763/// thread::spawn(move || {
764///     sender.send(1).unwrap();
765///     sender.send(2).unwrap();
766///     sender.send(3).unwrap();
767/// });
768///
769/// println!("Going to sleep...");
770/// thread::sleep(Duration::from_secs(2)); // block for two seconds
771///
772/// for x in receiver.try_iter() {
773///     println!("Got: {x}");
774/// }
775/// ```
776#[unstable(feature = "mpmc_channel", issue = "126840")]
777#[derive(Debug)]
778pub struct TryIter<'a, T: 'a> {
779    rx: &'a Receiver<T>,
780}
781
782/// An owning iterator over messages on a [`Receiver`],
783/// created by [`into_iter`].
784///
785/// This iterator will block whenever [`next`]
786/// is called, waiting for a new message, and [`None`] will be
787/// returned if the corresponding channel has hung up.
788///
789/// [`into_iter`]: Receiver::into_iter
790/// [`next`]: Iterator::next
791///
792/// # Examples
793///
794/// ```rust
795/// #![feature(mpmc_channel)]
796///
797/// use std::sync::mpmc::channel;
798/// use std::thread;
799///
800/// let (send, recv) = channel();
801///
802/// thread::spawn(move || {
803///     send.send(1u8).unwrap();
804///     send.send(2u8).unwrap();
805///     send.send(3u8).unwrap();
806/// });
807///
808/// for x in recv.into_iter() {
809///     println!("Got: {x}");
810/// }
811/// ```
812#[unstable(feature = "mpmc_channel", issue = "126840")]
813#[derive(Debug)]
814pub struct IntoIter<T> {
815    rx: Receiver<T>,
816}
817
818#[unstable(feature = "mpmc_channel", issue = "126840")]
819impl<'a, T> Iterator for Iter<'a, T> {
820    type Item = T;
821
822    fn next(&mut self) -> Option<T> {
823        self.rx.recv().ok()
824    }
825}
826
827#[unstable(feature = "mpmc_channel", issue = "126840")]
828impl<'a, T> Iterator for TryIter<'a, T> {
829    type Item = T;
830
831    fn next(&mut self) -> Option<T> {
832        self.rx.try_recv().ok()
833    }
834}
835
836#[unstable(feature = "mpmc_channel", issue = "126840")]
837impl<'a, T> IntoIterator for &'a Receiver<T> {
838    type Item = T;
839    type IntoIter = Iter<'a, T>;
840
841    fn into_iter(self) -> Iter<'a, T> {
842        self.iter()
843    }
844}
845
846#[unstable(feature = "mpmc_channel", issue = "126840")]
847impl<T> Iterator for IntoIter<T> {
848    type Item = T;
849    fn next(&mut self) -> Option<T> {
850        self.rx.recv().ok()
851    }
852}
853
854#[unstable(feature = "mpmc_channel", issue = "126840")]
855impl<T> IntoIterator for Receiver<T> {
856    type Item = T;
857    type IntoIter = IntoIter<T>;
858
859    fn into_iter(self) -> IntoIter<T> {
860        IntoIter { rx: self }
861    }
862}
863
864/// Receiver flavors.
865enum ReceiverFlavor<T> {
866    /// Bounded channel based on a preallocated array.
867    Array(counter::Receiver<array::Channel<T>>),
868
869    /// Unbounded channel implemented as a linked list.
870    List(counter::Receiver<list::Channel<T>>),
871
872    /// Zero-capacity channel.
873    Zero(counter::Receiver<zero::Channel<T>>),
874}
875
876#[unstable(feature = "mpmc_channel", issue = "126840")]
877unsafe impl<T: Send> Send for Receiver<T> {}
878#[unstable(feature = "mpmc_channel", issue = "126840")]
879unsafe impl<T: Send> Sync for Receiver<T> {}
880
881#[unstable(feature = "mpmc_channel", issue = "126840")]
882impl<T> UnwindSafe for Receiver<T> {}
883#[unstable(feature = "mpmc_channel", issue = "126840")]
884impl<T> RefUnwindSafe for Receiver<T> {}
885
886impl<T> Receiver<T> {
887    /// Attempts to receive a message from the channel without blocking.
888    ///
889    /// This method will never block the caller in order to wait for data to
890    /// become available. Instead, this will always return immediately with a
891    /// possible option of pending data on the channel.
892    ///
893    /// If called on a zero-capacity channel, this method will receive a message only if there
894    /// happens to be a send operation on the other side of the channel at the same time.
895    ///
896    /// This is useful for a flavor of "optimistic check" before deciding to
897    /// block on a receiver.
898    ///
899    /// Compared with [`recv`], this function has two failure cases instead of one
900    /// (one for disconnection, one for an empty buffer).
901    ///
902    /// [`recv`]: Self::recv
903    ///
904    /// # Examples
905    ///
906    /// ```rust
907    /// #![feature(mpmc_channel)]
908    ///
909    /// use std::sync::mpmc::{Receiver, channel};
910    ///
911    /// let (_, receiver): (_, Receiver<i32>) = channel();
912    ///
913    /// assert!(receiver.try_recv().is_err());
914    /// ```
915    #[unstable(feature = "mpmc_channel", issue = "126840")]
916    pub fn try_recv(&self) -> Result<T, TryRecvError> {
917        match &self.flavor {
918            ReceiverFlavor::Array(chan) => chan.try_recv(),
919            ReceiverFlavor::List(chan) => chan.try_recv(),
920            ReceiverFlavor::Zero(chan) => chan.try_recv(),
921        }
922    }
923
924    /// Attempts to wait for a value on this receiver, returning an error if the
925    /// corresponding channel has hung up.
926    ///
927    /// This function will always block the current thread if there is no data
928    /// available and it's possible for more data to be sent (at least one sender
929    /// still exists). Once a message is sent to the corresponding [`Sender`],
930    /// this receiver will wake up and return that message.
931    ///
932    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
933    /// this call is blocking, this call will wake up and return [`Err`] to
934    /// indicate that no more messages can ever be received on this channel.
935    /// However, since channels are buffered, messages sent before the disconnect
936    /// will still be properly received.
937    ///
938    /// # Examples
939    ///
940    /// ```
941    /// #![feature(mpmc_channel)]
942    ///
943    /// use std::sync::mpmc;
944    /// use std::thread;
945    ///
946    /// let (send, recv) = mpmc::channel();
947    /// let handle = thread::spawn(move || {
948    ///     send.send(1u8).unwrap();
949    /// });
950    ///
951    /// handle.join().unwrap();
952    ///
953    /// assert_eq!(Ok(1), recv.recv());
954    /// ```
955    ///
956    /// Buffering behavior:
957    ///
958    /// ```
959    /// #![feature(mpmc_channel)]
960    ///
961    /// use std::sync::mpmc;
962    /// use std::thread;
963    /// use std::sync::mpmc::RecvError;
964    ///
965    /// let (send, recv) = mpmc::channel();
966    /// let handle = thread::spawn(move || {
967    ///     send.send(1u8).unwrap();
968    ///     send.send(2).unwrap();
969    ///     send.send(3).unwrap();
970    ///     drop(send);
971    /// });
972    ///
973    /// // wait for the thread to join so we ensure the sender is dropped
974    /// handle.join().unwrap();
975    ///
976    /// assert_eq!(Ok(1), recv.recv());
977    /// assert_eq!(Ok(2), recv.recv());
978    /// assert_eq!(Ok(3), recv.recv());
979    /// assert_eq!(Err(RecvError), recv.recv());
980    /// ```
981    #[unstable(feature = "mpmc_channel", issue = "126840")]
982    pub fn recv(&self) -> Result<T, RecvError> {
983        match &self.flavor {
984            ReceiverFlavor::Array(chan) => chan.recv(None),
985            ReceiverFlavor::List(chan) => chan.recv(None),
986            ReceiverFlavor::Zero(chan) => chan.recv(None),
987        }
988        .map_err(|_| RecvError)
989    }
990
991    /// Attempts to wait for a value on this receiver, returning an error if the
992    /// corresponding channel has hung up, or if it waits more than `timeout`.
993    ///
994    /// This function will always block the current thread if there is no data
995    /// available and it's possible for more data to be sent (at least one sender
996    /// still exists). Once a message is sent to the corresponding [`Sender`],
997    /// this receiver will wake up and return that message.
998    ///
999    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1000    /// this call is blocking, this call will wake up and return [`Err`] to
1001    /// indicate that no more messages can ever be received on this channel.
1002    /// However, since channels are buffered, messages sent before the disconnect
1003    /// will still be properly received.
1004    ///
1005    /// # Examples
1006    ///
1007    /// Successfully receiving value before encountering timeout:
1008    ///
1009    /// ```no_run
1010    /// #![feature(mpmc_channel)]
1011    ///
1012    /// use std::thread;
1013    /// use std::time::Duration;
1014    /// use std::sync::mpmc;
1015    ///
1016    /// let (send, recv) = mpmc::channel();
1017    ///
1018    /// thread::spawn(move || {
1019    ///     send.send('a').unwrap();
1020    /// });
1021    ///
1022    /// assert_eq!(
1023    ///     recv.recv_timeout(Duration::from_millis(400)),
1024    ///     Ok('a')
1025    /// );
1026    /// ```
1027    ///
1028    /// Receiving an error upon reaching timeout:
1029    ///
1030    /// ```no_run
1031    /// #![feature(mpmc_channel)]
1032    ///
1033    /// use std::thread;
1034    /// use std::time::Duration;
1035    /// use std::sync::mpmc;
1036    ///
1037    /// let (send, recv) = mpmc::channel();
1038    ///
1039    /// thread::spawn(move || {
1040    ///     thread::sleep(Duration::from_millis(800));
1041    ///     send.send('a').unwrap();
1042    /// });
1043    ///
1044    /// assert_eq!(
1045    ///     recv.recv_timeout(Duration::from_millis(400)),
1046    ///     Err(mpmc::RecvTimeoutError::Timeout)
1047    /// );
1048    /// ```
1049    #[unstable(feature = "mpmc_channel", issue = "126840")]
1050    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1051        match Instant::now().checked_add(timeout) {
1052            Some(deadline) => self.recv_deadline(deadline),
1053            // So far in the future that it's practically the same as waiting indefinitely.
1054            None => self.recv().map_err(RecvTimeoutError::from),
1055        }
1056    }
1057
1058    /// Attempts to wait for a value on this receiver, returning an error if the
1059    /// corresponding channel has hung up, or if `deadline` is reached.
1060    ///
1061    /// This function will always block the current thread if there is no data
1062    /// available and it's possible for more data to be sent. Once a message is
1063    /// sent to the corresponding [`Sender`], then this receiver will wake up
1064    /// and return that message.
1065    ///
1066    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1067    /// this call is blocking, this call will wake up and return [`Err`] to
1068    /// indicate that no more messages can ever be received on this channel.
1069    /// However, since channels are buffered, messages sent before the disconnect
1070    /// will still be properly received.
1071    ///
1072    /// # Examples
1073    ///
1074    /// Successfully receiving value before reaching deadline:
1075    ///
1076    /// ```no_run
1077    /// #![feature(mpmc_channel)]
1078    ///
1079    /// use std::thread;
1080    /// use std::time::{Duration, Instant};
1081    /// use std::sync::mpmc;
1082    ///
1083    /// let (send, recv) = mpmc::channel();
1084    ///
1085    /// thread::spawn(move || {
1086    ///     send.send('a').unwrap();
1087    /// });
1088    ///
1089    /// assert_eq!(
1090    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1091    ///     Ok('a')
1092    /// );
1093    /// ```
1094    ///
1095    /// Receiving an error upon reaching deadline:
1096    ///
1097    /// ```no_run
1098    /// #![feature(mpmc_channel)]
1099    ///
1100    /// use std::thread;
1101    /// use std::time::{Duration, Instant};
1102    /// use std::sync::mpmc;
1103    ///
1104    /// let (send, recv) = mpmc::channel();
1105    ///
1106    /// thread::spawn(move || {
1107    ///     thread::sleep(Duration::from_millis(800));
1108    ///     send.send('a').unwrap();
1109    /// });
1110    ///
1111    /// assert_eq!(
1112    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1113    ///     Err(mpmc::RecvTimeoutError::Timeout)
1114    /// );
1115    /// ```
1116    #[unstable(feature = "mpmc_channel", issue = "126840")]
1117    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1118        match &self.flavor {
1119            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
1120            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
1121            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
1122        }
1123    }
1124
1125    /// Returns an iterator that will attempt to yield all pending values.
1126    /// It will return `None` if there are no more pending values or if the
1127    /// channel has hung up. The iterator will never [`panic!`] or block the
1128    /// user by waiting for values.
1129    ///
1130    /// # Examples
1131    ///
1132    /// ```no_run
1133    /// #![feature(mpmc_channel)]
1134    ///
1135    /// use std::sync::mpmc::channel;
1136    /// use std::thread;
1137    /// use std::time::Duration;
1138    ///
1139    /// let (sender, receiver) = channel();
1140    ///
1141    /// // nothing is in the buffer yet
1142    /// assert!(receiver.try_iter().next().is_none());
1143    ///
1144    /// thread::spawn(move || {
1145    ///     thread::sleep(Duration::from_secs(1));
1146    ///     sender.send(1).unwrap();
1147    ///     sender.send(2).unwrap();
1148    ///     sender.send(3).unwrap();
1149    /// });
1150    ///
1151    /// // nothing is in the buffer yet
1152    /// assert!(receiver.try_iter().next().is_none());
1153    ///
1154    /// // block for two seconds
1155    /// thread::sleep(Duration::from_secs(2));
1156    ///
1157    /// let mut iter = receiver.try_iter();
1158    /// assert_eq!(iter.next(), Some(1));
1159    /// assert_eq!(iter.next(), Some(2));
1160    /// assert_eq!(iter.next(), Some(3));
1161    /// assert_eq!(iter.next(), None);
1162    /// ```
1163    #[unstable(feature = "mpmc_channel", issue = "126840")]
1164    pub fn try_iter(&self) -> TryIter<'_, T> {
1165        TryIter { rx: self }
1166    }
1167}
1168
1169impl<T> Receiver<T> {
1170    /// Returns `true` if the channel is empty.
1171    ///
1172    /// Note: Zero-capacity channels are always empty.
1173    ///
1174    /// # Examples
1175    ///
1176    /// ```
1177    /// #![feature(mpmc_channel)]
1178    ///
1179    /// use std::sync::mpmc;
1180    /// use std::thread;
1181    ///
1182    /// let (send, recv) = mpmc::channel();
1183    ///
1184    /// assert!(recv.is_empty());
1185    ///
1186    /// let handle = thread::spawn(move || {
1187    ///     send.send(1u8).unwrap();
1188    /// });
1189    ///
1190    /// handle.join().unwrap();
1191    ///
1192    /// assert!(!recv.is_empty());
1193    /// ```
1194    #[unstable(feature = "mpmc_channel", issue = "126840")]
1195    pub fn is_empty(&self) -> bool {
1196        match &self.flavor {
1197            ReceiverFlavor::Array(chan) => chan.is_empty(),
1198            ReceiverFlavor::List(chan) => chan.is_empty(),
1199            ReceiverFlavor::Zero(chan) => chan.is_empty(),
1200        }
1201    }
1202
1203    /// Returns `true` if the channel is full.
1204    ///
1205    /// Note: Zero-capacity channels are always full.
1206    ///
1207    /// # Examples
1208    ///
1209    /// ```
1210    /// #![feature(mpmc_channel)]
1211    ///
1212    /// use std::sync::mpmc;
1213    /// use std::thread;
1214    ///
1215    /// let (send, recv) = mpmc::sync_channel(1);
1216    ///
1217    /// assert!(!recv.is_full());
1218    ///
1219    /// let handle = thread::spawn(move || {
1220    ///     send.send(1u8).unwrap();
1221    /// });
1222    ///
1223    /// handle.join().unwrap();
1224    ///
1225    /// assert!(recv.is_full());
1226    /// ```
1227    #[unstable(feature = "mpmc_channel", issue = "126840")]
1228    pub fn is_full(&self) -> bool {
1229        match &self.flavor {
1230            ReceiverFlavor::Array(chan) => chan.is_full(),
1231            ReceiverFlavor::List(chan) => chan.is_full(),
1232            ReceiverFlavor::Zero(chan) => chan.is_full(),
1233        }
1234    }
1235
1236    /// Returns the number of messages in the channel.
1237    ///
1238    /// # Examples
1239    ///
1240    /// ```
1241    /// #![feature(mpmc_channel)]
1242    ///
1243    /// use std::sync::mpmc;
1244    /// use std::thread;
1245    ///
1246    /// let (send, recv) = mpmc::channel();
1247    ///
1248    /// assert_eq!(recv.len(), 0);
1249    ///
1250    /// let handle = thread::spawn(move || {
1251    ///     send.send(1u8).unwrap();
1252    /// });
1253    ///
1254    /// handle.join().unwrap();
1255    ///
1256    /// assert_eq!(recv.len(), 1);
1257    /// ```
1258    #[unstable(feature = "mpmc_channel", issue = "126840")]
1259    pub fn len(&self) -> usize {
1260        match &self.flavor {
1261            ReceiverFlavor::Array(chan) => chan.len(),
1262            ReceiverFlavor::List(chan) => chan.len(),
1263            ReceiverFlavor::Zero(chan) => chan.len(),
1264        }
1265    }
1266
1267    /// If the channel is bounded, returns its capacity.
1268    ///
1269    /// # Examples
1270    ///
1271    /// ```
1272    /// #![feature(mpmc_channel)]
1273    ///
1274    /// use std::sync::mpmc;
1275    /// use std::thread;
1276    ///
1277    /// let (send, recv) = mpmc::sync_channel(3);
1278    ///
1279    /// assert_eq!(recv.capacity(), Some(3));
1280    ///
1281    /// let handle = thread::spawn(move || {
1282    ///     send.send(1u8).unwrap();
1283    /// });
1284    ///
1285    /// handle.join().unwrap();
1286    ///
1287    /// assert_eq!(recv.capacity(), Some(3));
1288    /// ```
1289    #[unstable(feature = "mpmc_channel", issue = "126840")]
1290    pub fn capacity(&self) -> Option<usize> {
1291        match &self.flavor {
1292            ReceiverFlavor::Array(chan) => chan.capacity(),
1293            ReceiverFlavor::List(chan) => chan.capacity(),
1294            ReceiverFlavor::Zero(chan) => chan.capacity(),
1295        }
1296    }
1297
1298    /// Returns `true` if receivers belong to the same channel.
1299    ///
1300    /// # Examples
1301    ///
1302    /// ```
1303    /// #![feature(mpmc_channel)]
1304    ///
1305    /// use std::sync::mpmc;
1306    ///
1307    /// let (_, rx1) = mpmc::channel::<i32>();
1308    /// let (_, rx2) = mpmc::channel::<i32>();
1309    ///
1310    /// assert!(rx1.same_channel(&rx1));
1311    /// assert!(!rx1.same_channel(&rx2));
1312    /// ```
1313    #[unstable(feature = "mpmc_channel", issue = "126840")]
1314    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1315        match (&self.flavor, &other.flavor) {
1316            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1317            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1318            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1319            _ => false,
1320        }
1321    }
1322
1323    /// Returns an iterator that will block waiting for messages, but never
1324    /// [`panic!`]. It will return [`None`] when the channel has hung up.
1325    ///
1326    /// # Examples
1327    ///
1328    /// ```rust
1329    /// #![feature(mpmc_channel)]
1330    ///
1331    /// use std::sync::mpmc::channel;
1332    /// use std::thread;
1333    ///
1334    /// let (send, recv) = channel();
1335    ///
1336    /// thread::spawn(move || {
1337    ///     send.send(1).unwrap();
1338    ///     send.send(2).unwrap();
1339    ///     send.send(3).unwrap();
1340    /// });
1341    ///
1342    /// let mut iter = recv.iter();
1343    /// assert_eq!(iter.next(), Some(1));
1344    /// assert_eq!(iter.next(), Some(2));
1345    /// assert_eq!(iter.next(), Some(3));
1346    /// assert_eq!(iter.next(), None);
1347    /// ```
1348    #[unstable(feature = "mpmc_channel", issue = "126840")]
1349    pub fn iter(&self) -> Iter<'_, T> {
1350        Iter { rx: self }
1351    }
1352}
1353
1354#[unstable(feature = "mpmc_channel", issue = "126840")]
1355impl<T> Drop for Receiver<T> {
1356    fn drop(&mut self) {
1357        unsafe {
1358            match &self.flavor {
1359                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
1360                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1361                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1362            }
1363        }
1364    }
1365}
1366
1367#[unstable(feature = "mpmc_channel", issue = "126840")]
1368impl<T> Clone for Receiver<T> {
1369    fn clone(&self) -> Self {
1370        let flavor = match &self.flavor {
1371            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1372            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1373            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1374        };
1375
1376        Receiver { flavor }
1377    }
1378}
1379
1380#[unstable(feature = "mpmc_channel", issue = "126840")]
1381impl<T> fmt::Debug for Receiver<T> {
1382    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1383        f.pad("Receiver { .. }")
1384    }
1385}
1386
1387#[cfg(test)]
1388mod tests;