madsim_real_tokio/sync/mpsc/bounded.rs
1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    // Send half of the underlying channel, parameterized by the
    // bounded-channel `Semaphore` defined in this module.
    chan: chan::Tx<T, Semaphore>,
}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
pub struct WeakSender<T> {
    // Shared (`Arc`) handle to the channel state itself, rather than a
    // `chan::Tx` like the one `Sender` holds.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    // Borrow of the issuing sender's channel handle; `Sender::reserve`
    // populates this with `&self.chan`.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct PermitIterator<'a, T> {
    // Borrow of the issuing sender's channel handle.
    chan: &'a chan::Tx<T, Semaphore>,
    // NOTE(review): presumably the number of permits this iterator can still
    // yield -- confirm against the `Iterator` impl, which is not visible here.
    n: usize,
}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    // Owned channel handle (the sender is moved in rather than borrowed).
    // NOTE(review): the `Option` presumably lets the handle be taken out when
    // the permit is consumed or released -- confirm against the impl block.
    chan: Option<chan::Tx<T, Semaphore>>,
}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The channel receiver: receive half of the underlying channel,
    /// parameterized by the bounded-channel `Semaphore`.
    chan: chan::Rx<T, Semaphore>,
}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139/// let (tx, mut rx) = mpsc::channel(100);
140///
141/// tokio::spawn(async move {
142/// for i in 0..10 {
143/// if let Err(_) = tx.send(i).await {
144/// println!("receiver dropped");
145/// return;
146/// }
147/// }
148/// });
149///
150/// while let Some(i) = rx.recv().await {
151/// println!("got = {}", i);
152/// }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158 let semaphore = Semaphore {
159 semaphore: semaphore::Semaphore::new(buffer),
160 bound: buffer,
161 };
162 let (tx, rx) = chan::channel(semaphore);
163
164 let tx = Sender::new(tx);
165 let rx = Receiver::new(rx);
166
167 (tx, rx)
168}
169
/// Channel semaphore: pairs the semaphore implementation with a `usize`
/// representing the channel bound.
///
/// [`channel`] builds this with `buffer` permits and `bound == buffer`.
#[derive(Debug)]
pub(crate) struct Semaphore {
    // Underlying batch semaphore; created with one permit per buffer slot.
    pub(crate) semaphore: semaphore::Semaphore,
    // The buffer size the channel was created with.
    pub(crate) bound: usize,
}
177
178impl<T> Receiver<T> {
    // Crate-internal constructor: wraps an already-built receive half
    // (see `channel`, which obtains it from `chan::channel`).
    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
        Receiver { chan }
    }
182
183 /// Receives the next value for this receiver.
184 ///
185 /// This method returns `None` if the channel has been closed and there are
186 /// no remaining messages in the channel's buffer. This indicates that no
187 /// further values can ever be received from this `Receiver`. The channel is
188 /// closed when all senders have been dropped, or when [`close`] is called.
189 ///
190 /// If there are no messages in the channel's buffer, but the channel has
191 /// not yet been closed, this method will sleep until a message is sent or
192 /// the channel is closed. Note that if [`close`] is called, but there are
193 /// still outstanding [`Permits`] from before it was closed, the channel is
194 /// not considered closed by `recv` until the permits are released.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. If `recv` is used as the event in a
199 /// [`tokio::select!`](crate::select) statement and some other branch
200 /// completes first, it is guaranteed that no messages were received on this
201 /// channel.
202 ///
203 /// [`close`]: Self::close
204 /// [`Permits`]: struct@crate::sync::mpsc::Permit
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::sync::mpsc;
210 ///
211 /// #[tokio::main]
212 /// async fn main() {
213 /// let (tx, mut rx) = mpsc::channel(100);
214 ///
215 /// tokio::spawn(async move {
216 /// tx.send("hello").await.unwrap();
217 /// });
218 ///
219 /// assert_eq!(Some("hello"), rx.recv().await);
220 /// assert_eq!(None, rx.recv().await);
221 /// }
222 /// ```
223 ///
224 /// Values are buffered:
225 ///
226 /// ```
227 /// use tokio::sync::mpsc;
228 ///
229 /// #[tokio::main]
230 /// async fn main() {
231 /// let (tx, mut rx) = mpsc::channel(100);
232 ///
233 /// tx.send("hello").await.unwrap();
234 /// tx.send("world").await.unwrap();
235 ///
236 /// assert_eq!(Some("hello"), rx.recv().await);
237 /// assert_eq!(Some("world"), rx.recv().await);
238 /// }
239 /// ```
240 pub async fn recv(&mut self) -> Option<T> {
241 use crate::future::poll_fn;
242 poll_fn(|cx| self.chan.recv(cx)).await
243 }
244
245 /// Receives the next values for this receiver and extends `buffer`.
246 ///
247 /// This method extends `buffer` by no more than a fixed number of values
248 /// as specified by `limit`. If `limit` is zero, the function immediately
249 /// returns `0`. The return value is the number of values added to `buffer`.
250 ///
251 /// For `limit > 0`, if there are no messages in the channel's queue, but
252 /// the channel has not yet been closed, this method will sleep until a
253 /// message is sent or the channel is closed. Note that if [`close`] is
254 /// called, but there are still outstanding [`Permits`] from before it was
255 /// closed, the channel is not considered closed by `recv_many` until the
256 /// permits are released.
257 ///
258 /// For non-zero values of `limit`, this method will never return `0` unless
259 /// the channel has been closed and there are no remaining messages in the
260 /// channel's queue. This indicates that no further values can ever be
261 /// received from this `Receiver`. The channel is closed when all senders
262 /// have been dropped, or when [`close`] is called.
263 ///
264 /// The capacity of `buffer` is increased as needed.
265 ///
266 /// # Cancel safety
267 ///
268 /// This method is cancel safe. If `recv_many` is used as the event in a
269 /// [`tokio::select!`](crate::select) statement and some other branch
270 /// completes first, it is guaranteed that no messages were received on this
271 /// channel.
272 ///
273 /// [`close`]: Self::close
274 /// [`Permits`]: struct@crate::sync::mpsc::Permit
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use tokio::sync::mpsc;
280 ///
281 /// #[tokio::main]
282 /// async fn main() {
283 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
284 /// let limit = 2;
285 /// let (tx, mut rx) = mpsc::channel(100);
286 /// let tx2 = tx.clone();
287 /// tx2.send("first").await.unwrap();
288 /// tx2.send("second").await.unwrap();
289 /// tx2.send("third").await.unwrap();
290 ///
291 /// // Call `recv_many` to receive up to `limit` (2) values.
292 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293 /// assert_eq!(vec!["first", "second"], buffer);
294 ///
295 /// // If the buffer is full, the next call to `recv_many`
296 /// // reserves additional capacity.
297 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298 ///
299 /// tokio::spawn(async move {
300 /// tx.send("fourth").await.unwrap();
301 /// });
302 ///
303 /// // 'tx' is dropped, but `recv_many`
304 /// // is guaranteed not to return 0 as the channel
305 /// // is not yet closed.
306 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308 ///
309 /// // Once the last sender is dropped, the channel is
310 /// // closed and `recv_many` returns 0, capacity unchanged.
311 /// drop(tx2);
312 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314 /// }
315 /// ```
316 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317 use crate::future::poll_fn;
318 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319 }
320
321 /// Tries to receive the next value for this receiver.
322 ///
323 /// This method returns the [`Empty`] error if the channel is currently
324 /// empty, but there are still outstanding [senders] or [permits].
325 ///
326 /// This method returns the [`Disconnected`] error if the channel is
327 /// currently empty, and there are no outstanding [senders] or [permits].
328 ///
329 /// Unlike the [`poll_recv`] method, this method will never return an
330 /// [`Empty`] error spuriously.
331 ///
332 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334 /// [`poll_recv`]: Self::poll_recv
335 /// [senders]: crate::sync::mpsc::Sender
336 /// [permits]: crate::sync::mpsc::Permit
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use tokio::sync::mpsc;
342 /// use tokio::sync::mpsc::error::TryRecvError;
343 ///
344 /// #[tokio::main]
345 /// async fn main() {
346 /// let (tx, mut rx) = mpsc::channel(100);
347 ///
348 /// tx.send("hello").await.unwrap();
349 ///
350 /// assert_eq!(Ok("hello"), rx.try_recv());
351 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352 ///
353 /// tx.send("hello").await.unwrap();
354 /// // Drop the last sender, closing the channel.
355 /// drop(tx);
356 ///
357 /// assert_eq!(Ok("hello"), rx.try_recv());
358 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359 /// }
360 /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Synchronous call: forwards to the inner channel's `try_recv` and
        // returns its result unchanged.
        self.chan.try_recv()
    }
