broker_tokio/sync/mpsc/
unbounded.rs

1use crate::loom::sync::atomic::AtomicUsize;
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the
11/// [`unbounded_channel`](unbounded_channel) function.
12pub struct UnboundedSender<T> {
13    chan: chan::Tx<T, Semaphore>,
14}
15
16impl<T> Clone for UnboundedSender<T> {
17    fn clone(&self) -> Self {
18        UnboundedSender {
19            chan: self.chan.clone(),
20        }
21    }
22}
23
24impl<T> fmt::Debug for UnboundedSender<T> {
25    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
26        fmt.debug_struct("UnboundedSender")
27            .field("chan", &self.chan)
28            .finish()
29    }
30}
31
32/// Receive values from the associated `UnboundedSender`.
33///
34/// Instances are created by the
35/// [`unbounded_channel`](unbounded_channel) function.
36pub struct UnboundedReceiver<T> {
37    /// The channel receiver
38    chan: chan::Rx<T, Semaphore>,
39}
40
41impl<T> fmt::Debug for UnboundedReceiver<T> {
42    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
43        fmt.debug_struct("UnboundedReceiver")
44            .field("chan", &self.chan)
45            .finish()
46    }
47}
48
49impl<T> Clone for UnboundedReceiver<T> {
50    fn clone(&self) -> Self {
51        UnboundedReceiver {
52            chan: self.chan.clone(),
53        }
54    }
55}
56
57/// Create an unbounded mpsc channel for communicating between asynchronous
58/// tasks.
59///
60/// A `send` on this channel will always succeed as long as the receive half has
61/// not been closed. If the receiver falls behind, messages will be arbitrarily
62/// buffered.
63///
64/// **Note** that the amount of available system memory is an implicit bound to
65/// the channel. Using an `unbounded` channel has the ability of causing the
66/// process to run out of memory. In this case, the process will be aborted.
67pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
68    let (tx, rx) = chan::channel(AtomicUsize::new(0));
69
70    let tx = UnboundedSender::new(tx);
71    let rx = UnboundedReceiver::new(rx);
72
73    (tx, rx)
74}
75
76/// No capacity
77type Semaphore = AtomicUsize;
78
79impl<T> UnboundedReceiver<T> {
80    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
81        UnboundedReceiver { chan }
82    }
83
84    #[doc(hidden)] // TODO: doc
85    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
86        self.chan.recv(cx)
87    }
88
89    /// Receive the next value for this receiver.
90    ///
91    /// `None` is returned when all `Sender` halves have dropped, indicating
92    /// that no further values can be sent on the channel.
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// use tokio::sync::mpsc;
98    ///
99    /// #[tokio::main]
100    /// async fn main() {
101    ///     let (tx, mut rx) = mpsc::unbounded_channel();
102    ///
103    ///     tokio::spawn(async move {
104    ///         tx.send("hello").unwrap();
105    ///     });
106    ///
107    ///     assert_eq!(Some("hello"), rx.recv().await);
108    ///     assert_eq!(None, rx.recv().await);
109    /// }
110    /// ```
111    ///
112    /// Values are buffered:
113    ///
114    /// ```
115    /// use tokio::sync::mpsc;
116    ///
117    /// #[tokio::main]
118    /// async fn main() {
119    ///     let (tx, mut rx) = mpsc::unbounded_channel();
120    ///
121    ///     tx.send("hello").unwrap();
122    ///     tx.send("world").unwrap();
123    ///
124    ///     assert_eq!(Some("hello"), rx.recv().await);
125    ///     assert_eq!(Some("world"), rx.recv().await);
126    /// }
127    /// ```
128    pub async fn recv(&mut self) -> Option<T> {
129        use crate::future::poll_fn;
130
131        poll_fn(|cx| self.poll_recv(cx)).await
132    }
133
134    /// Attempts to return a pending value on this receiver without blocking.
135    ///
136    /// This method will never block the caller in order to wait for data to
137    /// become available. Instead, this will always return immediately with
138    /// a possible option of pending data on the channel.
139    ///
140    /// This is useful for a flavor of "optimistic check" before deciding to
141    /// block on a receiver.
142    ///
143    /// Compared with recv, this function has two failure cases instead of
144    /// one (one for disconnection, one for an empty buffer).
145    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
146        self.chan.try_recv()
147    }
148
149    /// Closes the receiving half of a channel, without dropping it.
150    ///
151    /// This prevents any further messages from being sent on the channel while
152    /// still enabling the receiver to drain messages that are buffered.
153    pub fn close(&mut self) {
154        self.chan.close();
155    }
156}
157
158#[cfg(feature = "stream")]
159impl<T> crate::stream::Stream for UnboundedReceiver<T> {
160    type Item = T;
161
162    fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
163        self.poll_recv(cx)
164    }
165}
166
167impl<T> UnboundedSender<T> {
168    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
169        UnboundedSender { chan }
170    }
171
172    /// Attempts to send a message on this `UnboundedSender` without blocking.
173    pub fn send(&self, message: T) -> Result<(), SendError<T>> {
174        self.chan.send_unbounded(message)?;
175        Ok(())
176    }
177}