broker_tokio/sync/mpsc/
bounded.rs

1use crate::sync::mpsc::chan;
2use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
3use crate::sync::semaphore_ll as semaphore;
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `Receiver`.
9///
10/// Instances are created by the [`channel`](channel) function.
11pub struct Sender<T> {
12    chan: chan::Tx<T, Semaphore>,
13}
14
15impl<T> Clone for Sender<T> {
16    fn clone(&self) -> Self {
17        Sender {
18            chan: self.chan.clone(),
19        }
20    }
21}
22
23impl<T> fmt::Debug for Sender<T> {
24    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
25        fmt.debug_struct("Sender")
26            .field("chan", &self.chan)
27            .finish()
28    }
29}
30
31/// Receive values from the associated `Sender`.
32///
33/// Instances are created by the [`channel`](channel) function.
34pub struct Receiver<T> {
35    /// The channel receiver
36    chan: chan::Rx<T, Semaphore>,
37}
38
39impl<T> fmt::Debug for Receiver<T> {
40    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
41        fmt.debug_struct("Receiver")
42            .field("chan", &self.chan)
43            .finish()
44    }
45}
46
47impl<T> Clone for Receiver<T> {
48    fn clone(&self) -> Self {
49        Receiver {
50            chan: self.chan.clone(),
51        }
52    }
53}
54
55/// Create a bounded mpsc channel for communicating between asynchronous tasks,
56/// returning the sender/receiver halves.
57///
58/// All data sent on `Sender` will become available on `Receiver` in the same
59/// order as it was sent.
60///
61/// The `Sender` can be cloned to `send` to the same channel from multiple code
62/// locations. Only one `Receiver` is supported.
63///
64/// If the `Receiver` is disconnected while trying to `send`, the `send` method
65/// will return a `SendError`. Similarly, if `Sender` is disconnected while
66/// trying to `recv`, the `recv` method will return a `RecvError`.
67///
68/// # Examples
69///
70/// ```rust
71/// use tokio::sync::mpsc;
72///
73/// #[tokio::main]
74/// async fn main() {
75///     let (mut tx, mut rx) = mpsc::channel(100);
76///
77///     tokio::spawn(async move {
78///         for i in 0..10 {
79///             if let Err(_) = tx.send(i).await {
80///                 println!("receiver dropped");
81///                 return;
82///             }
83///         }
84///     });
85///
86///     while let Some(i) = rx.recv().await {
87///         println!("got = {}", i);
88///     }
89/// }
90/// ```
91pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
92    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
93    let semaphore = (semaphore::Semaphore::new(buffer), buffer);
94    let (tx, rx) = chan::channel(semaphore);
95
96    let tx = Sender::new(tx);
97    let rx = Receiver::new(rx);
98
99    (tx, rx)
100}
101
102/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
103/// representing the channel bound.
104type Semaphore = (semaphore::Semaphore, usize);
105
106impl<T> Receiver<T> {
107    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
108        Receiver { chan }
109    }
110
111    /// Receive the next value for this receiver.
112    ///
113    /// `None` is returned when all `Sender` halves have dropped, indicating
114    /// that no further values can be sent on the channel.
115    ///
116    /// # Examples
117    ///
118    /// ```
119    /// use tokio::sync::mpsc;
120    ///
121    /// #[tokio::main]
122    /// async fn main() {
123    ///     let (mut tx, mut rx) = mpsc::channel(100);
124    ///
125    ///     tokio::spawn(async move {
126    ///         tx.send("hello").await.unwrap();
127    ///     });
128    ///
129    ///     assert_eq!(Some("hello"), rx.recv().await);
130    ///     assert_eq!(None, rx.recv().await);
131    /// }
132    /// ```
133    ///
134    /// Values are buffered:
135    ///
136    /// ```
137    /// use tokio::sync::mpsc;
138    ///
139    /// #[tokio::main]
140    /// async fn main() {
141    ///     let (mut tx, mut rx) = mpsc::channel(100);
142    ///
143    ///     tx.send("hello").await.unwrap();
144    ///     tx.send("world").await.unwrap();
145    ///
146    ///     assert_eq!(Some("hello"), rx.recv().await);
147    ///     assert_eq!(Some("world"), rx.recv().await);
148    /// }
149    /// ```
150    pub async fn recv(&mut self) -> Option<T> {
151        use crate::future::poll_fn;
152
153        poll_fn(|cx| self.poll_recv(cx)).await
154    }
155
156    #[doc(hidden)] // TODO: document
157    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
158        self.chan.recv(cx)
159    }
160
161    /// Attempts to return a pending value on this receiver without blocking.
162    ///
163    /// This method will never block the caller in order to wait for data to
164    /// become available. Instead, this will always return immediately with
165    /// a possible option of pending data on the channel.
166    ///
167    /// This is useful for a flavor of "optimistic check" before deciding to
168    /// block on a receiver.
169    ///
170    /// Compared with recv, this function has two failure cases instead of
171    /// one (one for disconnection, one for an empty buffer).
172    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
173        self.chan.try_recv()
174    }
175
176    /// Closes the receiving half of a channel, without dropping it.
177    ///
178    /// This prevents any further messages from being sent on the channel while
179    /// still enabling the receiver to drain messages that are buffered.
180    pub fn close(&mut self) {
181        self.chan.close();
182    }
183}
184
185impl<T> Unpin for Receiver<T> {}
186
187cfg_stream! {
188    impl<T> crate::stream::Stream for Receiver<T> {
189        type Item = T;
190
191        fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
192            self.poll_recv(cx)
193        }
194    }
195}
196
197impl<T> Sender<T> {
198    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
199        Sender { chan }
200    }
201
202    #[doc(hidden)] // TODO: document
203    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
204        self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
205    }
206
207    /// Attempts to immediately send a message on this `Sender`
208    ///
209    /// This method differs from [`send`] by returning immediately if the channel's
210    /// buffer is full or no receiver is waiting to acquire some data. Compared
211    /// with [`send`], this function has two failure cases instead of one (one for
212    /// disconnection, one for a full buffer).
213    ///
214    /// This function may be paired with [`poll_ready`] in order to wait for
215    /// channel capacity before trying to send a value.
216    ///
217    /// # Errors
218    ///
219    /// If the channel capacity has been reached, i.e., the channel has `n`
220    /// buffered values where `n` is the argument passed to [`channel`], then an
221    /// error is returned.
222    ///
223    /// If the receive half of the channel is closed, either due to [`close`]
224    /// being called or the [`Receiver`] handle dropping, the function returns
225    /// an error. The error includes the value passed to `send`.
226    ///
227    /// [`send`]: Sender::send
228    /// [`poll_ready`]: Sender::poll_ready
229    /// [`channel`]: channel
230    /// [`close`]: Receiver::close
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use tokio::sync::mpsc;
236    ///
237    /// #[tokio::main]
238    /// async fn main() {
239    ///     // Create a channel with buffer size 1
240    ///     let (mut tx1, mut rx) = mpsc::channel(1);
241    ///     let mut tx2 = tx1.clone();
242    ///
243    ///     tokio::spawn(async move {
244    ///         tx1.send(1).await.unwrap();
245    ///         tx1.send(2).await.unwrap();
246    ///         // task waits until the receiver receives a value.
247    ///     });
248    ///
249    ///     tokio::spawn(async move {
250    ///         // This will return an error and send
251    ///         // no message if the buffer is full
252    ///         let _ = tx2.try_send(3);
253    ///     });
254    ///
255    ///     let mut msg;
256    ///     msg = rx.recv().await.unwrap();
257    ///     println!("message {} received", msg);
258    ///
259    ///     msg = rx.recv().await.unwrap();
260    ///     println!("message {} received", msg);
261    ///
262    ///     // Third message may have never been sent
263    ///     match rx.recv().await {
264    ///         Some(msg) => println!("message {} received", msg),
265    ///         None => println!("the third message was never sent"),
266    ///     }
267    /// }
268    /// ```
269    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
270        self.chan.try_send(message)?;
271        Ok(())
272    }
273
274    /// Send a value, waiting until there is capacity.
275    ///
276    /// A successful send occurs when it is determined that the other end of the
277    /// channel has not hung up already. An unsuccessful send would be one where
278    /// the corresponding receiver has already been closed. Note that a return
279    /// value of `Err` means that the data will never be received, but a return
280    /// value of `Ok` does not mean that the data will be received. It is
281    /// possible for the corresponding receiver to hang up immediately after
282    /// this function returns `Ok`.
283    ///
284    /// # Errors
285    ///
286    /// If the receive half of the channel is closed, either due to [`close`]
287    /// being called or the [`Receiver`] handle dropping, the function returns
288    /// an error. The error includes the value passed to `send`.
289    ///
290    /// [`close`]: Receiver::close
291    /// [`Receiver`]: Receiver
292    ///
293    /// # Examples
294    ///
295    /// In the following example, each call to `send` will block until the
296    /// previously sent value was received.
297    ///
298    /// ```rust
299    /// use tokio::sync::mpsc;
300    ///
301    /// #[tokio::main]
302    /// async fn main() {
303    ///     let (mut tx, mut rx) = mpsc::channel(1);
304    ///
305    ///     tokio::spawn(async move {
306    ///         for i in 0..10 {
307    ///             if let Err(_) = tx.send(i).await {
308    ///                 println!("receiver dropped");
309    ///                 return;
310    ///             }
311    ///         }
312    ///     });
313    ///
314    ///     while let Some(i) = rx.recv().await {
315    ///         println!("got = {}", i);
316    ///     }
317    /// }
318    /// ```
319    pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
320        use crate::future::poll_fn;
321
322        if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
323            return Err(SendError(value));
324        }
325
326        match self.try_send(value) {
327            Ok(()) => Ok(()),
328            Err(TrySendError::Full(_)) => unreachable!(),
329            Err(TrySendError::Closed(value)) => Err(SendError(value)),
330        }
331    }
332}