364
365 /// Blocking receive to call outside of asynchronous contexts.
366 ///
367 /// This method returns `None` if the channel has been closed and there are
368 /// no remaining messages in the channel's buffer. This indicates that no
369 /// further values can ever be received from this `Receiver`. The channel is
370 /// closed when all senders have been dropped, or when [`close`] is called.
371 ///
372 /// If there are no messages in the channel's buffer, but the channel has
373 /// not yet been closed, this method will block until a message is sent or
374 /// the channel is closed.
375 ///
376 /// This method is intended for use cases where you are sending from
377 /// asynchronous code to synchronous code, and will work even if the sender
378 /// is not using [`blocking_send`] to send the message.
379 ///
380 /// Note that if [`close`] is called, but there are still outstanding
381 /// [`Permits`] from before it was closed, the channel is not considered
382 /// closed by `blocking_recv` until the permits are released.
383 ///
384 /// [`close`]: Self::close
385 /// [`Permits`]: struct@crate::sync::mpsc::Permit
386 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387 ///
388 /// # Panics
389 ///
390 /// This function panics if called within an asynchronous execution
391 /// context.
392 ///
393 /// # Examples
394 ///
395 /// ```
396 /// use std::thread;
397 /// use tokio::runtime::Runtime;
398 /// use tokio::sync::mpsc;
399 ///
400 /// fn main() {
401 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
402 ///
403 /// let sync_code = thread::spawn(move || {
404 /// assert_eq!(Some(10), rx.blocking_recv());
405 /// });
406 ///
407 /// Runtime::new()
408 /// .unwrap()
409 /// .block_on(async move {
410 /// let _ = tx.send(10).await;
411 /// });
412 /// sync_code.join().unwrap()
413 /// }
414 /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(&mut self) -> Option<T> {
        // Runs the async `recv` future to completion through the crate's
        // `block_on` helper, so the result is exactly what `recv` yields.
        crate::future::block_on(self.recv())
    }
421
422 /// Closes the receiving half of a channel without dropping it.
423 ///
424 /// This prevents any further messages from being sent on the channel while
425 /// still enabling the receiver to drain messages that are buffered. Any
426 /// outstanding [`Permit`] values will still be able to send messages.
427 ///
428 /// To guarantee that no messages are dropped, after calling `close()`,
429 /// `recv()` must be called until `None` is returned. If there are
430 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
431 /// not return `None` until those are released.
432 ///
433 /// [`Permit`]: Permit
434 /// [`OwnedPermit`]: OwnedPermit
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// use tokio::sync::mpsc;
440 ///
441 /// #[tokio::main]
442 /// async fn main() {
443 /// let (tx, mut rx) = mpsc::channel(20);
444 ///
445 /// tokio::spawn(async move {
446 /// let mut i = 0;
447 /// while let Ok(permit) = tx.reserve().await {
448 /// permit.send(i);
449 /// i += 1;
450 /// }
451 /// });
452 ///
453 /// rx.close();
454 ///
455 /// while let Some(msg) = rx.recv().await {
456 /// println!("got {}", msg);
457 /// }
458 ///
459 /// // Channel closed and no messages are lost.
460 /// }
461 /// ```
    pub fn close(&mut self) {
        // Delegates to the inner channel's `close`; the `Receiver` itself is
        // not consumed, so it can keep being polled afterwards.
        self.chan.close();
    }
465
466 /// Checks if a channel is closed.
467 ///
468 /// This method returns `true` if the channel has been closed. The channel is closed
469 /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
470 ///
471 /// [`Sender`]: crate::sync::mpsc::Sender
472 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
473 ///
474 /// # Examples
475 /// ```
476 /// use tokio::sync::mpsc;
477 ///
478 /// #[tokio::main]
479 /// async fn main() {
480 /// let (_tx, mut rx) = mpsc::channel::<()>(10);
481 /// assert!(!rx.is_closed());
482 ///
483 /// rx.close();
484 ///
485 /// assert!(rx.is_closed());
486 /// }
487 /// ```
    pub fn is_closed(&self) -> bool {
        // Pure query: reports the inner channel's closed state.
        self.chan.is_closed()
    }
491
492 /// Checks if a channel is empty.
493 ///
494 /// This method returns `true` if the channel has no messages.
495 ///
496 /// # Examples
497 /// ```
498 /// use tokio::sync::mpsc;
499 ///
500 /// #[tokio::main]
501 /// async fn main() {
502 /// let (tx, rx) = mpsc::channel(10);
503 /// assert!(rx.is_empty());
504 ///
505 /// tx.send(0).await.unwrap();
506 /// assert!(!rx.is_empty());
507 /// }
508 ///
509 /// ```
    pub fn is_empty(&self) -> bool {
        // Pure query: forwards to the inner channel's `is_empty`.
        self.chan.is_empty()
    }
513
514 /// Returns the number of messages in the channel.
515 ///
516 /// # Examples
517 /// ```
518 /// use tokio::sync::mpsc;
519 ///
520 /// #[tokio::main]
521 /// async fn main() {
522 /// let (tx, rx) = mpsc::channel(10);
523 /// assert_eq!(0, rx.len());
524 ///
525 /// tx.send(0).await.unwrap();
526 /// assert_eq!(1, rx.len());
527 /// }
528 /// ```
    pub fn len(&self) -> usize {
        // Pure query: forwards to the inner channel's `len`.
        self.chan.len()
    }
