madsim_real_tokio/sync/mpsc/
bounded.rs

1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7    use crate::sync::mpsc::error::SendTimeoutError;
8    use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23    chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45///     let (tx, _rx) = channel::<i32>(15);
46///     let tx_weak = tx.downgrade();
47///
48///     // Upgrading will succeed because `tx` still exists.
49///     assert!(tx_weak.upgrade().is_some());
50///
51///     // If we drop `tx`, then it will fail.
52///     drop(tx);
53///     assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57    chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68    chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79    chan: &'a chan::Tx<T, Semaphore>,
80    n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96    chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107    /// The channel receiver.
108    chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages.  Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139///     let (tx, mut rx) = mpsc::channel(100);
140///
141///     tokio::spawn(async move {
142///         for i in 0..10 {
143///             if let Err(_) = tx.send(i).await {
144///                 println!("receiver dropped");
145///                 return;
146///             }
147///         }
148///     });
149///
150///     while let Some(i) = rx.recv().await {
151///         println!("got = {}", i);
152///     }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158    let semaphore = Semaphore {
159        semaphore: semaphore::Semaphore::new(buffer),
160        bound: buffer,
161    };
162    let (tx, rx) = chan::channel(semaphore);
163
164    let tx = Sender::new(tx);
165    let rx = Receiver::new(rx);
166
167    (tx, rx)
168}
169
170/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171/// representing the channel bound.
172#[derive(Debug)]
173pub(crate) struct Semaphore {
174    pub(crate) semaphore: semaphore::Semaphore,
175    pub(crate) bound: usize,
176}
177
178impl<T> Receiver<T> {
179    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180        Receiver { chan }
181    }
182
183    /// Receives the next value for this receiver.
184    ///
185    /// This method returns `None` if the channel has been closed and there are
186    /// no remaining messages in the channel's buffer. This indicates that no
187    /// further values can ever be received from this `Receiver`. The channel is
188    /// closed when all senders have been dropped, or when [`close`] is called.
189    ///
190    /// If there are no messages in the channel's buffer, but the channel has
191    /// not yet been closed, this method will sleep until a message is sent or
192    /// the channel is closed.  Note that if [`close`] is called, but there are
193    /// still outstanding [`Permits`] from before it was closed, the channel is
194    /// not considered closed by `recv` until the permits are released.
195    ///
196    /// # Cancel safety
197    ///
198    /// This method is cancel safe. If `recv` is used as the event in a
199    /// [`tokio::select!`](crate::select) statement and some other branch
200    /// completes first, it is guaranteed that no messages were received on this
201    /// channel.
202    ///
203    /// [`close`]: Self::close
204    /// [`Permits`]: struct@crate::sync::mpsc::Permit
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use tokio::sync::mpsc;
210    ///
211    /// #[tokio::main]
212    /// async fn main() {
213    ///     let (tx, mut rx) = mpsc::channel(100);
214    ///
215    ///     tokio::spawn(async move {
216    ///         tx.send("hello").await.unwrap();
217    ///     });
218    ///
219    ///     assert_eq!(Some("hello"), rx.recv().await);
220    ///     assert_eq!(None, rx.recv().await);
221    /// }
222    /// ```
223    ///
224    /// Values are buffered:
225    ///
226    /// ```
227    /// use tokio::sync::mpsc;
228    ///
229    /// #[tokio::main]
230    /// async fn main() {
231    ///     let (tx, mut rx) = mpsc::channel(100);
232    ///
233    ///     tx.send("hello").await.unwrap();
234    ///     tx.send("world").await.unwrap();
235    ///
236    ///     assert_eq!(Some("hello"), rx.recv().await);
237    ///     assert_eq!(Some("world"), rx.recv().await);
238    /// }
239    /// ```
240    pub async fn recv(&mut self) -> Option<T> {
241        use crate::future::poll_fn;
242        poll_fn(|cx| self.chan.recv(cx)).await
243    }
244
245    /// Receives the next values for this receiver and extends `buffer`.
246    ///
247    /// This method extends `buffer` by no more than a fixed number of values
248    /// as specified by `limit`. If `limit` is zero, the function immediately
249    /// returns `0`. The return value is the number of values added to `buffer`.
250    ///
251    /// For `limit > 0`, if there are no messages in the channel's queue, but
252    /// the channel has not yet been closed, this method will sleep until a
253    /// message is sent or the channel is closed. Note that if [`close`] is
254    /// called, but there are still outstanding [`Permits`] from before it was
255    /// closed, the channel is not considered closed by `recv_many` until the
256    /// permits are released.
257    ///
258    /// For non-zero values of `limit`, this method will never return `0` unless
259    /// the channel has been closed and there are no remaining messages in the
260    /// channel's queue. This indicates that no further values can ever be
261    /// received from this `Receiver`. The channel is closed when all senders
262    /// have been dropped, or when [`close`] is called.
263    ///
264    /// The capacity of `buffer` is increased as needed.
265    ///
266    /// # Cancel safety
267    ///
268    /// This method is cancel safe. If `recv_many` is used as the event in a
269    /// [`tokio::select!`](crate::select) statement and some other branch
270    /// completes first, it is guaranteed that no messages were received on this
271    /// channel.
272    ///
273    /// [`close`]: Self::close
274    /// [`Permits`]: struct@crate::sync::mpsc::Permit
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use tokio::sync::mpsc;
280    ///
281    /// #[tokio::main]
282    /// async fn main() {
283    ///     let mut buffer: Vec<&str> = Vec::with_capacity(2);
284    ///     let limit = 2;
285    ///     let (tx, mut rx) = mpsc::channel(100);
286    ///     let tx2 = tx.clone();
287    ///     tx2.send("first").await.unwrap();
288    ///     tx2.send("second").await.unwrap();
289    ///     tx2.send("third").await.unwrap();
290    ///
291    ///     // Call `recv_many` to receive up to `limit` (2) values.
292    ///     assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293    ///     assert_eq!(vec!["first", "second"], buffer);
294    ///
295    ///     // If the buffer is full, the next call to `recv_many`
296    ///     // reserves additional capacity.
297    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298    ///
299    ///     tokio::spawn(async move {
300    ///         tx.send("fourth").await.unwrap();
301    ///     });
302    ///
303    ///     // 'tx' is dropped, but `recv_many`
304    ///     // is guaranteed not to return 0 as the channel
305    ///     // is not yet closed.
306    ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308    ///
309    ///     // Once the last sender is dropped, the channel is
310    ///     // closed and `recv_many` returns 0, capacity unchanged.
311    ///     drop(tx2);
312    ///     assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313    ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314    /// }
315    /// ```
316    pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317        use crate::future::poll_fn;
318        poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319    }
320
321    /// Tries to receive the next value for this receiver.
322    ///
323    /// This method returns the [`Empty`] error if the channel is currently
324    /// empty, but there are still outstanding [senders] or [permits].
325    ///
326    /// This method returns the [`Disconnected`] error if the channel is
327    /// currently empty, and there are no outstanding [senders] or [permits].
328    ///
329    /// Unlike the [`poll_recv`] method, this method will never return an
330    /// [`Empty`] error spuriously.
331    ///
332    /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333    /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334    /// [`poll_recv`]: Self::poll_recv
335    /// [senders]: crate::sync::mpsc::Sender
336    /// [permits]: crate::sync::mpsc::Permit
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// use tokio::sync::mpsc;
342    /// use tokio::sync::mpsc::error::TryRecvError;
343    ///
344    /// #[tokio::main]
345    /// async fn main() {
346    ///     let (tx, mut rx) = mpsc::channel(100);
347    ///
348    ///     tx.send("hello").await.unwrap();
349    ///
350    ///     assert_eq!(Ok("hello"), rx.try_recv());
351    ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352    ///
353    ///     tx.send("hello").await.unwrap();
354    ///     // Drop the last sender, closing the channel.
355    ///     drop(tx);
356    ///
357    ///     assert_eq!(Ok("hello"), rx.try_recv());
358    ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359    /// }
360    /// ```
361    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362        self.chan.try_recv()
363    }
364
365    /// Blocking receive to call outside of asynchronous contexts.
366    ///
367    /// This method returns `None` if the channel has been closed and there are
368    /// no remaining messages in the channel's buffer. This indicates that no
369    /// further values can ever be received from this `Receiver`. The channel is
370    /// closed when all senders have been dropped, or when [`close`] is called.
371    ///
372    /// If there are no messages in the channel's buffer, but the channel has
373    /// not yet been closed, this method will block until a message is sent or
374    /// the channel is closed.
375    ///
376    /// This method is intended for use cases where you are sending from
377    /// asynchronous code to synchronous code, and will work even if the sender
378    /// is not using [`blocking_send`] to send the message.
379    ///
380    /// Note that if [`close`] is called, but there are still outstanding
381    /// [`Permits`] from before it was closed, the channel is not considered
382    /// closed by `blocking_recv` until the permits are released.
383    ///
384    /// [`close`]: Self::close
385    /// [`Permits`]: struct@crate::sync::mpsc::Permit
386    /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387    ///
388    /// # Panics
389    ///
390    /// This function panics if called within an asynchronous execution
391    /// context.
392    ///
393    /// # Examples
394    ///
395    /// ```
396    /// use std::thread;
397    /// use tokio::runtime::Runtime;
398    /// use tokio::sync::mpsc;
399    ///
400    /// fn main() {
401    ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
402    ///
403    ///     let sync_code = thread::spawn(move || {
404    ///         assert_eq!(Some(10), rx.blocking_recv());
405    ///     });
406    ///
407    ///     Runtime::new()
408    ///         .unwrap()
409    ///         .block_on(async move {
410    ///             let _ = tx.send(10).await;
411    ///         });
412    ///     sync_code.join().unwrap()
413    /// }
414    /// ```
415    #[track_caller]
416    #[cfg(feature = "sync")]
417    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
418    pub fn blocking_recv(&mut self) -> Option<T> {
419        crate::future::block_on(self.recv())
420    }
421
422    /// Closes the receiving half of a channel without dropping it.
423    ///
424    /// This prevents any further messages from being sent on the channel while
425    /// still enabling the receiver to drain messages that are buffered. Any
426    /// outstanding [`Permit`] values will still be able to send messages.
427    ///
428    /// To guarantee that no messages are dropped, after calling `close()`,
429    /// `recv()` must be called until `None` is returned. If there are
430    /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
431    /// not return `None` until those are released.
432    ///
433    /// [`Permit`]: Permit
434    /// [`OwnedPermit`]: OwnedPermit
435    ///
436    /// # Examples
437    ///
438    /// ```
439    /// use tokio::sync::mpsc;
440    ///
441    /// #[tokio::main]
442    /// async fn main() {
443    ///     let (tx, mut rx) = mpsc::channel(20);
444    ///
445    ///     tokio::spawn(async move {
446    ///         let mut i = 0;
447    ///         while let Ok(permit) = tx.reserve().await {
448    ///             permit.send(i);
449    ///             i += 1;
450    ///         }
451    ///     });
452    ///
453    ///     rx.close();
454    ///
455    ///     while let Some(msg) = rx.recv().await {
456    ///         println!("got {}", msg);
457    ///     }
458    ///
459    ///     // Channel closed and no messages are lost.
460    /// }
461    /// ```
462    pub fn close(&mut self) {
463        self.chan.close();
464    }
465
466    /// Checks if a channel is closed.
467    ///
468    /// This method returns `true` if the channel has been closed. The channel is closed
469    /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
470    ///
471    /// [`Sender`]: crate::sync::mpsc::Sender
472    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
473    ///
474    /// # Examples
475    /// ```
476    /// use tokio::sync::mpsc;
477    ///
478    /// #[tokio::main]
479    /// async fn main() {
480    ///     let (_tx, mut rx) = mpsc::channel::<()>(10);
481    ///     assert!(!rx.is_closed());
482    ///
483    ///     rx.close();
484    ///     
485    ///     assert!(rx.is_closed());
486    /// }
487    /// ```
488    pub fn is_closed(&self) -> bool {
489        self.chan.is_closed()
490    }
491
492    /// Checks if a channel is empty.
493    ///
494    /// This method returns `true` if the channel has no messages.
495    ///
496    /// # Examples
497    /// ```
498    /// use tokio::sync::mpsc;
499    ///
500    /// #[tokio::main]
501    /// async fn main() {
502    ///     let (tx, rx) = mpsc::channel(10);
503    ///     assert!(rx.is_empty());
504    ///
505    ///     tx.send(0).await.unwrap();
506    ///     assert!(!rx.is_empty());
507    /// }
508    ///
509    /// ```
510    pub fn is_empty(&self) -> bool {
511        self.chan.is_empty()
512    }
513
514    /// Returns the number of messages in the channel.
515    ///
516    /// # Examples
517    /// ```
518    /// use tokio::sync::mpsc;
519    ///
520    /// #[tokio::main]
521    /// async fn main() {
522    ///     let (tx, rx) = mpsc::channel(10);
523    ///     assert_eq!(0, rx.len());
524    ///
525    ///     tx.send(0).await.unwrap();
526    ///     assert_eq!(1, rx.len());
527    /// }
528    /// ```
529    pub fn len(&self) -> usize {
530        self.chan.len()
531    }
532
533    /// Polls to receive the next message on this channel.
534    ///
535    /// This method returns:
536    ///
537    ///  * `Poll::Pending` if no messages are available but the channel is not
538    ///    closed, or if a spurious failure happens.
539    ///  * `Poll::Ready(Some(message))` if a message is available.
540    ///  * `Poll::Ready(None)` if the channel has been closed and all messages
541    ///    sent before it was closed have been received.
542    ///
543    /// When the method returns `Poll::Pending`, the `Waker` in the provided
544    /// `Context` is scheduled to receive a wakeup when a message is sent on any
545    /// receiver, or when the channel is closed.  Note that on multiple calls to
546    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
547    /// passed to the most recent call is scheduled to receive a wakeup.
548    ///
549    /// If this method returns `Poll::Pending` due to a spurious failure, then
550    /// the `Waker` will be notified when the situation causing the spurious
551    /// failure has been resolved. Note that receiving such a wakeup does not
552    /// guarantee that the next call will succeed — it could fail with another
553    /// spurious failure.
554    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
555        self.chan.recv(cx)
556    }
557
558    /// Polls to receive multiple messages on this channel, extending the provided buffer.
559    ///
560    /// This method returns:
561    /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
562    ///   spurious failure happens.
563    /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
564    ///   stored in `buffer`. This can be less than, or equal to, `limit`.
565    /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
566    ///
567    /// When the method returns `Poll::Pending`, the `Waker` in the provided
568    /// `Context` is scheduled to receive a wakeup when a message is sent on any
569    /// receiver, or when the channel is closed.  Note that on multiple calls to
570    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
571    /// passed to the most recent call is scheduled to receive a wakeup.
572    ///
573    /// Note that this method does not guarantee that exactly `limit` messages
574    /// are received. Rather, if at least one message is available, it returns
575    /// as many messages as it can up to the given limit. This method returns
576    /// zero only if the channel is closed (or if `limit` is zero).
577    ///
578    /// # Examples
579    ///
580    /// ```
581    /// use std::task::{Context, Poll};
582    /// use std::pin::Pin;
583    /// use tokio::sync::mpsc;
584    /// use futures::Future;
585    ///
586    /// struct MyReceiverFuture<'a> {
587    ///     receiver: mpsc::Receiver<i32>,
588    ///     buffer: &'a mut Vec<i32>,
589    ///     limit: usize,
590    /// }
591    ///
592    /// impl<'a> Future for MyReceiverFuture<'a> {
593    ///     type Output = usize; // Number of messages received
594    ///
595    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
596    ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
597    ///
598    ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
599    ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
600    ///             Poll::Pending => Poll::Pending,
601    ///             Poll::Ready(count) => Poll::Ready(count),
602    ///         }
603    ///     }
604    /// }
605    ///
606    /// #[tokio::main]
607    /// async fn main() {
608    ///     let (tx, rx) = mpsc::channel(32);
609    ///     let mut buffer = Vec::new();
610    ///
611    ///     let my_receiver_future = MyReceiverFuture {
612    ///         receiver: rx,
613    ///         buffer: &mut buffer,
614    ///         limit: 3,
615    ///     };
616    ///
617    ///     for i in 0..10 {
618    ///         tx.send(i).await.unwrap();
619    ///     }
620    ///
621    ///     let count = my_receiver_future.await;
622    ///     assert_eq!(count, 3);
623    ///     assert_eq!(buffer, vec![0,1,2])
624    /// }
625    /// ```
626    pub fn poll_recv_many(
627        &mut self,
628        cx: &mut Context<'_>,
629        buffer: &mut Vec<T>,
630        limit: usize,
631    ) -> Poll<usize> {
632        self.chan.recv_many(cx, buffer, limit)
633    }
634}
635
636impl<T> fmt::Debug for Receiver<T> {
637    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
638        fmt.debug_struct("Receiver")
639            .field("chan", &self.chan)
640            .finish()
641    }
642}
643
644impl<T> Unpin for Receiver<T> {}
645
646impl<T> Sender<T> {
647    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
648        Sender { chan }
649    }
650
651    /// Sends a value, waiting until there is capacity.
652    ///
653    /// A successful send occurs when it is determined that the other end of the
654    /// channel has not hung up already. An unsuccessful send would be one where
655    /// the corresponding receiver has already been closed. Note that a return
656    /// value of `Err` means that the data will never be received, but a return
657    /// value of `Ok` does not mean that the data will be received. It is
658    /// possible for the corresponding receiver to hang up immediately after
659    /// this function returns `Ok`.
660    ///
661    /// # Errors
662    ///
663    /// If the receive half of the channel is closed, either due to [`close`]
664    /// being called or the [`Receiver`] handle dropping, the function returns
665    /// an error. The error includes the value passed to `send`.
666    ///
667    /// [`close`]: Receiver::close
668    /// [`Receiver`]: Receiver
669    ///
670    /// # Cancel safety
671    ///
672    /// If `send` is used as the event in a [`tokio::select!`](crate::select)
673    /// statement and some other branch completes first, then it is guaranteed
674    /// that the message was not sent. **However, in that case, the message
675    /// is dropped and will be lost.**
676    ///
677    /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
678    /// capacity, then use the returned [`Permit`] to send the message.
679    ///
680    /// This channel uses a queue to ensure that calls to `send` and `reserve`
681    /// complete in the order they were requested.  Cancelling a call to
682    /// `send` makes you lose your place in the queue.
683    ///
684    /// # Examples
685    ///
686    /// In the following example, each call to `send` will block until the
687    /// previously sent value was received.
688    ///
689    /// ```rust
690    /// use tokio::sync::mpsc;
691    ///
692    /// #[tokio::main]
693    /// async fn main() {
694    ///     let (tx, mut rx) = mpsc::channel(1);
695    ///
696    ///     tokio::spawn(async move {
697    ///         for i in 0..10 {
698    ///             if let Err(_) = tx.send(i).await {
699    ///                 println!("receiver dropped");
700    ///                 return;
701    ///             }
702    ///         }
703    ///     });
704    ///
705    ///     while let Some(i) = rx.recv().await {
706    ///         println!("got = {}", i);
707    ///     }
708    /// }
709    /// ```
710    pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
711        match self.reserve().await {
712            Ok(permit) => {
713                permit.send(value);
714                Ok(())
715            }
716            Err(_) => Err(SendError(value)),
717        }
718    }
719
720    /// Completes when the receiver has dropped.
721    ///
722    /// This allows the producers to get notified when interest in the produced
723    /// values is canceled and immediately stop doing work.
724    ///
725    /// # Cancel safety
726    ///
727    /// This method is cancel safe. Once the channel is closed, it stays closed
728    /// forever and all future calls to `closed` will return immediately.
729    ///
730    /// # Examples
731    ///
732    /// ```
733    /// use tokio::sync::mpsc;
734    ///
735    /// #[tokio::main]
736    /// async fn main() {
737    ///     let (tx1, rx) = mpsc::channel::<()>(1);
738    ///     let tx2 = tx1.clone();
739    ///     let tx3 = tx1.clone();
740    ///     let tx4 = tx1.clone();
741    ///     let tx5 = tx1.clone();
742    ///     tokio::spawn(async move {
743    ///         drop(rx);
744    ///     });
745    ///
746    ///     futures::join!(
747    ///         tx1.closed(),
748    ///         tx2.closed(),
749    ///         tx3.closed(),
750    ///         tx4.closed(),
751    ///         tx5.closed()
752    ///     );
753    ///     println!("Receiver dropped");
754    /// }
755    /// ```
756    pub async fn closed(&self) {
757        self.chan.closed().await;
758    }
759
760    /// Attempts to immediately send a message on this `Sender`
761    ///
762    /// This method differs from [`send`] by returning immediately if the channel's
763    /// buffer is full or no receiver is waiting to acquire some data. Compared
764    /// with [`send`], this function has two failure cases instead of one (one for
765    /// disconnection, one for a full buffer).
766    ///
767    /// # Errors
768    ///
769    /// If the channel capacity has been reached, i.e., the channel has `n`
770    /// buffered values where `n` is the argument passed to [`channel`], then an
771    /// error is returned.
772    ///
773    /// If the receive half of the channel is closed, either due to [`close`]
774    /// being called or the [`Receiver`] handle dropping, the function returns
775    /// an error. The error includes the value passed to `send`.
776    ///
777    /// [`send`]: Sender::send
778    /// [`channel`]: channel
779    /// [`close`]: Receiver::close
780    ///
781    /// # Examples
782    ///
783    /// ```
784    /// use tokio::sync::mpsc;
785    ///
786    /// #[tokio::main]
787    /// async fn main() {
788    ///     // Create a channel with buffer size 1
789    ///     let (tx1, mut rx) = mpsc::channel(1);
790    ///     let tx2 = tx1.clone();
791    ///
792    ///     tokio::spawn(async move {
793    ///         tx1.send(1).await.unwrap();
794    ///         tx1.send(2).await.unwrap();
795    ///         // task waits until the receiver receives a value.
796    ///     });
797    ///
798    ///     tokio::spawn(async move {
799    ///         // This will return an error and send
800    ///         // no message if the buffer is full
801    ///         let _ = tx2.try_send(3);
802    ///     });
803    ///
804    ///     let mut msg;
805    ///     msg = rx.recv().await.unwrap();
806    ///     println!("message {} received", msg);
807    ///
808    ///     msg = rx.recv().await.unwrap();
809    ///     println!("message {} received", msg);
810    ///
811    ///     // Third message may have never been sent
812    ///     match rx.recv().await {
813    ///         Some(msg) => println!("message {} received", msg),
814    ///         None => println!("the third message was never sent"),
815    ///     }
816    /// }
817    /// ```
818    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
819        match self.chan.semaphore().semaphore.try_acquire(1) {
820            Ok(()) => {}
821            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
822            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
823        }
824
825        // Send the message
826        self.chan.send(message);
827        Ok(())
828    }
829
830    /// Sends a value, waiting until there is capacity, but only for a limited time.
831    ///
832    /// Shares the same success and error conditions as [`send`], adding one more
833    /// condition for an unsuccessful send, which is when the provided timeout has
834    /// elapsed, and there is no capacity available.
835    ///
836    /// [`send`]: Sender::send
837    ///
838    /// # Errors
839    ///
840    /// If the receive half of the channel is closed, either due to [`close`]
841    /// being called or the [`Receiver`] having been dropped,
842    /// the function returns an error. The error includes the value passed to `send`.
843    ///
844    /// [`close`]: Receiver::close
845    /// [`Receiver`]: Receiver
846    ///
847    /// # Panics
848    ///
849    /// This function panics if it is called outside the context of a Tokio
850    /// runtime [with time enabled](crate::runtime::Builder::enable_time).
851    ///
852    /// # Examples
853    ///
854    /// In the following example, each call to `send_timeout` will block until the
855    /// previously sent value was received, unless the timeout has elapsed.
856    ///
857    /// ```rust
858    /// use tokio::sync::mpsc;
859    /// use tokio::time::{sleep, Duration};
860    ///
861    /// #[tokio::main]
862    /// async fn main() {
863    ///     let (tx, mut rx) = mpsc::channel(1);
864    ///
865    ///     tokio::spawn(async move {
866    ///         for i in 0..10 {
867    ///             if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
868    ///                 println!("send error: #{:?}", e);
869    ///                 return;
870    ///             }
871    ///         }
872    ///     });
873    ///
874    ///     while let Some(i) = rx.recv().await {
875    ///         println!("got = {}", i);
876    ///         sleep(Duration::from_millis(200)).await;
877    ///     }
878    /// }
879    /// ```
880    #[cfg(feature = "time")]
881    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
882    pub async fn send_timeout(
883        &self,
884        value: T,
885        timeout: Duration,
886    ) -> Result<(), SendTimeoutError<T>> {
887        let permit = match crate::time::timeout(timeout, self.reserve()).await {
888            Err(_) => {
889                return Err(SendTimeoutError::Timeout(value));
890            }
891            Ok(Err(_)) => {
892                return Err(SendTimeoutError::Closed(value));
893            }
894            Ok(Ok(permit)) => permit,
895        };
896
897        permit.send(value);
898        Ok(())
899    }
900
901    /// Blocking send to call outside of asynchronous contexts.
902    ///
903    /// This method is intended for use cases where you are sending from
904    /// synchronous code to asynchronous code, and will work even if the
905    /// receiver is not using [`blocking_recv`] to receive the message.
906    ///
907    /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
908    ///
909    /// # Panics
910    ///
911    /// This function panics if called within an asynchronous execution
912    /// context.
913    ///
914    /// # Examples
915    ///
916    /// ```
917    /// use std::thread;
918    /// use tokio::runtime::Runtime;
919    /// use tokio::sync::mpsc;
920    ///
921    /// fn main() {
922    ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
923    ///
924    ///     let sync_code = thread::spawn(move || {
925    ///         tx.blocking_send(10).unwrap();
926    ///     });
927    ///
928    ///     Runtime::new().unwrap().block_on(async move {
929    ///         assert_eq!(Some(10), rx.recv().await);
930    ///     });
931    ///     sync_code.join().unwrap()
932    /// }
933    /// ```
934    #[track_caller]
935    #[cfg(feature = "sync")]
936    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
937    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
938        crate::future::block_on(self.send(value))
939    }
940
941    /// Checks if the channel has been closed. This happens when the
942    /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
943    /// called.
944    ///
945    /// [`Receiver`]: crate::sync::mpsc::Receiver
946    /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
947    ///
948    /// ```
949    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
950    /// assert!(!tx.is_closed());
951    ///
952    /// let tx2 = tx.clone();
953    /// assert!(!tx2.is_closed());
954    ///
955    /// drop(rx);
956    /// assert!(tx.is_closed());
957    /// assert!(tx2.is_closed());
958    /// ```
959    pub fn is_closed(&self) -> bool {
960        self.chan.is_closed()
961    }
962
963    /// Waits for channel capacity. Once capacity to send one message is
964    /// available, it is reserved for the caller.
965    ///
966    /// If the channel is full, the function waits for the number of unreceived
967    /// messages to become less than the channel capacity. Capacity to send one
968    /// message is reserved for the caller. A [`Permit`] is returned to track
969    /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
970    /// reserved capacity.
971    ///
972    /// Dropping [`Permit`] without sending a message releases the capacity back
973    /// to the channel.
974    ///
975    /// [`Permit`]: Permit
976    /// [`send`]: Permit::send
977    ///
978    /// # Cancel safety
979    ///
980    /// This channel uses a queue to ensure that calls to `send` and `reserve`
981    /// complete in the order they were requested.  Cancelling a call to
982    /// `reserve` makes you lose your place in the queue.
983    ///
984    /// # Examples
985    ///
986    /// ```
987    /// use tokio::sync::mpsc;
988    ///
989    /// #[tokio::main]
990    /// async fn main() {
991    ///     let (tx, mut rx) = mpsc::channel(1);
992    ///
993    ///     // Reserve capacity
994    ///     let permit = tx.reserve().await.unwrap();
995    ///
996    ///     // Trying to send directly on the `tx` will fail due to no
997    ///     // available capacity.
998    ///     assert!(tx.try_send(123).is_err());
999    ///
1000    ///     // Sending on the permit succeeds
1001    ///     permit.send(456);
1002    ///
1003    ///     // The value sent on the permit is received
1004    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1005    /// }
1006    /// ```
1007    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1008        self.reserve_inner(1).await?;
1009        Ok(Permit { chan: &self.chan })
1010    }
1011
1012    /// Waits for channel capacity. Once capacity to send `n` messages is
1013    /// available, it is reserved for the caller.
1014    ///
1015    /// If the channel is full or if there are fewer than `n` permits available, the function waits
1016    /// for the number of unreceived messages to become `n` less than the channel capacity.
1017    /// Capacity to send `n` message is then reserved for the caller.
1018    ///
1019    /// A [`PermitIterator`] is returned to track the reserved capacity.
1020    /// You can call this [`Iterator`] until it is exhausted to
1021    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1022    /// [`try_reserve_many`] except it awaits for the slots to become available.
1023    ///
1024    /// If the channel is closed, the function returns a [`SendError`].
1025    ///
1026    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1027    /// permits back to the channel.
1028    ///
1029    /// [`PermitIterator`]: PermitIterator
1030    /// [`Permit`]: Permit
1031    /// [`send`]: Permit::send
1032    /// [`try_reserve_many`]: Sender::try_reserve_many
1033    ///
1034    /// # Cancel safety
1035    ///
1036    /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1037    /// complete in the order they were requested. Cancelling a call to
1038    /// `reserve_many` makes you lose your place in the queue.
1039    ///
1040    /// # Examples
1041    ///
1042    /// ```
1043    /// use tokio::sync::mpsc;
1044    ///
1045    /// #[tokio::main]
1046    /// async fn main() {
1047    ///     let (tx, mut rx) = mpsc::channel(2);
1048    ///
1049    ///     // Reserve capacity
1050    ///     let mut permit = tx.reserve_many(2).await.unwrap();
1051    ///
1052    ///     // Trying to send directly on the `tx` will fail due to no
1053    ///     // available capacity.
1054    ///     assert!(tx.try_send(123).is_err());
1055    ///
1056    ///     // Sending with the permit iterator succeeds
1057    ///     permit.next().unwrap().send(456);
1058    ///     permit.next().unwrap().send(457);
1059    ///
1060    ///     // The iterator should now be exhausted
1061    ///     assert!(permit.next().is_none());
1062    ///     
1063    ///     // The value sent on the permit is received
1064    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1065    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1066    /// }
1067    /// ```
1068    pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1069        self.reserve_inner(n).await?;
1070        Ok(PermitIterator {
1071            chan: &self.chan,
1072            n,
1073        })
1074    }
1075
1076    /// Waits for channel capacity, moving the `Sender` and returning an owned
1077    /// permit. Once capacity to send one message is available, it is reserved
1078    /// for the caller.
1079    ///
1080    /// This moves the sender _by value_, and returns an owned permit that can
1081    /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1082    /// this method may be used in cases where the permit must be valid for the
1083    /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1084    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1085    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1086    /// moved, it can be cloned prior to calling `reserve_owned`.
1087    ///
1088    /// If the channel is full, the function waits for the number of unreceived
1089    /// messages to become less than the channel capacity. Capacity to send one
1090    /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1091    /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1092    /// consumes the reserved capacity.
1093    ///
1094    /// Dropping the [`OwnedPermit`] without sending a message releases the
1095    /// capacity back to the channel.
1096    ///
1097    /// # Cancel safety
1098    ///
1099    /// This channel uses a queue to ensure that calls to `send` and `reserve`
1100    /// complete in the order they were requested.  Cancelling a call to
1101    /// `reserve_owned` makes you lose your place in the queue.
1102    ///
1103    /// # Examples
1104    /// Sending a message using an [`OwnedPermit`]:
1105    /// ```
1106    /// use tokio::sync::mpsc;
1107    ///
1108    /// #[tokio::main]
1109    /// async fn main() {
1110    ///     let (tx, mut rx) = mpsc::channel(1);
1111    ///
1112    ///     // Reserve capacity, moving the sender.
1113    ///     let permit = tx.reserve_owned().await.unwrap();
1114    ///
1115    ///     // Send a message, consuming the permit and returning
1116    ///     // the moved sender.
1117    ///     let tx = permit.send(123);
1118    ///
1119    ///     // The value sent on the permit is received.
1120    ///     assert_eq!(rx.recv().await.unwrap(), 123);
1121    ///
1122    ///     // The sender can now be used again.
1123    ///     tx.send(456).await.unwrap();
1124    /// }
1125    /// ```
1126    ///
1127    /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1128    /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1129    ///
1130    /// ```
1131    /// use tokio::sync::mpsc;
1132    ///
1133    /// #[tokio::main]
1134    /// async fn main() {
1135    ///     let (tx, mut rx) = mpsc::channel(1);
1136    ///
1137    ///     // Clone the sender and reserve capacity.
1138    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1139    ///
1140    ///     // Trying to send directly on the `tx` will fail due to no
1141    ///     // available capacity.
1142    ///     assert!(tx.try_send(123).is_err());
1143    ///
1144    ///     // Sending on the permit succeeds.
1145    ///     permit.send(456);
1146    ///
1147    ///     // The value sent on the permit is received
1148    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1149    /// }
1150    /// ```
1151    ///
1152    /// [`Sender::reserve`]: Sender::reserve
1153    /// [`OwnedPermit`]: OwnedPermit
1154    /// [`send`]: OwnedPermit::send
1155    /// [`Arc::clone`]: std::sync::Arc::clone
1156    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1157        self.reserve_inner(1).await?;
1158        Ok(OwnedPermit {
1159            chan: Some(self.chan),
1160        })
1161    }
1162
1163    async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1164        crate::trace::async_trace_leaf().await;
1165
1166        if n > self.max_capacity() {
1167            return Err(SendError(()));
1168        }
1169        match self.chan.semaphore().semaphore.acquire(n).await {
1170            Ok(()) => Ok(()),
1171            Err(_) => Err(SendError(())),
1172        }
1173    }
1174
1175    /// Tries to acquire a slot in the channel without waiting for the slot to become
1176    /// available.
1177    ///
1178    /// If the channel is full this function will return [`TrySendError`], otherwise
1179    /// if there is a slot available it will return a [`Permit`] that will then allow you
1180    /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1181    /// [`reserve`] except it does not await for the slot to become available.
1182    ///
1183    /// Dropping [`Permit`] without sending a message releases the capacity back
1184    /// to the channel.
1185    ///
1186    /// [`Permit`]: Permit
1187    /// [`send`]: Permit::send
1188    /// [`reserve`]: Sender::reserve
1189    ///
1190    /// # Examples
1191    ///
1192    /// ```
1193    /// use tokio::sync::mpsc;
1194    ///
1195    /// #[tokio::main]
1196    /// async fn main() {
1197    ///     let (tx, mut rx) = mpsc::channel(1);
1198    ///
1199    ///     // Reserve capacity
1200    ///     let permit = tx.try_reserve().unwrap();
1201    ///
1202    ///     // Trying to send directly on the `tx` will fail due to no
1203    ///     // available capacity.
1204    ///     assert!(tx.try_send(123).is_err());
1205    ///
1206    ///     // Trying to reserve an additional slot on the `tx` will
1207    ///     // fail because there is no capacity.
1208    ///     assert!(tx.try_reserve().is_err());
1209    ///
1210    ///     // Sending on the permit succeeds
1211    ///     permit.send(456);
1212    ///
1213    ///     // The value sent on the permit is received
1214    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1215    ///
1216    /// }
1217    /// ```
1218    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1219        match self.chan.semaphore().semaphore.try_acquire(1) {
1220            Ok(()) => {}
1221            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1222            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1223        }
1224
1225        Ok(Permit { chan: &self.chan })
1226    }
1227
1228    /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1229    /// available.
1230    ///
1231    /// A [`PermitIterator`] is returned to track the reserved capacity.
1232    /// You can call this [`Iterator`] until it is exhausted to
1233    /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1234    /// [`reserve_many`] except it does not await for the slots to become available.
1235    ///
1236    /// If there are fewer than `n` permits available on the channel, then
1237    /// this function will return a [`TrySendError::Full`]. If the channel is closed
1238    /// this function will return a [`TrySendError::Closed`].
1239    ///
1240    /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1241    /// permits back to the channel.
1242    ///
1243    /// [`PermitIterator`]: PermitIterator
1244    /// [`send`]: Permit::send
1245    /// [`reserve_many`]: Sender::reserve_many
1246    ///
1247    /// # Examples
1248    ///
1249    /// ```
1250    /// use tokio::sync::mpsc;
1251    ///
1252    /// #[tokio::main]
1253    /// async fn main() {
1254    ///     let (tx, mut rx) = mpsc::channel(2);
1255    ///
1256    ///     // Reserve capacity
1257    ///     let mut permit = tx.try_reserve_many(2).unwrap();
1258    ///
1259    ///     // Trying to send directly on the `tx` will fail due to no
1260    ///     // available capacity.
1261    ///     assert!(tx.try_send(123).is_err());
1262    ///
1263    ///     // Trying to reserve an additional slot on the `tx` will
1264    ///     // fail because there is no capacity.
1265    ///     assert!(tx.try_reserve().is_err());
1266    ///
1267    ///     // Sending with the permit iterator succeeds
1268    ///     permit.next().unwrap().send(456);
1269    ///     permit.next().unwrap().send(457);
1270    ///
1271    ///     // The iterator should now be exhausted
1272    ///     assert!(permit.next().is_none());
1273    ///
1274    ///     // The value sent on the permit is received
1275    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1276    ///     assert_eq!(rx.recv().await.unwrap(), 457);
1277    ///     
1278    ///     // Trying to call try_reserve_many with 0 will return an empty iterator
1279    ///     let mut permit = tx.try_reserve_many(0).unwrap();
1280    ///     assert!(permit.next().is_none());
1281    ///
1282    ///     // Trying to call try_reserve_many with a number greater than the channel
1283    ///     // capacity will return an error
1284    ///     let permit = tx.try_reserve_many(3);
1285    ///     assert!(permit.is_err());
1286    ///
1287    ///     // Trying to call try_reserve_many on a closed channel will return an error
1288    ///     drop(rx);
1289    ///     let permit = tx.try_reserve_many(1);
1290    ///     assert!(permit.is_err());
1291    ///
1292    ///     let permit = tx.try_reserve_many(0);
1293    ///     assert!(permit.is_err());
1294    /// }
1295    /// ```
1296    pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1297        if n > self.max_capacity() {
1298            return Err(TrySendError::Full(()));
1299        }
1300
1301        match self.chan.semaphore().semaphore.try_acquire(n) {
1302            Ok(()) => {}
1303            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1304            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1305        }
1306
1307        Ok(PermitIterator {
1308            chan: &self.chan,
1309            n,
1310        })
1311    }
1312
1313    /// Tries to acquire a slot in the channel without waiting for the slot to become
1314    /// available, returning an owned permit.
1315    ///
1316    /// This moves the sender _by value_, and returns an owned permit that can
1317    /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1318    /// this method may be used in cases where the permit must be valid for the
1319    /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1320    /// essentially a reference count increment, comparable to [`Arc::clone`]),
1321    /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1322    /// moved, it can be cloned prior to calling `try_reserve_owned`.
1323    ///
1324    /// If the channel is full this function will return a [`TrySendError`].
1325    /// Since the sender is taken by value, the `TrySendError` returned in this
1326    /// case contains the sender, so that it may be used again. Otherwise, if
1327    /// there is a slot available, this method will return an [`OwnedPermit`]
1328    /// that can then be used to [`send`] on the channel with a guaranteed slot.
1329    /// This function is similar to  [`reserve_owned`] except it does not await
1330    /// for the slot to become available.
1331    ///
1332    /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1333    /// to the channel.
1334    ///
1335    /// [`OwnedPermit`]: OwnedPermit
1336    /// [`send`]: OwnedPermit::send
1337    /// [`reserve_owned`]: Sender::reserve_owned
1338    /// [`Arc::clone`]: std::sync::Arc::clone
1339    ///
1340    /// # Examples
1341    ///
1342    /// ```
1343    /// use tokio::sync::mpsc;
1344    ///
1345    /// #[tokio::main]
1346    /// async fn main() {
1347    ///     let (tx, mut rx) = mpsc::channel(1);
1348    ///
1349    ///     // Reserve capacity
1350    ///     let permit = tx.clone().try_reserve_owned().unwrap();
1351    ///
1352    ///     // Trying to send directly on the `tx` will fail due to no
1353    ///     // available capacity.
1354    ///     assert!(tx.try_send(123).is_err());
1355    ///
1356    ///     // Trying to reserve an additional slot on the `tx` will
1357    ///     // fail because there is no capacity.
1358    ///     assert!(tx.try_reserve().is_err());
1359    ///
1360    ///     // Sending on the permit succeeds
1361    ///     permit.send(456);
1362    ///
1363    ///     // The value sent on the permit is received
1364    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1365    ///
1366    /// }
1367    /// ```
1368    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1369        match self.chan.semaphore().semaphore.try_acquire(1) {
1370            Ok(()) => {}
1371            Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1372            Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1373        }
1374
1375        Ok(OwnedPermit {
1376            chan: Some(self.chan),
1377        })
1378    }
1379
1380    /// Returns `true` if senders belong to the same channel.
1381    ///
1382    /// # Examples
1383    ///
1384    /// ```
1385    /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1386    /// let  tx2 = tx.clone();
1387    /// assert!(tx.same_channel(&tx2));
1388    ///
1389    /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1390    /// assert!(!tx3.same_channel(&tx2));
1391    /// ```
1392    pub fn same_channel(&self, other: &Self) -> bool {
1393        self.chan.same_channel(&other.chan)
1394    }
1395
1396    /// Returns the current capacity of the channel.
1397    ///
1398    /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1399    /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1400    /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1401    /// specified when calling [`channel`]
1402    ///
1403    /// # Examples
1404    ///
1405    /// ```
1406    /// use tokio::sync::mpsc;
1407    ///
1408    /// #[tokio::main]
1409    /// async fn main() {
1410    ///     let (tx, mut rx) = mpsc::channel::<()>(5);
1411    ///
1412    ///     assert_eq!(tx.capacity(), 5);
1413    ///
1414    ///     // Making a reservation drops the capacity by one.
1415    ///     let permit = tx.reserve().await.unwrap();
1416    ///     assert_eq!(tx.capacity(), 4);
1417    ///
1418    ///     // Sending and receiving a value increases the capacity by one.
1419    ///     permit.send(());
1420    ///     rx.recv().await.unwrap();
1421    ///     assert_eq!(tx.capacity(), 5);
1422    /// }
1423    /// ```
1424    ///
1425    /// [`send`]: Sender::send
1426    /// [`reserve`]: Sender::reserve
1427    /// [`channel`]: channel
1428    /// [`max_capacity`]: Sender::max_capacity
1429    pub fn capacity(&self) -> usize {
1430        self.chan.semaphore().semaphore.available_permits()
1431    }
1432
1433    /// Converts the `Sender` to a [`WeakSender`] that does not count
1434    /// towards RAII semantics, i.e. if all `Sender` instances of the
1435    /// channel were dropped and only `WeakSender` instances remain,
1436    /// the channel is closed.
1437    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1438    pub fn downgrade(&self) -> WeakSender<T> {
1439        WeakSender {
1440            chan: self.chan.downgrade(),
1441        }
1442    }
1443
1444    /// Returns the maximum buffer capacity of the channel.
1445    ///
1446    /// The maximum capacity is the buffer capacity initially specified when calling
1447    /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1448    /// available buffer capacity: as messages are sent and received, the
1449    /// value returned by [`capacity`] will go up or down, whereas the value
1450    /// returned by `max_capacity` will remain constant.
1451    ///
1452    /// # Examples
1453    ///
1454    /// ```
1455    /// use tokio::sync::mpsc;
1456    ///
1457    /// #[tokio::main]
1458    /// async fn main() {
1459    ///     let (tx, _rx) = mpsc::channel::<()>(5);
1460    ///
1461    ///     // both max capacity and capacity are the same at first
1462    ///     assert_eq!(tx.max_capacity(), 5);
1463    ///     assert_eq!(tx.capacity(), 5);
1464    ///
1465    ///     // Making a reservation doesn't change the max capacity.
1466    ///     let permit = tx.reserve().await.unwrap();
1467    ///     assert_eq!(tx.max_capacity(), 5);
1468    ///     // but drops the capacity by one
1469    ///     assert_eq!(tx.capacity(), 4);
1470    /// }
1471    /// ```
1472    ///
1473    /// [`channel`]: channel
1474    /// [`max_capacity`]: Sender::max_capacity
1475    /// [`capacity`]: Sender::capacity
1476    pub fn max_capacity(&self) -> usize {
1477        self.chan.semaphore().bound
1478    }
1479
1480    /// Returns the number of [`Sender`] handles.
1481    pub fn strong_count(&self) -> usize {
1482        self.chan.strong_count()
1483    }
1484
1485    /// Returns the number of [`WeakSender`] handles.
1486    pub fn weak_count(&self) -> usize {
1487        self.chan.weak_count()
1488    }
1489}
1490
1491impl<T> Clone for Sender<T> {
1492    fn clone(&self) -> Self {
1493        Sender {
1494            chan: self.chan.clone(),
1495        }
1496    }
1497}
1498
1499impl<T> fmt::Debug for Sender<T> {
1500    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1501        fmt.debug_struct("Sender")
1502            .field("chan", &self.chan)
1503            .finish()
1504    }
1505}
1506
1507impl<T> Clone for WeakSender<T> {
1508    fn clone(&self) -> Self {
1509        self.chan.increment_weak_count();
1510
1511        WeakSender {
1512            chan: self.chan.clone(),
1513        }
1514    }
1515}
1516
1517impl<T> Drop for WeakSender<T> {
1518    fn drop(&mut self) {
1519        self.chan.decrement_weak_count();
1520    }
1521}
1522
1523impl<T> WeakSender<T> {
1524    /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1525    /// if there are other `Sender` instances alive and the channel wasn't
1526    /// previously dropped, otherwise `None` is returned.
1527    pub fn upgrade(&self) -> Option<Sender<T>> {
1528        chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1529    }
1530
1531    /// Returns the number of [`Sender`] handles.
1532    pub fn strong_count(&self) -> usize {
1533        self.chan.strong_count()
1534    }
1535
1536    /// Returns the number of [`WeakSender`] handles.
1537    pub fn weak_count(&self) -> usize {
1538        self.chan.weak_count()
1539    }
1540}
1541
1542impl<T> fmt::Debug for WeakSender<T> {
1543    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1544        fmt.debug_struct("WeakSender").finish()
1545    }
1546}
1547
1548// ===== impl Permit =====
1549
1550impl<T> Permit<'_, T> {
1551    /// Sends a value using the reserved capacity.
1552    ///
1553    /// Capacity for the message has already been reserved. The message is sent
1554    /// to the receiver and the permit is consumed. The operation will succeed
1555    /// even if the receiver half has been closed. See [`Receiver::close`] for
1556    /// more details on performing a clean shutdown.
1557    ///
1558    /// [`Receiver::close`]: Receiver::close
1559    ///
1560    /// # Examples
1561    ///
1562    /// ```
1563    /// use tokio::sync::mpsc;
1564    ///
1565    /// #[tokio::main]
1566    /// async fn main() {
1567    ///     let (tx, mut rx) = mpsc::channel(1);
1568    ///
1569    ///     // Reserve capacity
1570    ///     let permit = tx.reserve().await.unwrap();
1571    ///
1572    ///     // Trying to send directly on the `tx` will fail due to no
1573    ///     // available capacity.
1574    ///     assert!(tx.try_send(123).is_err());
1575    ///
1576    ///     // Send a message on the permit
1577    ///     permit.send(456);
1578    ///
1579    ///     // The value sent on the permit is received
1580    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1581    /// }
1582    /// ```
1583    pub fn send(self, value: T) {
1584        use std::mem;
1585
1586        self.chan.send(value);
1587
1588        // Avoid the drop logic
1589        mem::forget(self);
1590    }
1591}
1592
1593impl<T> Drop for Permit<'_, T> {
1594    fn drop(&mut self) {
1595        use chan::Semaphore;
1596
1597        let semaphore = self.chan.semaphore();
1598
1599        // Add the permit back to the semaphore
1600        semaphore.add_permit();
1601
1602        // If this is the last sender for this channel, wake the receiver so
1603        // that it can be notified that the channel is closed.
1604        if semaphore.is_closed() && semaphore.is_idle() {
1605            self.chan.wake_rx();
1606        }
1607    }
1608}
1609
1610impl<T> fmt::Debug for Permit<'_, T> {
1611    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1612        fmt.debug_struct("Permit")
1613            .field("chan", &self.chan)
1614            .finish()
1615    }
1616}
1617
1618// ===== impl PermitIterator =====
1619
1620impl<'a, T> Iterator for PermitIterator<'a, T> {
1621    type Item = Permit<'a, T>;
1622
1623    fn next(&mut self) -> Option<Self::Item> {
1624        if self.n == 0 {
1625            return None;
1626        }
1627
1628        self.n -= 1;
1629        Some(Permit { chan: self.chan })
1630    }
1631
1632    fn size_hint(&self) -> (usize, Option<usize>) {
1633        let n = self.n;
1634        (n, Some(n))
1635    }
1636}
1637impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1638impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1639
1640impl<T> Drop for PermitIterator<'_, T> {
1641    fn drop(&mut self) {
1642        use chan::Semaphore;
1643
1644        if self.n == 0 {
1645            return;
1646        }
1647
1648        let semaphore = self.chan.semaphore();
1649
1650        // Add the remaining permits back to the semaphore
1651        semaphore.add_permits(self.n);
1652
1653        // If this is the last sender for this channel, wake the receiver so
1654        // that it can be notified that the channel is closed.
1655        if semaphore.is_closed() && semaphore.is_idle() {
1656            self.chan.wake_rx();
1657        }
1658    }
1659}
1660
1661impl<T> fmt::Debug for PermitIterator<'_, T> {
1662    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1663        fmt.debug_struct("PermitIterator")
1664            .field("chan", &self.chan)
1665            .field("capacity", &self.n)
1666            .finish()
1667    }
1668}
1669
1670// ===== impl Permit =====
1671
1672impl<T> OwnedPermit<T> {
1673    /// Sends a value using the reserved capacity.
1674    ///
1675    /// Capacity for the message has already been reserved. The message is sent
1676    /// to the receiver and the permit is consumed. The operation will succeed
1677    /// even if the receiver half has been closed. See [`Receiver::close`] for
1678    /// more details on performing a clean shutdown.
1679    ///
1680    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1681    /// the `OwnedPermit` was reserved.
1682    ///
1683    /// [`Receiver::close`]: Receiver::close
1684    ///
1685    /// # Examples
1686    ///
1687    /// ```
1688    /// use tokio::sync::mpsc;
1689    ///
1690    /// #[tokio::main]
1691    /// async fn main() {
1692    ///     let (tx, mut rx) = mpsc::channel(1);
1693    ///
1694    ///     // Reserve capacity
1695    ///     let permit = tx.reserve_owned().await.unwrap();
1696    ///
1697    ///     // Send a message on the permit, returning the sender.
1698    ///     let tx = permit.send(456);
1699    ///
1700    ///     // The value sent on the permit is received
1701    ///     assert_eq!(rx.recv().await.unwrap(), 456);
1702    ///
1703    ///     // We may now reuse `tx` to send another message.
1704    ///     tx.send(789).await.unwrap();
1705    /// }
1706    /// ```
1707    pub fn send(mut self, value: T) -> Sender<T> {
1708        let chan = self.chan.take().unwrap_or_else(|| {
1709            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1710        });
1711        chan.send(value);
1712
1713        Sender { chan }
1714    }
1715
1716    /// Releases the reserved capacity *without* sending a message, returning the
1717    /// [`Sender`].
1718    ///
1719    /// # Examples
1720    ///
1721    /// ```
1722    /// use tokio::sync::mpsc;
1723    ///
1724    /// #[tokio::main]
1725    /// async fn main() {
1726    ///     let (tx, rx) = mpsc::channel(1);
1727    ///
1728    ///     // Clone the sender and reserve capacity
1729    ///     let permit = tx.clone().reserve_owned().await.unwrap();
1730    ///
1731    ///     // Trying to send on the original `tx` will fail, since the `permit`
1732    ///     // has reserved all the available capacity.
1733    ///     assert!(tx.try_send(123).is_err());
1734    ///
1735    ///     // Release the permit without sending a message, returning the clone
1736    ///     // of the sender.
1737    ///     let tx2 = permit.release();
1738    ///
1739    ///     // We may now reuse `tx` to send another message.
1740    ///     tx.send(789).await.unwrap();
1741    ///     # drop(rx); drop(tx2);
1742    /// }
1743    /// ```
1744    ///
1745    /// [`Sender`]: Sender
1746    pub fn release(mut self) -> Sender<T> {
1747        use chan::Semaphore;
1748
1749        let chan = self.chan.take().unwrap_or_else(|| {
1750            unreachable!("OwnedPermit channel is only taken when the permit is moved")
1751        });
1752
1753        // Add the permit back to the semaphore
1754        chan.semaphore().add_permit();
1755        Sender { chan }
1756    }
1757}
1758
1759impl<T> Drop for OwnedPermit<T> {
1760    fn drop(&mut self) {
1761        use chan::Semaphore;
1762
1763        // Are we still holding onto the sender?
1764        if let Some(chan) = self.chan.take() {
1765            let semaphore = chan.semaphore();
1766
1767            // Add the permit back to the semaphore
1768            semaphore.add_permit();
1769
1770            // If this `OwnedPermit` is holding the last sender for this
1771            // channel, wake the receiver so that it can be notified that the
1772            // channel is closed.
1773            if semaphore.is_closed() && semaphore.is_idle() {
1774                chan.wake_rx();
1775            }
1776        }
1777
1778        // Otherwise, do nothing.
1779    }
1780}
1781
1782impl<T> fmt::Debug for OwnedPermit<T> {
1783    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1784        fmt.debug_struct("OwnedPermit")
1785            .field("chan", &self.chan)
1786            .finish()
1787    }
1788}