broker_tokio/sync/mpsc/bounded.rs
1use crate::sync::mpsc::chan;
2use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
3use crate::sync::semaphore_ll as semaphore;
4
5use std::fmt;
6use std::task::{Context, Poll};
7
/// Send values to the associated `Receiver`.
///
/// Instances are created by the [`channel`](channel) function.
///
/// The handle implements `Clone`; each clone wraps a clone of the same
/// underlying channel transmit handle.
pub struct Sender<T> {
    /// The transmit half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore` type.
    chan: chan::Tx<T, Semaphore>,
}
14
15impl<T> Clone for Sender<T> {
16 fn clone(&self) -> Self {
17 Sender {
18 chan: self.chan.clone(),
19 }
20 }
21}
22
23impl<T> fmt::Debug for Sender<T> {
24 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
25 fmt.debug_struct("Sender")
26 .field("chan", &self.chan)
27 .finish()
28 }
29}
30
/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
///
/// Values are obtained through `recv`, `poll_recv` or `try_recv`.
pub struct Receiver<T> {
    /// The receive half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore` type.
    chan: chan::Rx<T, Semaphore>,
}
38
39impl<T> fmt::Debug for Receiver<T> {
40 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
41 fmt.debug_struct("Receiver")
42 .field("chan", &self.chan)
43 .finish()
44 }
45}
46
47impl<T> Clone for Receiver<T> {
48 fn clone(&self) -> Self {
49 Receiver {
50 chan: self.chan.clone(),
51 }
52 }
53}
54
55/// Create a bounded mpsc channel for communicating between asynchronous tasks,
56/// returning the sender/receiver halves.
57///
58/// All data sent on `Sender` will become available on `Receiver` in the same
59/// order as it was sent.
60///
61/// The `Sender` can be cloned to `send` to the same channel from multiple code
62/// locations. Only one `Receiver` is supported.
63///
64/// If the `Receiver` is disconnected while trying to `send`, the `send` method
65/// will return a `SendError`. Similarly, if `Sender` is disconnected while
66/// trying to `recv`, the `recv` method will return a `RecvError`.
67///
68/// # Examples
69///
70/// ```rust
71/// use tokio::sync::mpsc;
72///
73/// #[tokio::main]
74/// async fn main() {
75/// let (mut tx, mut rx) = mpsc::channel(100);
76///
77/// tokio::spawn(async move {
78/// for i in 0..10 {
79/// if let Err(_) = tx.send(i).await {
80/// println!("receiver dropped");
81/// return;
82/// }
83/// }
84/// });
85///
86/// while let Some(i) = rx.recv().await {
87/// println!("got = {}", i);
88/// }
89/// }
90/// ```
91pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
92 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
93 let semaphore = (semaphore::Semaphore::new(buffer), buffer);
94 let (tx, rx) = chan::channel(semaphore);
95
96 let tx = Sender::new(tx);
97 let rx = Receiver::new(rx);
98
99 (tx, rx)
100}
101
/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
/// representing the channel bound.
///
/// `channel` constructs it as `(semaphore::Semaphore::new(buffer), buffer)`,
/// so both elements are derived from the same `buffer` argument.
type Semaphore = (semaphore::Semaphore, usize);
105
106impl<T> Receiver<T> {
107 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
108 Receiver { chan }
109 }
110
111 /// Receive the next value for this receiver.
112 ///
113 /// `None` is returned when all `Sender` halves have dropped, indicating
114 /// that no further values can be sent on the channel.
115 ///
116 /// # Examples
117 ///
118 /// ```
119 /// use tokio::sync::mpsc;
120 ///
121 /// #[tokio::main]
122 /// async fn main() {
123 /// let (mut tx, mut rx) = mpsc::channel(100);
124 ///
125 /// tokio::spawn(async move {
126 /// tx.send("hello").await.unwrap();
127 /// });
128 ///
129 /// assert_eq!(Some("hello"), rx.recv().await);
130 /// assert_eq!(None, rx.recv().await);
131 /// }
132 /// ```
133 ///
134 /// Values are buffered:
135 ///
136 /// ```
137 /// use tokio::sync::mpsc;
138 ///
139 /// #[tokio::main]
140 /// async fn main() {
141 /// let (mut tx, mut rx) = mpsc::channel(100);
142 ///
143 /// tx.send("hello").await.unwrap();
144 /// tx.send("world").await.unwrap();
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(Some("world"), rx.recv().await);
148 /// }
149 /// ```
150 pub async fn recv(&mut self) -> Option<T> {
151 use crate::future::poll_fn;
152
153 poll_fn(|cx| self.poll_recv(cx)).await
154 }
155
156 #[doc(hidden)] // TODO: document
157 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
158 self.chan.recv(cx)
159 }
160
161 /// Attempts to return a pending value on this receiver without blocking.
162 ///
163 /// This method will never block the caller in order to wait for data to
164 /// become available. Instead, this will always return immediately with
165 /// a possible option of pending data on the channel.
166 ///
167 /// This is useful for a flavor of "optimistic check" before deciding to
168 /// block on a receiver.
169 ///
170 /// Compared with recv, this function has two failure cases instead of
171 /// one (one for disconnection, one for an empty buffer).
172 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
173 self.chan.try_recv()
174 }
175
176 /// Closes the receiving half of a channel, without dropping it.
177 ///
178 /// This prevents any further messages from being sent on the channel while
179 /// still enabling the receiver to drain messages that are buffered.
180 pub fn close(&mut self) {
181 self.chan.close();
182 }
183}
184
// `Receiver` is `Unpin` regardless of `T`. The `Stream` implementation in this
// file relies on this to call `poll_recv` (which takes `&mut self`) through a
// `Pin<&mut Self>`.
impl<T> Unpin for Receiver<T> {}
186
187cfg_stream! {
188 impl<T> crate::stream::Stream for Receiver<T> {
189 type Item = T;
190
191 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
192 self.poll_recv(cx)
193 }
194 }
195}
196
197impl<T> Sender<T> {
198 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
199 Sender { chan }
200 }
201
202 #[doc(hidden)] // TODO: document
203 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
204 self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
205 }
206
207 /// Attempts to immediately send a message on this `Sender`
208 ///
209 /// This method differs from [`send`] by returning immediately if the channel's
210 /// buffer is full or no receiver is waiting to acquire some data. Compared
211 /// with [`send`], this function has two failure cases instead of one (one for
212 /// disconnection, one for a full buffer).
213 ///
214 /// This function may be paired with [`poll_ready`] in order to wait for
215 /// channel capacity before trying to send a value.
216 ///
217 /// # Errors
218 ///
219 /// If the channel capacity has been reached, i.e., the channel has `n`
220 /// buffered values where `n` is the argument passed to [`channel`], then an
221 /// error is returned.
222 ///
223 /// If the receive half of the channel is closed, either due to [`close`]
224 /// being called or the [`Receiver`] handle dropping, the function returns
225 /// an error. The error includes the value passed to `send`.
226 ///
227 /// [`send`]: Sender::send
228 /// [`poll_ready`]: Sender::poll_ready
229 /// [`channel`]: channel
230 /// [`close`]: Receiver::close
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use tokio::sync::mpsc;
236 ///
237 /// #[tokio::main]
238 /// async fn main() {
239 /// // Create a channel with buffer size 1
240 /// let (mut tx1, mut rx) = mpsc::channel(1);
241 /// let mut tx2 = tx1.clone();
242 ///
243 /// tokio::spawn(async move {
244 /// tx1.send(1).await.unwrap();
245 /// tx1.send(2).await.unwrap();
246 /// // task waits until the receiver receives a value.
247 /// });
248 ///
249 /// tokio::spawn(async move {
250 /// // This will return an error and send
251 /// // no message if the buffer is full
252 /// let _ = tx2.try_send(3);
253 /// });
254 ///
255 /// let mut msg;
256 /// msg = rx.recv().await.unwrap();
257 /// println!("message {} received", msg);
258 ///
259 /// msg = rx.recv().await.unwrap();
260 /// println!("message {} received", msg);
261 ///
262 /// // Third message may have never been sent
263 /// match rx.recv().await {
264 /// Some(msg) => println!("message {} received", msg),
265 /// None => println!("the third message was never sent"),
266 /// }
267 /// }
268 /// ```
269 pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
270 self.chan.try_send(message)?;
271 Ok(())
272 }
273
274 /// Send a value, waiting until there is capacity.
275 ///
276 /// A successful send occurs when it is determined that the other end of the
277 /// channel has not hung up already. An unsuccessful send would be one where
278 /// the corresponding receiver has already been closed. Note that a return
279 /// value of `Err` means that the data will never be received, but a return
280 /// value of `Ok` does not mean that the data will be received. It is
281 /// possible for the corresponding receiver to hang up immediately after
282 /// this function returns `Ok`.
283 ///
284 /// # Errors
285 ///
286 /// If the receive half of the channel is closed, either due to [`close`]
287 /// being called or the [`Receiver`] handle dropping, the function returns
288 /// an error. The error includes the value passed to `send`.
289 ///
290 /// [`close`]: Receiver::close
291 /// [`Receiver`]: Receiver
292 ///
293 /// # Examples
294 ///
295 /// In the following example, each call to `send` will block until the
296 /// previously sent value was received.
297 ///
298 /// ```rust
299 /// use tokio::sync::mpsc;
300 ///
301 /// #[tokio::main]
302 /// async fn main() {
303 /// let (mut tx, mut rx) = mpsc::channel(1);
304 ///
305 /// tokio::spawn(async move {
306 /// for i in 0..10 {
307 /// if let Err(_) = tx.send(i).await {
308 /// println!("receiver dropped");
309 /// return;
310 /// }
311 /// }
312 /// });
313 ///
314 /// while let Some(i) = rx.recv().await {
315 /// println!("got = {}", i);
316 /// }
317 /// }
318 /// ```
319 pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
320 use crate::future::poll_fn;
321
322 if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
323 return Err(SendError(value));
324 }
325
326 match self.try_send(value) {
327 Ok(()) => Ok(()),
328 Err(TrySendError::Full(_)) => unreachable!(),
329 Err(TrySendError::Closed(value)) => Err(SendError(value)),
330 }
331 }
332}