532
533 /// Polls to receive the next message on this channel.
534 ///
535 /// This method returns:
536 ///
537 /// * `Poll::Pending` if no messages are available but the channel is not
538 /// closed, or if a spurious failure happens.
539 /// * `Poll::Ready(Some(message))` if a message is available.
540 /// * `Poll::Ready(None)` if the channel has been closed and all messages
541 /// sent before it was closed have been received.
542 ///
543 /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
546 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
547 /// passed to the most recent call is scheduled to receive a wakeup.
548 ///
549 /// If this method returns `Poll::Pending` due to a spurious failure, then
550 /// the `Waker` will be notified when the situation causing the spurious
551 /// failure has been resolved. Note that receiving such a wakeup does not
552 /// guarantee that the next call will succeed — it could fail with another
553 /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // The same inner-channel call that `recv` wraps in `poll_fn`,
        // exposed directly for manual polling.
        self.chan.recv(cx)
    }
557
558 /// Polls to receive multiple messages on this channel, extending the provided buffer.
559 ///
560 /// This method returns:
561 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
562 /// spurious failure happens.
563 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
564 /// stored in `buffer`. This can be less than, or equal to, `limit`.
565 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
566 ///
567 /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
570 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
571 /// passed to the most recent call is scheduled to receive a wakeup.
572 ///
573 /// Note that this method does not guarantee that exactly `limit` messages
574 /// are received. Rather, if at least one message is available, it returns
575 /// as many messages as it can up to the given limit. This method returns
576 /// zero only if the channel is closed (or if `limit` is zero).
577 ///
578 /// # Examples
579 ///
580 /// ```
581 /// use std::task::{Context, Poll};
582 /// use std::pin::Pin;
583 /// use tokio::sync::mpsc;
584 /// use futures::Future;
585 ///
586 /// struct MyReceiverFuture<'a> {
587 /// receiver: mpsc::Receiver<i32>,
588 /// buffer: &'a mut Vec<i32>,
589 /// limit: usize,
590 /// }
591 ///
592 /// impl<'a> Future for MyReceiverFuture<'a> {
593 /// type Output = usize; // Number of messages received
594 ///
595 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
596 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
597 ///
598 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
599 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
600 /// Poll::Pending => Poll::Pending,
601 /// Poll::Ready(count) => Poll::Ready(count),
602 /// }
603 /// }
604 /// }
605 ///
606 /// #[tokio::main]
607 /// async fn main() {
608 /// let (tx, rx) = mpsc::channel(32);
609 /// let mut buffer = Vec::new();
610 ///
611 /// let my_receiver_future = MyReceiverFuture {
612 /// receiver: rx,
613 /// buffer: &mut buffer,
614 /// limit: 3,
615 /// };
616 ///
617 /// for i in 0..10 {
618 /// tx.send(i).await.unwrap();
619 /// }
620 ///
621 /// let count = my_receiver_future.await;
622 /// assert_eq!(count, 3);
623 /// assert_eq!(buffer, vec![0,1,2])
624 /// }
625 /// ```
    pub fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        // The same inner-channel call that `recv_many` wraps in `poll_fn`,
        // exposed directly for manual polling.
        self.chan.recv_many(cx, buffer, limit)
    }
634}
635
636impl<T> fmt::Debug for Receiver<T> {
637 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
638 fmt.debug_struct("Receiver")
639 .field("chan", &self.chan)
640 .finish()
641 }
642}
643
// Unconditional impl: `Receiver<T>` is `Unpin` for every `T`, with no
// `T: Unpin` bound required.
impl<T> Unpin for Receiver<T> {}
645
646impl<T> Sender<T> {
    // Crate-internal constructor: wraps an already-built send half
    // (see `channel`, which obtains it from `chan::channel`).
    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
        Sender { chan }
    }
650
651 /// Sends a value, waiting until there is capacity.
652 ///
653 /// A successful send occurs when it is determined that the other end of the
654 /// channel has not hung up already. An unsuccessful send would be one where
655 /// the corresponding receiver has already been closed. Note that a return
656 /// value of `Err` means that the data will never be received, but a return
657 /// value of `Ok` does not mean that the data will be received. It is
658 /// possible for the corresponding receiver to hang up immediately after
659 /// this function returns `Ok`.
660 ///
661 /// # Errors
662 ///
663 /// If the receive half of the channel is closed, either due to [`close`]
664 /// being called or the [`Receiver`] handle dropping, the function returns
665 /// an error. The error includes the value passed to `send`.
666 ///
667 /// [`close`]: Receiver::close
668 /// [`Receiver`]: Receiver
669 ///
670 /// # Cancel safety
671 ///
672 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
673 /// statement and some other branch completes first, then it is guaranteed
674 /// that the message was not sent. **However, in that case, the message
675 /// is dropped and will be lost.**
676 ///
677 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
678 /// capacity, then use the returned [`Permit`] to send the message.
679 ///
680 /// This channel uses a queue to ensure that calls to `send` and `reserve`
681 /// complete in the order they were requested. Cancelling a call to
682 /// `send` makes you lose your place in the queue.
683 ///
684 /// # Examples
685 ///
686 /// In the following example, each call to `send` will block until the
687 /// previously sent value was received.
688 ///
689 /// ```rust
690 /// use tokio::sync::mpsc;
691 ///
692 /// #[tokio::main]
693 /// async fn main() {
694 /// let (tx, mut rx) = mpsc::channel(1);
695 ///
696 /// tokio::spawn(async move {
697 /// for i in 0..10 {
698 /// if let Err(_) = tx.send(i).await {
699 /// println!("receiver dropped");
700 /// return;
701 /// }
702 /// }
703 /// });
704 ///
705 /// while let Some(i) = rx.recv().await {
706 /// println!("got = {}", i);
707 /// }
708 /// }
709 /// ```
710 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
711 match self.reserve().await {
712 Ok(permit) => {
713 permit.send(value);
714 Ok(())
715 }
716 Err(_) => Err(SendError(value)),
717 }
718 }
719
720 /// Completes when the receiver has dropped.
721 ///
722 /// This allows the producers to get notified when interest in the produced
723 /// values is canceled and immediately stop doing work.
724 ///
725 /// # Cancel safety
726 ///
727 /// This method is cancel safe. Once the channel is closed, it stays closed
728 /// forever and all future calls to `closed` will return immediately.
729 ///
730 /// # Examples
731 ///
732 /// ```
733 /// use tokio::sync::mpsc;
734 ///
735 /// #[tokio::main]
736 /// async fn main() {
737 /// let (tx1, rx) = mpsc::channel::<()>(1);
738 /// let tx2 = tx1.clone();
739 /// let tx3 = tx1.clone();
740 /// let tx4 = tx1.clone();
741 /// let tx5 = tx1.clone();
742 /// tokio::spawn(async move {
743 /// drop(rx);
744 /// });
745 ///
746 /// futures::join!(
747 /// tx1.closed(),
748 /// tx2.closed(),
749 /// tx3.closed(),
750 /// tx4.closed(),
751 /// tx5.closed()
752 /// );
753 /// println!("Receiver dropped");
754 /// }
755 /// ```
    pub async fn closed(&self) {
        // Awaits the inner channel's `closed` future; nothing else happens here.
        self.chan.closed().await;
    }
