broker_tokio/sync/mpsc/unbounded.rs
1use crate::loom::sync::atomic::AtomicUsize;
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the
11/// [`unbounded_channel`](unbounded_channel) function.
12pub struct UnboundedSender<T> {
13 chan: chan::Tx<T, Semaphore>,
14}
15
16impl<T> Clone for UnboundedSender<T> {
17 fn clone(&self) -> Self {
18 UnboundedSender {
19 chan: self.chan.clone(),
20 }
21 }
22}
23
24impl<T> fmt::Debug for UnboundedSender<T> {
25 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
26 fmt.debug_struct("UnboundedSender")
27 .field("chan", &self.chan)
28 .finish()
29 }
30}
31
32/// Receive values from the associated `UnboundedSender`.
33///
34/// Instances are created by the
35/// [`unbounded_channel`](unbounded_channel) function.
36pub struct UnboundedReceiver<T> {
37 /// The channel receiver
38 chan: chan::Rx<T, Semaphore>,
39}
40
41impl<T> fmt::Debug for UnboundedReceiver<T> {
42 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
43 fmt.debug_struct("UnboundedReceiver")
44 .field("chan", &self.chan)
45 .finish()
46 }
47}
48
49impl<T> Clone for UnboundedReceiver<T> {
50 fn clone(&self) -> Self {
51 UnboundedReceiver {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57/// Create an unbounded mpsc channel for communicating between asynchronous
58/// tasks.
59///
60/// A `send` on this channel will always succeed as long as the receive half has
61/// not been closed. If the receiver falls behind, messages will be arbitrarily
62/// buffered.
63///
64/// **Note** that the amount of available system memory is an implicit bound to
65/// the channel. Using an `unbounded` channel has the ability of causing the
66/// process to run out of memory. In this case, the process will be aborted.
67pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
68 let (tx, rx) = chan::channel(AtomicUsize::new(0));
69
70 let tx = UnboundedSender::new(tx);
71 let rx = UnboundedReceiver::new(rx);
72
73 (tx, rx)
74}
75
76/// No capacity
77type Semaphore = AtomicUsize;
78
79impl<T> UnboundedReceiver<T> {
80 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
81 UnboundedReceiver { chan }
82 }
83
84 #[doc(hidden)] // TODO: doc
85 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
86 self.chan.recv(cx)
87 }
88
89 /// Receive the next value for this receiver.
90 ///
91 /// `None` is returned when all `Sender` halves have dropped, indicating
92 /// that no further values can be sent on the channel.
93 ///
94 /// # Examples
95 ///
96 /// ```
97 /// use tokio::sync::mpsc;
98 ///
99 /// #[tokio::main]
100 /// async fn main() {
101 /// let (tx, mut rx) = mpsc::unbounded_channel();
102 ///
103 /// tokio::spawn(async move {
104 /// tx.send("hello").unwrap();
105 /// });
106 ///
107 /// assert_eq!(Some("hello"), rx.recv().await);
108 /// assert_eq!(None, rx.recv().await);
109 /// }
110 /// ```
111 ///
112 /// Values are buffered:
113 ///
114 /// ```
115 /// use tokio::sync::mpsc;
116 ///
117 /// #[tokio::main]
118 /// async fn main() {
119 /// let (tx, mut rx) = mpsc::unbounded_channel();
120 ///
121 /// tx.send("hello").unwrap();
122 /// tx.send("world").unwrap();
123 ///
124 /// assert_eq!(Some("hello"), rx.recv().await);
125 /// assert_eq!(Some("world"), rx.recv().await);
126 /// }
127 /// ```
128 pub async fn recv(&mut self) -> Option<T> {
129 use crate::future::poll_fn;
130
131 poll_fn(|cx| self.poll_recv(cx)).await
132 }
133
134 /// Attempts to return a pending value on this receiver without blocking.
135 ///
136 /// This method will never block the caller in order to wait for data to
137 /// become available. Instead, this will always return immediately with
138 /// a possible option of pending data on the channel.
139 ///
140 /// This is useful for a flavor of "optimistic check" before deciding to
141 /// block on a receiver.
142 ///
143 /// Compared with recv, this function has two failure cases instead of
144 /// one (one for disconnection, one for an empty buffer).
145 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
146 self.chan.try_recv()
147 }
148
149 /// Closes the receiving half of a channel, without dropping it.
150 ///
151 /// This prevents any further messages from being sent on the channel while
152 /// still enabling the receiver to drain messages that are buffered.
153 pub fn close(&mut self) {
154 self.chan.close();
155 }
156}
157
158#[cfg(feature = "stream")]
159impl<T> crate::stream::Stream for UnboundedReceiver<T> {
160 type Item = T;
161
162 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
163 self.poll_recv(cx)
164 }
165}
166
167impl<T> UnboundedSender<T> {
168 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
169 UnboundedSender { chan }
170 }
171
172 /// Attempts to send a message on this `UnboundedSender` without blocking.
173 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
174 self.chan.send_unbounded(message)?;
175 Ok(())
176 }
177}