759
760 /// Attempts to immediately send a message on this `Sender`
761 ///
762 /// This method differs from [`send`] by returning immediately if the channel's
763 /// buffer is full or no receiver is waiting to acquire some data. Compared
764 /// with [`send`], this function has two failure cases instead of one (one for
765 /// disconnection, one for a full buffer).
766 ///
767 /// # Errors
768 ///
769 /// If the channel capacity has been reached, i.e., the channel has `n`
770 /// buffered values where `n` is the argument passed to [`channel`], then an
771 /// error is returned.
772 ///
773 /// If the receive half of the channel is closed, either due to [`close`]
774 /// being called or the [`Receiver`] handle dropping, the function returns
775 /// an error. The error includes the value passed to `send`.
776 ///
777 /// [`send`]: Sender::send
778 /// [`channel`]: channel
779 /// [`close`]: Receiver::close
780 ///
781 /// # Examples
782 ///
783 /// ```
784 /// use tokio::sync::mpsc;
785 ///
786 /// #[tokio::main]
787 /// async fn main() {
788 /// // Create a channel with buffer size 1
789 /// let (tx1, mut rx) = mpsc::channel(1);
790 /// let tx2 = tx1.clone();
791 ///
792 /// tokio::spawn(async move {
793 /// tx1.send(1).await.unwrap();
794 /// tx1.send(2).await.unwrap();
795 /// // task waits until the receiver receives a value.
796 /// });
797 ///
798 /// tokio::spawn(async move {
799 /// // This will return an error and send
800 /// // no message if the buffer is full
801 /// let _ = tx2.try_send(3);
802 /// });
803 ///
804 /// let mut msg;
805 /// msg = rx.recv().await.unwrap();
806 /// println!("message {} received", msg);
807 ///
808 /// msg = rx.recv().await.unwrap();
809 /// println!("message {} received", msg);
810 ///
811 /// // Third message may have never been sent
812 /// match rx.recv().await {
813 /// Some(msg) => println!("message {} received", msg),
814 /// None => println!("the third message was never sent"),
815 /// }
816 /// }
817 /// ```
818 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
819 match self.chan.semaphore().semaphore.try_acquire(1) {
820 Ok(()) => {}
821 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
822 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
823 }
824
825 // Send the message
826 self.chan.send(message);
827 Ok(())
828 }
829
830 /// Sends a value, waiting until there is capacity, but only for a limited time.
831 ///
832 /// Shares the same success and error conditions as [`send`], adding one more
833 /// condition for an unsuccessful send, which is when the provided timeout has
834 /// elapsed, and there is no capacity available.
835 ///
836 /// [`send`]: Sender::send
837 ///
838 /// # Errors
839 ///
840 /// If the receive half of the channel is closed, either due to [`close`]
841 /// being called or the [`Receiver`] having been dropped,
842 /// the function returns an error. The error includes the value passed to `send`.
843 ///
844 /// [`close`]: Receiver::close
845 /// [`Receiver`]: Receiver
846 ///
847 /// # Panics
848 ///
849 /// This function panics if it is called outside the context of a Tokio
850 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
851 ///
852 /// # Examples
853 ///
854 /// In the following example, each call to `send_timeout` will block until the
855 /// previously sent value was received, unless the timeout has elapsed.
856 ///
857 /// ```rust
858 /// use tokio::sync::mpsc;
859 /// use tokio::time::{sleep, Duration};
860 ///
861 /// #[tokio::main]
862 /// async fn main() {
863 /// let (tx, mut rx) = mpsc::channel(1);
864 ///
865 /// tokio::spawn(async move {
866 /// for i in 0..10 {
867 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
868 /// println!("send error: #{:?}", e);
869 /// return;
870 /// }
871 /// }
872 /// });
873 ///
874 /// while let Some(i) = rx.recv().await {
875 /// println!("got = {}", i);
876 /// sleep(Duration::from_millis(200)).await;
877 /// }
878 /// }
879 /// ```
880 #[cfg(feature = "time")]
881 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
882 pub async fn send_timeout(
883 &self,
884 value: T,
885 timeout: Duration,
886 ) -> Result<(), SendTimeoutError<T>> {
887 let permit = match crate::time::timeout(timeout, self.reserve()).await {
888 Err(_) => {
889 return Err(SendTimeoutError::Timeout(value));
890 }
891 Ok(Err(_)) => {
892 return Err(SendTimeoutError::Closed(value));
893 }
894 Ok(Ok(permit)) => permit,
895 };
896
897 permit.send(value);
898 Ok(())
899 }
900
901 /// Blocking send to call outside of asynchronous contexts.
902 ///
903 /// This method is intended for use cases where you are sending from
904 /// synchronous code to asynchronous code, and will work even if the
905 /// receiver is not using [`blocking_recv`] to receive the message.
906 ///
907 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
908 ///
909 /// # Panics
910 ///
911 /// This function panics if called within an asynchronous execution
912 /// context.
913 ///
914 /// # Examples
915 ///
916 /// ```
917 /// use std::thread;
918 /// use tokio::runtime::Runtime;
919 /// use tokio::sync::mpsc;
920 ///
921 /// fn main() {
922 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
923 ///
924 /// let sync_code = thread::spawn(move || {
925 /// tx.blocking_send(10).unwrap();
926 /// });
927 ///
928 /// Runtime::new().unwrap().block_on(async move {
929 /// assert_eq!(Some(10), rx.recv().await);
930 /// });
931 /// sync_code.join().unwrap()
932 /// }
933 /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
        // Runs the async `send` future to completion through the crate's
        // `block_on` helper, so the result is exactly what `send` yields.
        crate::future::block_on(self.send(value))
    }
940
941 /// Checks if the channel has been closed. This happens when the
942 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
943 /// called.
944 ///
945 /// [`Receiver`]: crate::sync::mpsc::Receiver
946 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
947 ///
948 /// ```
949 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
950 /// assert!(!tx.is_closed());
951 ///
952 /// let tx2 = tx.clone();
953 /// assert!(!tx2.is_closed());
954 ///
955 /// drop(rx);
956 /// assert!(tx.is_closed());
957 /// assert!(tx2.is_closed());
958 /// ```
959 pub fn is_closed(&self) -> bool {
960 self.chan.is_closed()
961 }
962
963 /// Waits for channel capacity. Once capacity to send one message is
964 /// available, it is reserved for the caller.
965 ///
966 /// If the channel is full, the function waits for the number of unreceived
967 /// messages to become less than the channel capacity. Capacity to send one
968 /// message is reserved for the caller. A [`Permit`] is returned to track
969 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
970 /// reserved capacity.
971 ///
972 /// Dropping [`Permit`] without sending a message releases the capacity back
973 /// to the channel.
974 ///
975 /// [`Permit`]: Permit
976 /// [`send`]: Permit::send
977 ///
978 /// # Cancel safety
979 ///
980 /// This channel uses a queue to ensure that calls to `send` and `reserve`
981 /// complete in the order they were requested. Cancelling a call to
982 /// `reserve` makes you lose your place in the queue.
983 ///
984 /// # Examples
985 ///
986 /// ```
987 /// use tokio::sync::mpsc;
988 ///
989 /// #[tokio::main]
990 /// async fn main() {
991 /// let (tx, mut rx) = mpsc::channel(1);
992 ///
993 /// // Reserve capacity
994 /// let permit = tx.reserve().await.unwrap();
995 ///
996 /// // Trying to send directly on the `tx` will fail due to no
997 /// // available capacity.
998 /// assert!(tx.try_send(123).is_err());
999 ///
1000 /// // Sending on the permit succeeds
1001 /// permit.send(456);
1002 ///
1003 /// // The value sent on the permit is received
1004 /// assert_eq!(rx.recv().await.unwrap(), 456);
1005 /// }
1006 /// ```
1007 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1008 self.reserve_inner(1).await?;
1009 Ok(Permit { chan: &self.chan })
1010 }
1011
1012 /// Waits for channel capacity. Once capacity to send `n` messages is
1013 /// available, it is reserved for the caller.
1014 ///
1015 /// If the channel is full or if there are fewer than `n` permits available, the function waits
1016 /// for the number of unreceived messages to become `n` less than the channel capacity.
/// Capacity to send `n` messages is then reserved for the caller.
1018 ///
1019 /// A [`PermitIterator`] is returned to track the reserved capacity.
1020 /// You can call this [`Iterator`] until it is exhausted to
1021 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1022 /// [`try_reserve_many`] except it awaits for the slots to become available.
1023 ///
1024 /// If the channel is closed, the function returns a [`SendError`].
1025 ///
1026 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1027 /// permits back to the channel.
1028 ///
1029 /// [`PermitIterator`]: PermitIterator
1030 /// [`Permit`]: Permit
1031 /// [`send`]: Permit::send
1032 /// [`try_reserve_many`]: Sender::try_reserve_many
1033 ///
1034 /// # Cancel safety
1035 ///
1036 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1037 /// complete in the order they were requested. Cancelling a call to
1038 /// `reserve_many` makes you lose your place in the queue.
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```
1043 /// use tokio::sync::mpsc;
1044 ///
1045 /// #[tokio::main]
1046 /// async fn main() {
1047 /// let (tx, mut rx) = mpsc::channel(2);
1048 ///
1049 /// // Reserve capacity
1050 /// let mut permit = tx.reserve_many(2).await.unwrap();
1051 ///
1052 /// // Trying to send directly on the `tx` will fail due to no
1053 /// // available capacity.
1054 /// assert!(tx.try_send(123).is_err());
1055 ///
1056 /// // Sending with the permit iterator succeeds
1057 /// permit.next().unwrap().send(456);
1058 /// permit.next().unwrap().send(457);
1059 ///
1060 /// // The iterator should now be exhausted
1061 /// assert!(permit.next().is_none());
1062 ///
1063 /// // The value sent on the permit is received
1064 /// assert_eq!(rx.recv().await.unwrap(), 456);
1065 /// assert_eq!(rx.recv().await.unwrap(), 457);
1066 /// }
1067 /// ```
1068 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1069 self.reserve_inner(n).await?;
1070 Ok(PermitIterator {
1071 chan: &self.chan,
1072 n,
1073 })
1074 }
1075
1076 /// Waits for channel capacity, moving the `Sender` and returning an owned
1077 /// permit. Once capacity to send one message is available, it is reserved
1078 /// for the caller.
1079 ///
1080 /// This moves the sender _by value_, and returns an owned permit that can
1081 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1082 /// this method may be used in cases where the permit must be valid for the
1083 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1084 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1085 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1086 /// moved, it can be cloned prior to calling `reserve_owned`.
1087 ///
1088 /// If the channel is full, the function waits for the number of unreceived
1089 /// messages to become less than the channel capacity. Capacity to send one
1090 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1091 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1092 /// consumes the reserved capacity.
1093 ///
1094 /// Dropping the [`OwnedPermit`] without sending a message releases the
1095 /// capacity back to the channel.
1096 ///
1097 /// # Cancel safety
1098 ///
1099 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1100 /// complete in the order they were requested. Cancelling a call to
1101 /// `reserve_owned` makes you lose your place in the queue.
1102 ///
1103 /// # Examples
1104 /// Sending a message using an [`OwnedPermit`]:
1105 /// ```
1106 /// use tokio::sync::mpsc;
1107 ///
1108 /// #[tokio::main]
1109 /// async fn main() {
1110 /// let (tx, mut rx) = mpsc::channel(1);
1111 ///
1112 /// // Reserve capacity, moving the sender.
1113 /// let permit = tx.reserve_owned().await.unwrap();
1114 ///
1115 /// // Send a message, consuming the permit and returning
1116 /// // the moved sender.
1117 /// let tx = permit.send(123);
1118 ///
1119 /// // The value sent on the permit is received.
1120 /// assert_eq!(rx.recv().await.unwrap(), 123);
1121 ///
1122 /// // The sender can now be used again.
1123 /// tx.send(456).await.unwrap();
1124 /// }
1125 /// ```
1126 ///
1127 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1128 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1129 ///
1130 /// ```
1131 /// use tokio::sync::mpsc;
1132 ///
1133 /// #[tokio::main]
1134 /// async fn main() {
1135 /// let (tx, mut rx) = mpsc::channel(1);
1136 ///
1137 /// // Clone the sender and reserve capacity.
1138 /// let permit = tx.clone().reserve_owned().await.unwrap();
1139 ///
1140 /// // Trying to send directly on the `tx` will fail due to no
1141 /// // available capacity.
1142 /// assert!(tx.try_send(123).is_err());
1143 ///
1144 /// // Sending on the permit succeeds.
1145 /// permit.send(456);
1146 ///
1147 /// // The value sent on the permit is received
1148 /// assert_eq!(rx.recv().await.unwrap(), 456);
1149 /// }
1150 /// ```
1151 ///
1152 /// [`Sender::reserve`]: Sender::reserve
1153 /// [`OwnedPermit`]: OwnedPermit
1154 /// [`send`]: OwnedPermit::send
1155 /// [`Arc::clone`]: std::sync::Arc::clone
1156 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1157 self.reserve_inner(1).await?;
1158 Ok(OwnedPermit {
1159 chan: Some(self.chan),
1160 })
1161 }
1162
1163 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1164 crate::trace::async_trace_leaf().await;
1165
1166 if n > self.max_capacity() {
1167 return Err(SendError(()));
1168 }
1169 match self.chan.semaphore().semaphore.acquire(n).await {
1170 Ok(()) => Ok(()),
1171 Err(_) => Err(SendError(())),
1172 }
1173 }
1174
1175 /// Tries to acquire a slot in the channel without waiting for the slot to become
1176 /// available.
1177 ///
1178 /// If the channel is full this function will return [`TrySendError`], otherwise
1179 /// if there is a slot available it will return a [`Permit`] that will then allow you
1180 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1181 /// [`reserve`] except it does not await for the slot to become available.
1182 ///
1183 /// Dropping [`Permit`] without sending a message releases the capacity back
1184 /// to the channel.
1185 ///
1186 /// [`Permit`]: Permit
1187 /// [`send`]: Permit::send
1188 /// [`reserve`]: Sender::reserve
1189 ///
1190 /// # Examples
1191 ///
1192 /// ```
1193 /// use tokio::sync::mpsc;
1194 ///
1195 /// #[tokio::main]
1196 /// async fn main() {
1197 /// let (tx, mut rx) = mpsc::channel(1);
1198 ///
1199 /// // Reserve capacity
1200 /// let permit = tx.try_reserve().unwrap();
1201 ///
1202 /// // Trying to send directly on the `tx` will fail due to no
1203 /// // available capacity.
1204 /// assert!(tx.try_send(123).is_err());
1205 ///
1206 /// // Trying to reserve an additional slot on the `tx` will
1207 /// // fail because there is no capacity.
1208 /// assert!(tx.try_reserve().is_err());
1209 ///
1210 /// // Sending on the permit succeeds
1211 /// permit.send(456);
1212 ///
1213 /// // The value sent on the permit is received
1214 /// assert_eq!(rx.recv().await.unwrap(), 456);
1215 ///
1216 /// }
1217 /// ```
1218 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1219 match self.chan.semaphore().semaphore.try_acquire(1) {
1220 Ok(()) => {}
1221 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1222 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1223 }
1224
1225 Ok(Permit { chan: &self.chan })
1226 }
1227
1228 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1229 /// available.
1230 ///
1231 /// A [`PermitIterator`] is returned to track the reserved capacity.
1232 /// You can call this [`Iterator`] until it is exhausted to
1233 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1234 /// [`reserve_many`] except it does not await for the slots to become available.
1235 ///
1236 /// If there are fewer than `n` permits available on the channel, then
1237 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1238 /// this function will return a [`TrySendError::Closed`].
1239 ///
1240 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1241 /// permits back to the channel.
1242 ///
1243 /// [`PermitIterator`]: PermitIterator
1244 /// [`send`]: Permit::send
1245 /// [`reserve_many`]: Sender::reserve_many
1246 ///
1247 /// # Examples
1248 ///
1249 /// ```
1250 /// use tokio::sync::mpsc;
1251 ///
1252 /// #[tokio::main]
1253 /// async fn main() {
1254 /// let (tx, mut rx) = mpsc::channel(2);
1255 ///
1256 /// // Reserve capacity
1257 /// let mut permit = tx.try_reserve_many(2).unwrap();
1258 ///
1259 /// // Trying to send directly on the `tx` will fail due to no
1260 /// // available capacity.
1261 /// assert!(tx.try_send(123).is_err());
1262 ///
1263 /// // Trying to reserve an additional slot on the `tx` will
1264 /// // fail because there is no capacity.
1265 /// assert!(tx.try_reserve().is_err());
1266 ///
1267 /// // Sending with the permit iterator succeeds
1268 /// permit.next().unwrap().send(456);
1269 /// permit.next().unwrap().send(457);
1270 ///
1271 /// // The iterator should now be exhausted
1272 /// assert!(permit.next().is_none());
1273 ///
1274 /// // The value sent on the permit is received
1275 /// assert_eq!(rx.recv().await.unwrap(), 456);
1276 /// assert_eq!(rx.recv().await.unwrap(), 457);
1277 ///
1278 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1279 /// let mut permit = tx.try_reserve_many(0).unwrap();
1280 /// assert!(permit.next().is_none());
1281 ///
1282 /// // Trying to call try_reserve_many with a number greater than the channel
1283 /// // capacity will return an error
1284 /// let permit = tx.try_reserve_many(3);
1285 /// assert!(permit.is_err());
1286 ///
1287 /// // Trying to call try_reserve_many on a closed channel will return an error
1288 /// drop(rx);
1289 /// let permit = tx.try_reserve_many(1);
1290 /// assert!(permit.is_err());
1291 ///
1292 /// let permit = tx.try_reserve_many(0);
1293 /// assert!(permit.is_err());
1294 /// }
1295 /// ```
1296 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1297 if n > self.max_capacity() {
1298 return Err(TrySendError::Full(()));
1299 }
1300
1301 match self.chan.semaphore().semaphore.try_acquire(n) {
1302 Ok(()) => {}
1303 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1304 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1305 }
1306
1307 Ok(PermitIterator {
1308 chan: &self.chan,
1309 n,
1310 })
1311 }
1312
1313 /// Tries to acquire a slot in the channel without waiting for the slot to become
1314 /// available, returning an owned permit.
1315 ///
1316 /// This moves the sender _by value_, and returns an owned permit that can
1317 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1318 /// this method may be used in cases where the permit must be valid for the
1319 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1320 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1321 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1322 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1323 ///
1324 /// If the channel is full this function will return a [`TrySendError`].
1325 /// Since the sender is taken by value, the `TrySendError` returned in this
1326 /// case contains the sender, so that it may be used again. Otherwise, if
1327 /// there is a slot available, this method will return an [`OwnedPermit`]
1328 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1329 /// This function is similar to [`reserve_owned`] except it does not await
1330 /// for the slot to become available.
1331 ///
1332 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1333 /// to the channel.
1334 ///
1335 /// [`OwnedPermit`]: OwnedPermit
1336 /// [`send`]: OwnedPermit::send
1337 /// [`reserve_owned`]: Sender::reserve_owned
1338 /// [`Arc::clone`]: std::sync::Arc::clone
1339 ///
1340 /// # Examples
1341 ///
1342 /// ```
1343 /// use tokio::sync::mpsc;
1344 ///
1345 /// #[tokio::main]
1346 /// async fn main() {
1347 /// let (tx, mut rx) = mpsc::channel(1);
1348 ///
1349 /// // Reserve capacity
1350 /// let permit = tx.clone().try_reserve_owned().unwrap();
1351 ///
1352 /// // Trying to send directly on the `tx` will fail due to no
1353 /// // available capacity.
1354 /// assert!(tx.try_send(123).is_err());
1355 ///
1356 /// // Trying to reserve an additional slot on the `tx` will
1357 /// // fail because there is no capacity.
1358 /// assert!(tx.try_reserve().is_err());
1359 ///
1360 /// // Sending on the permit succeeds
1361 /// permit.send(456);
1362 ///
1363 /// // The value sent on the permit is received
1364 /// assert_eq!(rx.recv().await.unwrap(), 456);
1365 ///
1366 /// }
1367 /// ```
1368 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1369 match self.chan.semaphore().semaphore.try_acquire(1) {
1370 Ok(()) => {}
1371 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1372 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1373 }
1374
1375 Ok(OwnedPermit {
1376 chan: Some(self.chan),
1377 })
1378 }
1379
1380 /// Returns `true` if senders belong to the same channel.
1381 ///
1382 /// # Examples
1383 ///
1384 /// ```
1385 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1386 /// let tx2 = tx.clone();
1387 /// assert!(tx.same_channel(&tx2));
1388 ///
1389 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1390 /// assert!(!tx3.same_channel(&tx2));
1391 /// ```
1392 pub fn same_channel(&self, other: &Self) -> bool {
1393 self.chan.same_channel(&other.chan)
1394 }
1395
1396 /// Returns the current capacity of the channel.
1397 ///
1398 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1399 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
/// This is distinct from [`max_capacity`], which always returns the buffer capacity initially
/// specified when calling [`channel`].
1402 ///
1403 /// # Examples
1404 ///
1405 /// ```
1406 /// use tokio::sync::mpsc;
1407 ///
1408 /// #[tokio::main]
1409 /// async fn main() {
1410 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1411 ///
1412 /// assert_eq!(tx.capacity(), 5);
1413 ///
1414 /// // Making a reservation drops the capacity by one.
1415 /// let permit = tx.reserve().await.unwrap();
1416 /// assert_eq!(tx.capacity(), 4);
1417 ///
1418 /// // Sending and receiving a value increases the capacity by one.
1419 /// permit.send(());
1420 /// rx.recv().await.unwrap();
1421 /// assert_eq!(tx.capacity(), 5);
1422 /// }
1423 /// ```
1424 ///
1425 /// [`send`]: Sender::send
1426 /// [`reserve`]: Sender::reserve
1427 /// [`channel`]: channel
1428 /// [`max_capacity`]: Sender::max_capacity
1429 pub fn capacity(&self) -> usize {
1430 self.chan.semaphore().semaphore.available_permits()
1431 }
1432
1433 /// Converts the `Sender` to a [`WeakSender`] that does not count
1434 /// towards RAII semantics, i.e. if all `Sender` instances of the
1435 /// channel were dropped and only `WeakSender` instances remain,
1436 /// the channel is closed.
1437 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1438 pub fn downgrade(&self) -> WeakSender<T> {
1439 WeakSender {
1440 chan: self.chan.downgrade(),
1441 }
1442 }
1443
1444 /// Returns the maximum buffer capacity of the channel.
1445 ///
1446 /// The maximum capacity is the buffer capacity initially specified when calling
1447 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1448 /// available buffer capacity: as messages are sent and received, the
1449 /// value returned by [`capacity`] will go up or down, whereas the value
1450 /// returned by `max_capacity` will remain constant.
1451 ///
1452 /// # Examples
1453 ///
1454 /// ```
1455 /// use tokio::sync::mpsc;
1456 ///
1457 /// #[tokio::main]
1458 /// async fn main() {
1459 /// let (tx, _rx) = mpsc::channel::<()>(5);
1460 ///
1461 /// // both max capacity and capacity are the same at first
1462 /// assert_eq!(tx.max_capacity(), 5);
1463 /// assert_eq!(tx.capacity(), 5);
1464 ///
1465 /// // Making a reservation doesn't change the max capacity.
1466 /// let permit = tx.reserve().await.unwrap();
1467 /// assert_eq!(tx.max_capacity(), 5);
1468 /// // but drops the capacity by one
1469 /// assert_eq!(tx.capacity(), 4);
1470 /// }
1471 /// ```
1472 ///
1473 /// [`channel`]: channel
1474 /// [`max_capacity`]: Sender::max_capacity
1475 /// [`capacity`]: Sender::capacity
1476 pub fn max_capacity(&self) -> usize {
1477 self.chan.semaphore().bound
1478 }
1479
1480 /// Returns the number of [`Sender`] handles.
1481 pub fn strong_count(&self) -> usize {
1482 self.chan.strong_count()
1483 }
1484
1485 /// Returns the number of [`WeakSender`] handles.
1486 pub fn weak_count(&self) -> usize {
1487 self.chan.weak_count()
1488 }
1489}
1490
1491impl<T> Clone for Sender<T> {
1492 fn clone(&self) -> Self {
1493 Sender {
1494 chan: self.chan.clone(),
1495 }
1496 }
1497}
1498
1499impl<T> fmt::Debug for Sender<T> {
1500 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1501 fmt.debug_struct("Sender")
1502 .field("chan", &self.chan)
1503 .finish()
1504 }
1505}
1506
1507impl<T> Clone for WeakSender<T> {
1508 fn clone(&self) -> Self {
1509 self.chan.increment_weak_count();
1510
1511 WeakSender {
1512 chan: self.chan.clone(),
1513 }
1514 }
1515}
1516
1517impl<T> Drop for WeakSender<T> {
1518 fn drop(&mut self) {
1519 self.chan.decrement_weak_count();
1520 }
1521}
1522
1523impl<T> WeakSender<T> {
1524 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1525 /// if there are other `Sender` instances alive and the channel wasn't
1526 /// previously dropped, otherwise `None` is returned.
1527 pub fn upgrade(&self) -> Option<Sender<T>> {
1528 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1529 }
1530
1531 /// Returns the number of [`Sender`] handles.
1532 pub fn strong_count(&self) -> usize {
1533 self.chan.strong_count()
1534 }
1535
1536 /// Returns the number of [`WeakSender`] handles.
1537 pub fn weak_count(&self) -> usize {
1538 self.chan.weak_count()
1539 }
1540}
1541
1542impl<T> fmt::Debug for WeakSender<T> {
1543 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1544 fmt.debug_struct("WeakSender").finish()
1545 }
1546}
1547
1548// ===== impl Permit =====
1549
1550impl<T> Permit<'_, T> {
1551 /// Sends a value using the reserved capacity.
1552 ///
1553 /// Capacity for the message has already been reserved. The message is sent
1554 /// to the receiver and the permit is consumed. The operation will succeed
1555 /// even if the receiver half has been closed. See [`Receiver::close`] for
1556 /// more details on performing a clean shutdown.
1557 ///
1558 /// [`Receiver::close`]: Receiver::close
1559 ///
1560 /// # Examples
1561 ///
1562 /// ```
1563 /// use tokio::sync::mpsc;
1564 ///
1565 /// #[tokio::main]
1566 /// async fn main() {
1567 /// let (tx, mut rx) = mpsc::channel(1);
1568 ///
1569 /// // Reserve capacity
1570 /// let permit = tx.reserve().await.unwrap();
1571 ///
1572 /// // Trying to send directly on the `tx` will fail due to no
1573 /// // available capacity.
1574 /// assert!(tx.try_send(123).is_err());
1575 ///
1576 /// // Send a message on the permit
1577 /// permit.send(456);
1578 ///
1579 /// // The value sent on the permit is received
1580 /// assert_eq!(rx.recv().await.unwrap(), 456);
1581 /// }
1582 /// ```
1583 pub fn send(self, value: T) {
1584 use std::mem;
1585
1586 self.chan.send(value);
1587
1588 // Avoid the drop logic
1589 mem::forget(self);
1590 }
1591}
1592
1593impl<T> Drop for Permit<'_, T> {
1594 fn drop(&mut self) {
1595 use chan::Semaphore;
1596
1597 let semaphore = self.chan.semaphore();
1598
1599 // Add the permit back to the semaphore
1600 semaphore.add_permit();
1601
1602 // If this is the last sender for this channel, wake the receiver so
1603 // that it can be notified that the channel is closed.
1604 if semaphore.is_closed() && semaphore.is_idle() {
1605 self.chan.wake_rx();
1606 }
1607 }
1608}
1609
1610impl<T> fmt::Debug for Permit<'_, T> {
1611 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1612 fmt.debug_struct("Permit")
1613 .field("chan", &self.chan)
1614 .finish()
1615 }
1616}
1617
1618// ===== impl PermitIterator =====
1619
1620impl<'a, T> Iterator for PermitIterator<'a, T> {
1621 type Item = Permit<'a, T>;
1622
1623 fn next(&mut self) -> Option<Self::Item> {
1624 if self.n == 0 {
1625 return None;
1626 }
1627
1628 self.n -= 1;
1629 Some(Permit { chan: self.chan })
1630 }
1631
1632 fn size_hint(&self) -> (usize, Option<usize>) {
1633 let n = self.n;
1634 (n, Some(n))
1635 }
1636}
1637impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1638impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1639
1640impl<T> Drop for PermitIterator<'_, T> {
1641 fn drop(&mut self) {
1642 use chan::Semaphore;
1643
1644 if self.n == 0 {
1645 return;
1646 }
1647
1648 let semaphore = self.chan.semaphore();
1649
1650 // Add the remaining permits back to the semaphore
1651 semaphore.add_permits(self.n);
1652
1653 // If this is the last sender for this channel, wake the receiver so
1654 // that it can be notified that the channel is closed.
1655 if semaphore.is_closed() && semaphore.is_idle() {
1656 self.chan.wake_rx();
1657 }
1658 }
1659}
1660
1661impl<T> fmt::Debug for PermitIterator<'_, T> {
1662 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1663 fmt.debug_struct("PermitIterator")
1664 .field("chan", &self.chan)
1665 .field("capacity", &self.n)
1666 .finish()
1667 }
1668}
1669
// ===== impl OwnedPermit =====
1671
1672impl<T> OwnedPermit<T> {
1673 /// Sends a value using the reserved capacity.
1674 ///
1675 /// Capacity for the message has already been reserved. The message is sent
1676 /// to the receiver and the permit is consumed. The operation will succeed
1677 /// even if the receiver half has been closed. See [`Receiver::close`] for
1678 /// more details on performing a clean shutdown.
1679 ///
1680 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1681 /// the `OwnedPermit` was reserved.
1682 ///
1683 /// [`Receiver::close`]: Receiver::close
1684 ///
1685 /// # Examples
1686 ///
1687 /// ```
1688 /// use tokio::sync::mpsc;
1689 ///
1690 /// #[tokio::main]
1691 /// async fn main() {
1692 /// let (tx, mut rx) = mpsc::channel(1);
1693 ///
1694 /// // Reserve capacity
1695 /// let permit = tx.reserve_owned().await.unwrap();
1696 ///
1697 /// // Send a message on the permit, returning the sender.
1698 /// let tx = permit.send(456);
1699 ///
1700 /// // The value sent on the permit is received
1701 /// assert_eq!(rx.recv().await.unwrap(), 456);
1702 ///
1703 /// // We may now reuse `tx` to send another message.
1704 /// tx.send(789).await.unwrap();
1705 /// }
1706 /// ```
1707 pub fn send(mut self, value: T) -> Sender<T> {
1708 let chan = self.chan.take().unwrap_or_else(|| {
1709 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1710 });
1711 chan.send(value);
1712
1713 Sender { chan }
1714 }
1715
1716 /// Releases the reserved capacity *without* sending a message, returning the
1717 /// [`Sender`].
1718 ///
1719 /// # Examples
1720 ///
1721 /// ```
1722 /// use tokio::sync::mpsc;
1723 ///
1724 /// #[tokio::main]
1725 /// async fn main() {
1726 /// let (tx, rx) = mpsc::channel(1);
1727 ///
1728 /// // Clone the sender and reserve capacity
1729 /// let permit = tx.clone().reserve_owned().await.unwrap();
1730 ///
1731 /// // Trying to send on the original `tx` will fail, since the `permit`
1732 /// // has reserved all the available capacity.
1733 /// assert!(tx.try_send(123).is_err());
1734 ///
1735 /// // Release the permit without sending a message, returning the clone
1736 /// // of the sender.
1737 /// let tx2 = permit.release();
1738 ///
1739 /// // We may now reuse `tx` to send another message.
1740 /// tx.send(789).await.unwrap();
1741 /// # drop(rx); drop(tx2);
1742 /// }
1743 /// ```
1744 ///
1745 /// [`Sender`]: Sender
1746 pub fn release(mut self) -> Sender<T> {
1747 use chan::Semaphore;
1748
1749 let chan = self.chan.take().unwrap_or_else(|| {
1750 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1751 });
1752
1753 // Add the permit back to the semaphore
1754 chan.semaphore().add_permit();
1755 Sender { chan }
1756 }
1757}
1758
1759impl<T> Drop for OwnedPermit<T> {
1760 fn drop(&mut self) {
1761 use chan::Semaphore;
1762
1763 // Are we still holding onto the sender?
1764 if let Some(chan) = self.chan.take() {
1765 let semaphore = chan.semaphore();
1766
1767 // Add the permit back to the semaphore
1768 semaphore.add_permit();
1769
1770 // If this `OwnedPermit` is holding the last sender for this
1771 // channel, wake the receiver so that it can be notified that the
1772 // channel is closed.
1773 if semaphore.is_closed() && semaphore.is_idle() {
1774 chan.wake_rx();
1775 }
1776 }
1777
1778 // Otherwise, do nothing.
1779 }
1780}
1781
1782impl<T> fmt::Debug for OwnedPermit<T> {
1783 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1784 fmt.debug_struct("OwnedPermit")
1785 .field("chan", &self.chan)
1786 .finish()
1787 }
1788}