async_broadcast/
lib.rs

1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//!     let (s1, mut r1) = broadcast(2);
23//!     let s2 = s1.clone();
24//!     let mut r2 = r1.clone();
25//!
26//!     // Send 2 messages from two different senders.
27//!     s1.broadcast(7).await.unwrap();
28//!     s2.broadcast(8).await.unwrap();
29//!
30//!     // Channel is now at capacity so sending more messages will result in an error.
31//!     assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//!     assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
34//!     // We can use `recv` method of the `Stream` implementation to receive messages.
35//!     assert_eq!(r1.next().await.unwrap(), 7);
36//!     assert_eq!(r1.recv().await.unwrap(), 8);
37//!     assert_eq!(r2.next().await.unwrap(), 7);
38//!     assert_eq!(r2.recv().await.unwrap(), 8);
39//!
40//!     // All receiver got all messages so channel is now empty.
41//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//!     // Drop both senders, which closes the channel.
45//!     drop(s1);
46//!     drop(s2);
47//!
48//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
55//! This crate is similar to [`async-channel`] in that they both provide an MPMC channel but the
56//! main difference being that in `async-channel`, each message sent on the channel is only received
57//! by one of the receivers. `async-broadcast` on the other hand, delivers each message to every
58//! receiver (IOW broadcast) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
64//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
65//!   receiver split and both sides use clones of the same BroadcastChannel instance. The messages
66//!   are sent are sent to all channel clones. While this can work for many cases, the lack of
67//!   sender and receiver split, means that often times, you'll find yourself having to drain the
68//!   channel on the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//!   it:
72//!   - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//!   - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//!     inactive receiver blocking the whole channel is not a solvable problem.
75//!   - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//!     channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//!    here are:
80//!   - While this implementation does provide [overflow mode][tom], it is the default behavior and not
81//!     opt-in.
82//!   - There is no equivalent of inactive receivers.
83//!   - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//!     you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code)]
95#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
96#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]
97#![doc(
98    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
99)]
100#![doc(
101    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
102)]
103
104#[cfg(doctest)]
105mod doctests {
106    doc_comment::doctest!("../README.md");
107}
108
109use std::collections::VecDeque;
110use std::convert::TryInto;
111use std::error;
112use std::fmt;
113use std::future::Future;
114use std::marker::PhantomPinned;
115use std::pin::Pin;
116use std::sync::{Arc, Mutex};
117use std::task::{Context, Poll};
118
119use event_listener::{Event, EventListener};
120use event_listener_strategy::{easy_wrapper, EventListenerFuture};
121use futures_core::{ready, stream::Stream};
122use pin_project_lite::pin_project;
123
124/// Create a new broadcast channel.
125///
126/// The created channel has space to hold at most `cap` messages at a time.
127///
128/// # Panics
129///
130/// Capacity must be a positive number. If `cap` is zero, this function will panic.
131///
132/// # Examples
133///
134/// ```
135/// # futures_lite::future::block_on(async {
136/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
137///
138/// let (s, mut r1) = broadcast(1);
139/// let mut r2 = r1.clone();
140///
141/// assert_eq!(s.broadcast(10).await, Ok(None));
142/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
143///
144/// assert_eq!(r1.recv().await, Ok(10));
145/// assert_eq!(r2.recv().await, Ok(10));
146/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
147/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
148/// # });
149/// ```
150pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
151    assert!(cap > 0, "capacity cannot be zero");
152
153    let inner = Arc::new(Mutex::new(Inner {
154        queue: VecDeque::with_capacity(cap),
155        capacity: cap,
156        overflow: false,
157        await_active: true,
158        receiver_count: 1,
159        inactive_receiver_count: 0,
160        sender_count: 1,
161        head_pos: 0,
162        is_closed: false,
163        send_ops: Event::new(),
164        recv_ops: Event::new(),
165    }));
166
167    let s = Sender {
168        inner: inner.clone(),
169    };
170    let r = Receiver {
171        inner,
172        pos: 0,
173        listener: None,
174    };
175
176    (s, r)
177}
178
179#[derive(Debug)]
180struct Inner<T> {
181    queue: VecDeque<(T, usize)>,
182    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
183    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
184    capacity: usize,
185    receiver_count: usize,
186    inactive_receiver_count: usize,
187    sender_count: usize,
188    /// Send sequence number of the front of the queue
189    head_pos: u64,
190    overflow: bool,
191    await_active: bool,
192
193    is_closed: bool,
194
195    /// Send operations waiting while the channel is full.
196    send_ops: Event,
197
198    /// Receive operations waiting while the channel is empty and not closed.
199    recv_ops: Event,
200}
201
202impl<T> Inner<T> {
203    /// Try receiving at the given position, returning either the element or a reference to it.
204    ///
205    /// Result is used here instead of Cow because we don't have a Clone bound on T.
206    fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
207        let i = match pos.checked_sub(self.head_pos) {
208            Some(i) => i
209                .try_into()
210                .expect("Head position more than usize::MAX behind a receiver"),
211            None => {
212                let count = self.head_pos - *pos;
213                *pos = self.head_pos;
214                return Err(TryRecvError::Overflowed(count));
215            }
216        };
217
218        let last_waiter;
219        if let Some((_elt, waiters)) = self.queue.get_mut(i) {
220            *pos += 1;
221            *waiters -= 1;
222            last_waiter = *waiters == 0;
223        } else {
224            debug_assert_eq!(i, self.queue.len());
225            if self.is_closed {
226                return Err(TryRecvError::Closed);
227            } else {
228                return Err(TryRecvError::Empty);
229            }
230        }
231
232        // If we read from the front of the queue and this is the last receiver reading it
233        // we can pop the queue instead of cloning the message
234        if last_waiter {
235            // Only the first element of the queue should have 0 waiters
236            assert_eq!(i, 0);
237
238            // Remove the element from the queue, adjust space, and notify senders
239            let elt = self.queue.pop_front().unwrap().0;
240            self.head_pos += 1;
241            if !self.overflow {
242                // Notify 1 awaiting senders that there is now room. If there is still room in the
243                // queue, the notified operation will notify another awaiting sender.
244                self.send_ops.notify(1);
245            }
246
247            Ok(Ok(elt))
248        } else {
249            Ok(Err(&self.queue[i].0))
250        }
251    }
252
253    /// Closes the channel and notifies all waiting operations.
254    ///
255    /// Returns `true` if this call has closed the channel and it was not closed already.
256    fn close(&mut self) -> bool {
257        if self.is_closed {
258            return false;
259        }
260
261        self.is_closed = true;
262        // Notify all waiting senders and receivers.
263        self.send_ops.notify(usize::MAX);
264        self.recv_ops.notify(usize::MAX);
265
266        true
267    }
268
269    /// Set the channel capacity.
270    ///
271    /// There are times when you need to change the channel's capacity after creating it. If the
272    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
273    /// dropped to shrink the channel.
274    fn set_capacity(&mut self, new_cap: usize) {
275        self.capacity = new_cap;
276        if new_cap > self.queue.capacity() {
277            let diff = new_cap - self.queue.capacity();
278            self.queue.reserve(diff);
279        }
280
281        // Ensure queue doesn't have more than `new_cap` messages.
282        if new_cap < self.queue.len() {
283            let diff = self.queue.len() - new_cap;
284            self.queue.drain(0..diff);
285            self.head_pos += diff as u64;
286        }
287    }
288
289    /// Close the channel if there aren't any receivers present anymore
290    fn close_channel(&mut self) {
291        if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
292            self.close();
293        }
294    }
295}
296
297/// The sending side of the broadcast channel.
298///
299/// Senders can be cloned and shared among threads. When all senders associated with a channel are
300/// dropped, the channel becomes closed.
301///
302/// The channel can also be closed manually by calling [`Sender::close()`].
303#[derive(Debug)]
304pub struct Sender<T> {
305    inner: Arc<Mutex<Inner<T>>>,
306}
307
308impl<T> Sender<T> {
309    /// Returns the channel capacity.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use async_broadcast::broadcast;
315    ///
316    /// let (s, r) = broadcast::<i32>(5);
317    /// assert_eq!(s.capacity(), 5);
318    /// ```
319    pub fn capacity(&self) -> usize {
320        self.inner.lock().unwrap().capacity
321    }
322
323    /// Set the channel capacity.
324    ///
325    /// There are times when you need to change the channel's capacity after creating it. If the
326    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
327    /// dropped to shrink the channel.
328    ///
329    /// # Examples
330    ///
331    /// ```
332    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
333    ///
334    /// let (mut s, mut r) = broadcast::<i32>(3);
335    /// assert_eq!(s.capacity(), 3);
336    /// s.try_broadcast(1).unwrap();
337    /// s.try_broadcast(2).unwrap();
338    /// s.try_broadcast(3).unwrap();
339    ///
340    /// s.set_capacity(1);
341    /// assert_eq!(s.capacity(), 1);
342    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
343    /// assert_eq!(r.try_recv().unwrap(), 3);
344    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
345    /// s.try_broadcast(1).unwrap();
346    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
347    ///
348    /// s.set_capacity(2);
349    /// assert_eq!(s.capacity(), 2);
350    /// s.try_broadcast(2).unwrap();
351    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
352    /// ```
353    pub fn set_capacity(&mut self, new_cap: usize) {
354        self.inner.lock().unwrap().set_capacity(new_cap);
355    }
356
357    /// If overflow mode is enabled on this channel.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// use async_broadcast::broadcast;
363    ///
364    /// let (s, r) = broadcast::<i32>(5);
365    /// assert!(!s.overflow());
366    /// ```
367    pub fn overflow(&self) -> bool {
368        self.inner.lock().unwrap().overflow
369    }
370
371    /// Set overflow mode on the channel.
372    ///
373    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
374    /// full. It achieves that by removing the oldest message from the channel.
375    ///
376    /// # Examples
377    ///
378    /// ```
379    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
380    ///
381    /// let (mut s, mut r) = broadcast::<i32>(2);
382    /// s.try_broadcast(1).unwrap();
383    /// s.try_broadcast(2).unwrap();
384    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
385    /// s.set_overflow(true);
386    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
387    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
388    ///
389    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
390    /// assert_eq!(r.try_recv().unwrap(), 3);
391    /// assert_eq!(r.try_recv().unwrap(), 4);
392    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
393    /// ```
394    pub fn set_overflow(&mut self, overflow: bool) {
395        self.inner.lock().unwrap().overflow = overflow;
396    }
397
398    /// If sender will wait for active receivers.
399    ///
400    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
401    /// `true`.
402    ///
403    /// # Examples
404    ///
405    /// ```
406    /// use async_broadcast::broadcast;
407    ///
408    /// let (s, _) = broadcast::<i32>(5);
409    /// assert!(s.await_active());
410    /// ```
411    pub fn await_active(&self) -> bool {
412        self.inner.lock().unwrap().await_active
413    }
414
415    /// Specify if sender will wait for active receivers.
416    ///
417    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
418    /// `true`.
419    ///
420    /// # Examples
421    ///
422    /// ```
423    /// # futures_lite::future::block_on(async {
424    /// use async_broadcast::broadcast;
425    ///
426    /// let (mut s, mut r) = broadcast::<i32>(2);
427    /// s.broadcast(1).await.unwrap();
428    ///
429    /// let _ = r.deactivate();
430    /// s.set_await_active(false);
431    /// assert!(s.broadcast(2).await.is_err());
432    /// # });
433    /// ```
434    pub fn set_await_active(&mut self, await_active: bool) {
435        self.inner.lock().unwrap().await_active = await_active;
436    }
437
438    /// Closes the channel.
439    ///
440    /// Returns `true` if this call has closed the channel and it was not closed already.
441    ///
442    /// The remaining messages can still be received.
443    ///
444    /// # Examples
445    ///
446    /// ```
447    /// # futures_lite::future::block_on(async {
448    /// use async_broadcast::{broadcast, RecvError};
449    ///
450    /// let (s, mut r) = broadcast(1);
451    /// s.broadcast(1).await.unwrap();
452    /// assert!(s.close());
453    ///
454    /// assert_eq!(r.recv().await.unwrap(), 1);
455    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
456    /// # });
457    /// ```
458    pub fn close(&self) -> bool {
459        self.inner.lock().unwrap().close()
460    }
461
462    /// Returns `true` if the channel is closed.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// # futures_lite::future::block_on(async {
468    /// use async_broadcast::{broadcast, RecvError};
469    ///
470    /// let (s, r) = broadcast::<()>(1);
471    /// assert!(!s.is_closed());
472    ///
473    /// drop(r);
474    /// assert!(s.is_closed());
475    /// # });
476    /// ```
477    pub fn is_closed(&self) -> bool {
478        self.inner.lock().unwrap().is_closed
479    }
480
481    /// Returns `true` if the channel is empty.
482    ///
483    /// # Examples
484    ///
485    /// ```
486    /// # futures_lite::future::block_on(async {
487    /// use async_broadcast::broadcast;
488    ///
489    /// let (s, r) = broadcast(1);
490    ///
491    /// assert!(s.is_empty());
492    /// s.broadcast(1).await;
493    /// assert!(!s.is_empty());
494    /// # });
495    /// ```
496    pub fn is_empty(&self) -> bool {
497        self.inner.lock().unwrap().queue.is_empty()
498    }
499
500    /// Returns `true` if the channel is full.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # futures_lite::future::block_on(async {
506    /// use async_broadcast::broadcast;
507    ///
508    /// let (s, r) = broadcast(1);
509    ///
510    /// assert!(!s.is_full());
511    /// s.broadcast(1).await;
512    /// assert!(s.is_full());
513    /// # });
514    /// ```
515    pub fn is_full(&self) -> bool {
516        let inner = self.inner.lock().unwrap();
517
518        inner.queue.len() == inner.capacity
519    }
520
521    /// Returns the number of messages in the channel.
522    ///
523    /// # Examples
524    ///
525    /// ```
526    /// # futures_lite::future::block_on(async {
527    /// use async_broadcast::broadcast;
528    ///
529    /// let (s, r) = broadcast(2);
530    /// assert_eq!(s.len(), 0);
531    ///
532    /// s.broadcast(1).await;
533    /// s.broadcast(2).await;
534    /// assert_eq!(s.len(), 2);
535    /// # });
536    /// ```
537    pub fn len(&self) -> usize {
538        self.inner.lock().unwrap().queue.len()
539    }
540
541    /// Returns the number of receivers for the channel.
542    ///
543    /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
544    /// are interested in that.
545    ///
546    /// # Examples
547    ///
548    /// ```
549    /// use async_broadcast::broadcast;
550    ///
551    /// let (s, r) = broadcast::<()>(1);
552    /// assert_eq!(s.receiver_count(), 1);
553    /// let r = r.deactivate();
554    /// assert_eq!(s.receiver_count(), 0);
555    ///
556    /// let r2 = r.activate_cloned();
557    /// assert_eq!(r.receiver_count(), 1);
558    /// assert_eq!(r.inactive_receiver_count(), 1);
559    /// ```
560    pub fn receiver_count(&self) -> usize {
561        self.inner.lock().unwrap().receiver_count
562    }
563
564    /// Returns the number of inactive receivers for the channel.
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// use async_broadcast::broadcast;
570    ///
571    /// let (s, r) = broadcast::<()>(1);
572    /// assert_eq!(s.receiver_count(), 1);
573    /// let r = r.deactivate();
574    /// assert_eq!(s.receiver_count(), 0);
575    ///
576    /// let r2 = r.activate_cloned();
577    /// assert_eq!(r.receiver_count(), 1);
578    /// assert_eq!(r.inactive_receiver_count(), 1);
579    /// ```
580    pub fn inactive_receiver_count(&self) -> usize {
581        self.inner.lock().unwrap().inactive_receiver_count
582    }
583
584    /// Returns the number of senders for the channel.
585    ///
586    /// # Examples
587    ///
588    /// ```
589    /// # futures_lite::future::block_on(async {
590    /// use async_broadcast::broadcast;
591    ///
592    /// let (s, r) = broadcast::<()>(1);
593    /// assert_eq!(s.sender_count(), 1);
594    ///
595    /// let s2 = s.clone();
596    /// assert_eq!(s.sender_count(), 2);
597    /// # });
598    /// ```
599    pub fn sender_count(&self) -> usize {
600        self.inner.lock().unwrap().sender_count
601    }
602
603    /// Produce a new Receiver for this channel.
604    ///
605    /// The new receiver starts with zero messages available.  This will not re-open the channel if
606    /// it was closed due to all receivers being dropped.
607    ///
608    /// # Examples
609    ///
610    /// ```
611    /// # futures_lite::future::block_on(async {
612    /// use async_broadcast::{broadcast, RecvError};
613    ///
614    /// let (s, mut r1) = broadcast(2);
615    ///
616    /// assert_eq!(s.broadcast(1).await, Ok(None));
617    ///
618    /// let mut r2 = s.new_receiver();
619    ///
620    /// assert_eq!(s.broadcast(2).await, Ok(None));
621    /// drop(s);
622    ///
623    /// assert_eq!(r1.recv().await, Ok(1));
624    /// assert_eq!(r1.recv().await, Ok(2));
625    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
626    ///
627    /// assert_eq!(r2.recv().await, Ok(2));
628    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
629    /// # });
630    /// ```
631    pub fn new_receiver(&self) -> Receiver<T> {
632        let mut inner = self.inner.lock().unwrap();
633        inner.receiver_count += 1;
634        Receiver {
635            inner: self.inner.clone(),
636            pos: inner.head_pos + inner.queue.len() as u64,
637            listener: None,
638        }
639    }
640}
641
642impl<T: Clone> Sender<T> {
643    /// Broadcasts a message on the channel.
644    ///
645    /// If the channel is full, this method waits until there is space for a message unless:
646    ///
647    /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
648    ///    the oldest message from the channel to make room for the new message. The removed message
649    ///    is returned to the caller.
650    /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
651    ///    [`SendError`] immediately.
652    ///
653    /// If the channel is closed, this method returns an error.
654    ///
655    /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
656    /// not important to you, or if you just `.await` this future, use the [`broadcast_direct`] method
657    /// instead.
658    ///
659    /// # Examples
660    ///
661    /// ```
662    /// # futures_lite::future::block_on(async {
663    /// use async_broadcast::{broadcast, SendError};
664    ///
665    /// let (s, r) = broadcast(1);
666    ///
667    /// assert_eq!(s.broadcast(1).await, Ok(None));
668    /// drop(r);
669    /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
670    /// # });
671    /// ```
672    pub fn broadcast(&self, msg: T) -> Pin<Box<Send<'_, T>>> {
673        Box::pin(self.broadcast_direct(msg))
674    }
675
676    /// Broadcasts a message on the channel without pinning the future to the heap.
677    ///
678    /// The future returned by this method is not `Unpin` and must be pinned before use. This is
679    /// the desired behavior if you just `.await` on the future. For other uses cases, use the
680    /// [`broadcast`] method instead.
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// # futures_lite::future::block_on(async {
686    /// use async_broadcast::{broadcast, SendError};
687    ///
688    /// let (s, r) = broadcast(1);
689    ///
690    /// assert_eq!(s.broadcast_direct(1).await, Ok(None));
691    /// drop(r);
692    /// assert_eq!(s.broadcast_direct(2).await, Err(SendError(2)));
693    /// # });
694    /// ```
695    pub fn broadcast_direct(&self, msg: T) -> Send<'_, T> {
696        Send::_new(SendInner {
697            sender: self,
698            listener: None,
699            msg: Some(msg),
700            _pin: PhantomPinned,
701        })
702    }
703
704    /// Attempts to broadcast a message on the channel.
705    ///
706    /// If the channel is full, this method returns an error unless overflow mode (set through
707    /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
708    /// oldest message from the channel to make room for the new message. The removed message
709    /// is returned to the caller.
710    ///
711    /// If the channel is closed, this method returns an error.
712    ///
713    /// # Examples
714    ///
715    /// ```
716    /// use async_broadcast::{broadcast, TrySendError};
717    ///
718    /// let (s, r) = broadcast(1);
719    ///
720    /// assert_eq!(s.try_broadcast(1), Ok(None));
721    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
722    ///
723    /// drop(r);
724    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
725    /// ```
726    pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
727        let mut ret = None;
728        let mut inner = self.inner.lock().unwrap();
729
730        if inner.is_closed {
731            return Err(TrySendError::Closed(msg));
732        } else if inner.receiver_count == 0 {
733            assert!(inner.inactive_receiver_count != 0);
734
735            return Err(TrySendError::Inactive(msg));
736        } else if inner.queue.len() == inner.capacity {
737            if inner.overflow {
738                // Make room by popping a message.
739                ret = inner.queue.pop_front().map(|(m, _)| m);
740            } else {
741                return Err(TrySendError::Full(msg));
742            }
743        }
744        let receiver_count = inner.receiver_count;
745        inner.queue.push_back((msg, receiver_count));
746        if ret.is_some() {
747            inner.head_pos += 1;
748        }
749
750        // Notify all awaiting receive operations.
751        inner.recv_ops.notify(usize::MAX);
752
753        Ok(ret)
754    }
755
756    /// Broadcasts a message on the channel using the blocking strategy.
757    ///
758    /// If the channel is full, this method will block until there is room.
759    ///
760    /// If the channel is closed, this method returns an error.
761    ///
762    /// # Blocking
763    ///
764    /// Rather than using asynchronous waiting, like the [`send`](Self::broadcast) method,
765    /// this method will block the current thread until the message is sent.
766    ///
767    /// This method should not be used in an asynchronous context. It is intended
768    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
769    /// Calling this method in an asynchronous context may result in deadlocks.
770    ///
771    /// # Examples
772    ///
773    /// ```
774    /// use async_broadcast::{broadcast, SendError};
775    ///
776    /// let (s, r) = broadcast(1);
777    ///
778    /// assert_eq!(s.broadcast_blocking(1), Ok(None));
779    /// drop(r);
780    /// assert_eq!(s.broadcast_blocking(2), Err(SendError(2)));
781    /// ```
782    #[cfg(not(target_family = "wasm"))]
783    pub fn broadcast_blocking(&self, msg: T) -> Result<Option<T>, SendError<T>> {
784        self.broadcast_direct(msg).wait()
785    }
786}
787
788impl<T> Drop for Sender<T> {
789    fn drop(&mut self) {
790        let mut inner = self.inner.lock().unwrap();
791
792        inner.sender_count -= 1;
793
794        if inner.sender_count == 0 {
795            inner.close();
796        }
797    }
798}
799
800impl<T> Clone for Sender<T> {
801    fn clone(&self) -> Self {
802        self.inner.lock().unwrap().sender_count += 1;
803
804        Sender {
805            inner: self.inner.clone(),
806        }
807    }
808}
809
810/// The receiving side of a channel.
811///
812/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
813/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
814/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
815/// receivers around.
816#[derive(Debug)]
817pub struct Receiver<T> {
818    inner: Arc<Mutex<Inner<T>>>,
819    pos: u64,
820
821    /// Listens for a send or close event to unblock this stream.
822    listener: Option<EventListener>,
823}
824
825impl<T> Receiver<T> {
826    /// Returns the channel capacity.
827    ///
828    /// # Examples
829    ///
830    /// ```
831    /// use async_broadcast::broadcast;
832    ///
833    /// let (_s, r) = broadcast::<i32>(5);
834    /// assert_eq!(r.capacity(), 5);
835    /// ```
836    pub fn capacity(&self) -> usize {
837        self.inner.lock().unwrap().capacity
838    }
839
840    /// Set the channel capacity.
841    ///
842    /// There are times when you need to change the channel's capacity after creating it. If the
843    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
844    /// dropped to shrink the channel.
845    ///
846    /// # Examples
847    ///
848    /// ```
849    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
850    ///
851    /// let (s, mut r) = broadcast::<i32>(3);
852    /// assert_eq!(r.capacity(), 3);
853    /// s.try_broadcast(1).unwrap();
854    /// s.try_broadcast(2).unwrap();
855    /// s.try_broadcast(3).unwrap();
856    ///
857    /// r.set_capacity(1);
858    /// assert_eq!(r.capacity(), 1);
859    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
860    /// assert_eq!(r.try_recv().unwrap(), 3);
861    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
862    /// s.try_broadcast(1).unwrap();
863    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
864    ///
865    /// r.set_capacity(2);
866    /// assert_eq!(r.capacity(), 2);
867    /// s.try_broadcast(2).unwrap();
868    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
869    /// ```
870    pub fn set_capacity(&mut self, new_cap: usize) {
871        self.inner.lock().unwrap().set_capacity(new_cap);
872    }
873
874    /// If overflow mode is enabled on this channel.
875    ///
876    /// # Examples
877    ///
878    /// ```
879    /// use async_broadcast::broadcast;
880    ///
881    /// let (_s, r) = broadcast::<i32>(5);
882    /// assert!(!r.overflow());
883    /// ```
884    pub fn overflow(&self) -> bool {
885        self.inner.lock().unwrap().overflow
886    }
887
888    /// Set overflow mode on the channel.
889    ///
890    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
891    /// full. It achieves that by removing the oldest message from the channel.
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
897    ///
898    /// let (s, mut r) = broadcast::<i32>(2);
899    /// s.try_broadcast(1).unwrap();
900    /// s.try_broadcast(2).unwrap();
901    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
902    /// r.set_overflow(true);
903    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
904    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
905    ///
906    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
907    /// assert_eq!(r.try_recv().unwrap(), 3);
908    /// assert_eq!(r.try_recv().unwrap(), 4);
909    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
910    /// ```
911    pub fn set_overflow(&mut self, overflow: bool) {
912        self.inner.lock().unwrap().overflow = overflow;
913    }
914
915    /// If sender will wait for active receivers.
916    ///
917    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
918    /// `true`.
919    ///
920    /// # Examples
921    ///
922    /// ```
923    /// use async_broadcast::broadcast;
924    ///
925    /// let (_, r) = broadcast::<i32>(5);
926    /// assert!(r.await_active());
927    /// ```
928    pub fn await_active(&self) -> bool {
929        self.inner.lock().unwrap().await_active
930    }
931
932    /// Specify if sender will wait for active receivers.
933    ///
934    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
935    /// `true`.
936    ///
937    /// # Examples
938    ///
939    /// ```
940    /// # futures_lite::future::block_on(async {
941    /// use async_broadcast::broadcast;
942    ///
943    /// let (s, mut r) = broadcast::<i32>(2);
944    /// s.broadcast(1).await.unwrap();
945    ///
946    /// r.set_await_active(false);
947    /// let _ = r.deactivate();
948    /// assert!(s.broadcast(2).await.is_err());
949    /// # });
950    /// ```
951    pub fn set_await_active(&mut self, await_active: bool) {
952        self.inner.lock().unwrap().await_active = await_active;
953    }
954
955    /// Closes the channel.
956    ///
957    /// Returns `true` if this call has closed the channel and it was not closed already.
958    ///
959    /// The remaining messages can still be received.
960    ///
961    /// # Examples
962    ///
963    /// ```
964    /// # futures_lite::future::block_on(async {
965    /// use async_broadcast::{broadcast, RecvError};
966    ///
967    /// let (s, mut r) = broadcast(1);
968    /// s.broadcast(1).await.unwrap();
969    /// assert!(s.close());
970    ///
971    /// assert_eq!(r.recv().await.unwrap(), 1);
972    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
973    /// # });
974    /// ```
975    pub fn close(&self) -> bool {
976        self.inner.lock().unwrap().close()
977    }
978
979    /// Returns `true` if the channel is closed.
980    ///
981    /// # Examples
982    ///
983    /// ```
984    /// # futures_lite::future::block_on(async {
985    /// use async_broadcast::{broadcast, RecvError};
986    ///
987    /// let (s, r) = broadcast::<()>(1);
988    /// assert!(!s.is_closed());
989    ///
990    /// drop(r);
991    /// assert!(s.is_closed());
992    /// # });
993    /// ```
994    pub fn is_closed(&self) -> bool {
995        self.inner.lock().unwrap().is_closed
996    }
997
998    /// Returns `true` if the channel is empty.
999    ///
1000    /// # Examples
1001    ///
1002    /// ```
1003    /// # futures_lite::future::block_on(async {
1004    /// use async_broadcast::broadcast;
1005    ///
1006    /// let (s, r) = broadcast(1);
1007    ///
1008    /// assert!(s.is_empty());
1009    /// s.broadcast(1).await;
1010    /// assert!(!s.is_empty());
1011    /// # });
1012    /// ```
1013    pub fn is_empty(&self) -> bool {
1014        self.inner.lock().unwrap().queue.is_empty()
1015    }
1016
1017    /// Returns `true` if the channel is full.
1018    ///
1019    /// # Examples
1020    ///
1021    /// ```
1022    /// # futures_lite::future::block_on(async {
1023    /// use async_broadcast::broadcast;
1024    ///
1025    /// let (s, r) = broadcast(1);
1026    ///
1027    /// assert!(!s.is_full());
1028    /// s.broadcast(1).await;
1029    /// assert!(s.is_full());
1030    /// # });
1031    /// ```
1032    pub fn is_full(&self) -> bool {
1033        let inner = self.inner.lock().unwrap();
1034
1035        inner.queue.len() == inner.capacity
1036    }
1037
1038    /// Returns the number of messages in the channel.
1039    ///
1040    /// # Examples
1041    ///
1042    /// ```
1043    /// # futures_lite::future::block_on(async {
1044    /// use async_broadcast::broadcast;
1045    ///
1046    /// let (s, r) = broadcast(2);
1047    /// assert_eq!(s.len(), 0);
1048    ///
1049    /// s.broadcast(1).await;
1050    /// s.broadcast(2).await;
1051    /// assert_eq!(s.len(), 2);
1052    /// # });
1053    /// ```
1054    pub fn len(&self) -> usize {
1055        self.inner.lock().unwrap().queue.len()
1056    }
1057
1058    /// Returns the number of receivers for the channel.
1059    ///
1060    /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
1061    /// are interested in that.
1062    ///
1063    /// # Examples
1064    ///
1065    /// ```
1066    /// use async_broadcast::broadcast;
1067    ///
1068    /// let (s, r) = broadcast::<()>(1);
1069    /// assert_eq!(s.receiver_count(), 1);
1070    /// let r = r.deactivate();
1071    /// assert_eq!(s.receiver_count(), 0);
1072    ///
1073    /// let r2 = r.activate_cloned();
1074    /// assert_eq!(r.receiver_count(), 1);
1075    /// assert_eq!(r.inactive_receiver_count(), 1);
1076    /// ```
1077    pub fn receiver_count(&self) -> usize {
1078        self.inner.lock().unwrap().receiver_count
1079    }
1080
1081    /// Returns the number of inactive receivers for the channel.
1082    ///
1083    /// # Examples
1084    ///
1085    /// ```
1086    /// use async_broadcast::broadcast;
1087    ///
1088    /// let (s, r) = broadcast::<()>(1);
1089    /// assert_eq!(s.receiver_count(), 1);
1090    /// let r = r.deactivate();
1091    /// assert_eq!(s.receiver_count(), 0);
1092    ///
1093    /// let r2 = r.activate_cloned();
1094    /// assert_eq!(r.receiver_count(), 1);
1095    /// assert_eq!(r.inactive_receiver_count(), 1);
1096    /// ```
1097    pub fn inactive_receiver_count(&self) -> usize {
1098        self.inner.lock().unwrap().inactive_receiver_count
1099    }
1100
1101    /// Returns the number of senders for the channel.
1102    ///
1103    /// # Examples
1104    ///
1105    /// ```
1106    /// # futures_lite::future::block_on(async {
1107    /// use async_broadcast::broadcast;
1108    ///
1109    /// let (s, r) = broadcast::<()>(1);
1110    /// assert_eq!(s.sender_count(), 1);
1111    ///
1112    /// let s2 = s.clone();
1113    /// assert_eq!(s.sender_count(), 2);
1114    /// # });
1115    /// ```
1116    pub fn sender_count(&self) -> usize {
1117        self.inner.lock().unwrap().sender_count
1118    }
1119
1120    /// Downgrade to a [`InactiveReceiver`].
1121    ///
1122    /// An inactive receiver is one that can not and does not receive any messages. Its only purpose
1123    /// is keep the associated channel open even when there are no (active) receivers. An inactive
1124    /// receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`] or
1125    /// [`InactiveReceiver::activate_cloned`].
1126    ///
1127    /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
1128    /// receivers exists for the associated channel and [`Sender::broadcast`] will wait until an
1129    /// active receiver is available.
1130    ///
1131    /// # Examples
1132    ///
1133    /// ```
1134    /// # futures_lite::future::block_on(async {
1135    /// use async_broadcast::{broadcast, TrySendError};
1136    ///
1137    /// let (s, r) = broadcast(1);
1138    /// let inactive = r.deactivate();
1139    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1140    ///
1141    /// let mut r = inactive.activate();
1142    /// assert_eq!(s.broadcast(10).await, Ok(None));
1143    /// assert_eq!(r.recv().await, Ok(10));
1144    /// # });
1145    /// ```
1146    pub fn deactivate(self) -> InactiveReceiver<T> {
1147        // Drop::drop impl of Receiver will take care of `receiver_count`.
1148        self.inner.lock().unwrap().inactive_receiver_count += 1;
1149
1150        InactiveReceiver {
1151            inner: self.inner.clone(),
1152        }
1153    }
1154}
1155
1156impl<T: Clone> Receiver<T> {
1157    /// Receives a message from the channel.
1158    ///
1159    /// If the channel is empty, this method waits until there is a message.
1160    ///
1161    /// If the channel is closed, this method receives a message or returns an error if there are
1162    /// no more messages.
1163    ///
1164    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1165    /// this method returns an error and readjusts its cursor to point to the first available
1166    /// message.
1167    ///
1168    /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
1169    /// not important to you, or if you just `.await` this future, use the [`recv_direct`] method
1170    /// instead.
1171    ///
1172    /// # Examples
1173    ///
1174    /// ```
1175    /// # futures_lite::future::block_on(async {
1176    /// use async_broadcast::{broadcast, RecvError};
1177    ///
1178    /// let (s, mut r1) = broadcast(1);
1179    /// let mut r2 = r1.clone();
1180    ///
1181    /// assert_eq!(s.broadcast(1).await, Ok(None));
1182    /// drop(s);
1183    ///
1184    /// assert_eq!(r1.recv().await, Ok(1));
1185    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1186    /// assert_eq!(r2.recv().await, Ok(1));
1187    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1188    /// # });
1189    /// ```
1190    pub fn recv(&mut self) -> Pin<Box<Recv<'_, T>>> {
1191        Box::pin(self.recv_direct())
1192    }
1193
1194    /// Receives a message from the channel without pinning the future to the heap.
1195    ///
1196    /// The future returned by this method is not `Unpin` and must be pinned before use. This is
1197    /// the desired behavior if you just `.await` on the future. For other uses cases, use the
1198    /// [`recv`] method instead.
1199    ///
1200    /// # Examples
1201    ///
1202    /// ```
1203    /// # futures_lite::future::block_on(async {
1204    /// use async_broadcast::{broadcast, RecvError};
1205    ///
1206    /// let (s, mut r1) = broadcast(1);
1207    /// let mut r2 = r1.clone();
1208    ///
1209    /// assert_eq!(s.broadcast(1).await, Ok(None));
1210    /// drop(s);
1211    ///
1212    /// assert_eq!(r1.recv_direct().await, Ok(1));
1213    /// assert_eq!(r1.recv_direct().await, Err(RecvError::Closed));
1214    /// assert_eq!(r2.recv_direct().await, Ok(1));
1215    /// assert_eq!(r2.recv_direct().await, Err(RecvError::Closed));
1216    /// # });
1217    /// ```
1218    pub fn recv_direct(&mut self) -> Recv<'_, T> {
1219        Recv::_new(RecvInner {
1220            receiver: self,
1221            listener: None,
1222            _pin: PhantomPinned,
1223        })
1224    }
1225
1226    /// Attempts to receive a message from the channel.
1227    ///
1228    /// If the channel is empty or closed, this method returns an error.
1229    ///
1230    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1231    /// this method returns an error and readjusts its cursor to point to the first available
1232    /// message.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```
1237    /// # futures_lite::future::block_on(async {
1238    /// use async_broadcast::{broadcast, TryRecvError};
1239    ///
1240    /// let (s, mut r1) = broadcast(1);
1241    /// let mut r2 = r1.clone();
1242    /// assert_eq!(s.broadcast(1).await, Ok(None));
1243    ///
1244    /// assert_eq!(r1.try_recv(), Ok(1));
1245    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1246    /// assert_eq!(r2.try_recv(), Ok(1));
1247    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1248    ///
1249    /// drop(s);
1250    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1251    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1252    /// # });
1253    /// ```
1254    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1255        self.inner
1256            .lock()
1257            .unwrap()
1258            .try_recv_at(&mut self.pos)
1259            .map(|cow| cow.unwrap_or_else(T::clone))
1260    }
1261
1262    /// Receives a message from the channel using the blocking strategy.
1263    ///
1264    /// If the channel is empty, this method will block until there is a message.
1265    ///
1266    /// If the channel is closed, this method receives a message or returns an error if there are
1267    /// no more messages.
1268    ///
1269    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1270    /// this method returns an error and readjusts its cursor to point to the first available
1271    /// message.
1272    ///
1273    /// # Blocking
1274    ///
1275    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
1276    /// this method will block the current thread until the message is sent.
1277    ///
1278    /// This method should not be used in an asynchronous context. It is intended
1279    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
1280    /// Calling this method in an asynchronous context may result in deadlocks.
1281    ///
1282    /// # Examples
1283    ///
1284    /// ```
1285    /// use async_broadcast::{broadcast, RecvError};
1286    ///
1287    /// let (s, mut r) = broadcast(1);
1288    ///
1289    /// assert_eq!(s.broadcast_blocking(1), Ok(None));
1290    /// drop(s);
1291    ///
1292    /// assert_eq!(r.recv_blocking(), Ok(1));
1293    /// assert_eq!(r.recv_blocking(), Err(RecvError::Closed));
1294    /// ```
1295    #[cfg(not(target_family = "wasm"))]
1296    pub fn recv_blocking(&mut self) -> Result<T, RecvError> {
1297        self.recv_direct().wait()
1298    }
1299
1300    /// Produce a new Sender for this channel.
1301    ///
1302    /// This will not re-open the channel if it was closed due to all senders being dropped.
1303    ///
1304    /// # Examples
1305    ///
1306    /// ```
1307    /// # futures_lite::future::block_on(async {
1308    /// use async_broadcast::{broadcast, RecvError};
1309    ///
1310    /// let (s1, mut r) = broadcast(2);
1311    ///
1312    /// assert_eq!(s1.broadcast(1).await, Ok(None));
1313    ///
1314    /// let mut s2 = r.new_sender();
1315    ///
1316    /// assert_eq!(s2.broadcast(2).await, Ok(None));
1317    /// drop(s1);
1318    /// drop(s2);
1319    ///
1320    /// assert_eq!(r.recv().await, Ok(1));
1321    /// assert_eq!(r.recv().await, Ok(2));
1322    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1323    /// # });
1324    /// ```
1325    pub fn new_sender(&self) -> Sender<T> {
1326        self.inner.lock().unwrap().sender_count += 1;
1327
1328        Sender {
1329            inner: self.inner.clone(),
1330        }
1331    }
1332
1333    /// Produce a new Receiver for this channel.
1334    ///
1335    /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1336    /// messages available.  This is slightly faster than a real clone.
1337    ///
1338    /// # Examples
1339    ///
1340    /// ```
1341    /// # futures_lite::future::block_on(async {
1342    /// use async_broadcast::{broadcast, RecvError};
1343    ///
1344    /// let (s, mut r1) = broadcast(2);
1345    ///
1346    /// assert_eq!(s.broadcast(1).await, Ok(None));
1347    ///
1348    /// let mut r2 = r1.new_receiver();
1349    ///
1350    /// assert_eq!(s.broadcast(2).await, Ok(None));
1351    /// drop(s);
1352    ///
1353    /// assert_eq!(r1.recv().await, Ok(1));
1354    /// assert_eq!(r1.recv().await, Ok(2));
1355    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1356    ///
1357    /// assert_eq!(r2.recv().await, Ok(2));
1358    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1359    /// # });
1360    /// ```
1361    pub fn new_receiver(&self) -> Self {
1362        let mut inner = self.inner.lock().unwrap();
1363        inner.receiver_count += 1;
1364        Receiver {
1365            inner: self.inner.clone(),
1366            pos: inner.head_pos + inner.queue.len() as u64,
1367            listener: None,
1368        }
1369    }
1370
1371    /// A low level poll method that is similar to [`Receiver::recv()`] or
1372    /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
1373    /// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
1374    ///
1375    /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
1376    ///
1377    /// # Errors
1378    ///
1379    /// If the number of messages that have been sent has overflowed the channel capacity, a
1380    /// [`RecvError::Overflowed`] variant is returned containing the number of items that
1381    /// overflowed and were lost.
1382    ///
1383    /// # Examples
1384    ///
1385    /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
1386    /// stream implementation to internally make use of a [`Receiver`]. This example implementation
1387    /// differs from the stream implementation of [`Receiver`] because it returns an error if
1388    /// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
1389    ///
1390    /// ```
1391    /// use futures_core::Stream;
1392    /// use async_broadcast::{Receiver, RecvError};
1393    /// use std::{pin::Pin, task::{Poll, Context}};
1394    ///
1395    /// struct MyStream(Receiver<i32>);
1396    ///
1397    /// impl futures_core::Stream for MyStream {
1398    ///     type Item = Result<i32, RecvError>;
1399    ///     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1400    ///         Pin::new(&mut self.0).poll_recv(cx)
1401    ///     }
1402    /// }
1403    /// ```
1404    pub fn poll_recv(
1405        mut self: Pin<&mut Self>,
1406        cx: &mut Context<'_>,
1407    ) -> Poll<Option<Result<T, RecvError>>> {
1408        loop {
1409            // If this stream is listening for events, first wait for a notification.
1410            if let Some(listener) = self.listener.as_mut() {
1411                ready!(Pin::new(listener).poll(cx));
1412                self.listener = None;
1413            }
1414
1415            loop {
1416                // Attempt to receive a message.
1417                match self.try_recv() {
1418                    Ok(msg) => {
1419                        // The stream is not blocked on an event - drop the listener.
1420                        self.listener = None;
1421                        return Poll::Ready(Some(Ok(msg)));
1422                    }
1423                    Err(TryRecvError::Closed) => {
1424                        // The stream is not blocked on an event - drop the listener.
1425                        self.listener = None;
1426                        return Poll::Ready(None);
1427                    }
1428                    Err(TryRecvError::Overflowed(n)) => {
1429                        // The stream is not blocked on an event - drop the listener.
1430                        self.listener = None;
1431                        return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
1432                    }
1433                    Err(TryRecvError::Empty) => {}
1434                }
1435
1436                // Receiving failed - now start listening for notifications or wait for one.
1437                match self.listener.as_mut() {
1438                    None => {
1439                        // Start listening and then try receiving again.
1440                        self.listener = {
1441                            let inner = self.inner.lock().unwrap();
1442                            Some(inner.recv_ops.listen())
1443                        };
1444                    }
1445                    Some(_) => {
1446                        // Go back to the outer loop to poll the listener.
1447                        break;
1448                    }
1449                }
1450            }
1451        }
1452    }
1453}
1454
1455impl<T> Drop for Receiver<T> {
1456    fn drop(&mut self) {
1457        let mut inner = self.inner.lock().unwrap();
1458
1459        // Remove ourself from each item's counter
1460        loop {
1461            match inner.try_recv_at(&mut self.pos) {
1462                Ok(_) => continue,
1463                Err(TryRecvError::Overflowed(_)) => continue,
1464                Err(TryRecvError::Closed) => break,
1465                Err(TryRecvError::Empty) => break,
1466            }
1467        }
1468
1469        inner.receiver_count -= 1;
1470
1471        inner.close_channel();
1472    }
1473}
1474
1475impl<T> Clone for Receiver<T> {
1476    /// Produce a clone of this Receiver that has the same messages queued.
1477    ///
1478    /// # Examples
1479    ///
1480    /// ```
1481    /// # futures_lite::future::block_on(async {
1482    /// use async_broadcast::{broadcast, RecvError};
1483    ///
1484    /// let (s, mut r1) = broadcast(1);
1485    ///
1486    /// assert_eq!(s.broadcast(1).await, Ok(None));
1487    /// drop(s);
1488    ///
1489    /// let mut r2 = r1.clone();
1490    ///
1491    /// assert_eq!(r1.recv().await, Ok(1));
1492    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1493    /// assert_eq!(r2.recv().await, Ok(1));
1494    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1495    /// # });
1496    /// ```
1497    fn clone(&self) -> Self {
1498        let mut inner = self.inner.lock().unwrap();
1499        inner.receiver_count += 1;
1500        // increment the waiter count on all items not yet received by this object
1501        let n = self.pos.saturating_sub(inner.head_pos) as usize;
1502        for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1503            *waiters += 1;
1504        }
1505        Receiver {
1506            inner: self.inner.clone(),
1507            pos: self.pos,
1508            listener: None,
1509        }
1510    }
1511}
1512
1513impl<T: Clone> Stream for Receiver<T> {
1514    type Item = T;
1515
1516    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1517        loop {
1518            match ready!(self.as_mut().poll_recv(cx)) {
1519                Some(Ok(val)) => return Poll::Ready(Some(val)),
1520                // If overflowed, we expect future operations to succeed so try again.
1521                Some(Err(RecvError::Overflowed(_))) => continue,
1522                // RecvError::Closed should never appear here, but handle it anyway.
1523                None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
1524            }
1525        }
1526    }
1527}
1528
1529impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1530    fn is_terminated(&self) -> bool {
1531        let inner = self.inner.lock().unwrap();
1532
1533        inner.is_closed && inner.queue.is_empty()
1534    }
1535}
1536
1537/// An error returned from [`Sender::broadcast()`].
1538///
1539/// Received because the channel is closed or no active receivers were present while `await-active`
1540/// was set to `false` (See [`Sender::set_await_active`] for details).
1541#[derive(PartialEq, Eq, Clone, Copy)]
1542pub struct SendError<T>(pub T);
1543
1544impl<T> SendError<T> {
1545    /// Unwraps the message that couldn't be sent.
1546    pub fn into_inner(self) -> T {
1547        self.0
1548    }
1549}
1550
1551impl<T> error::Error for SendError<T> {}
1552
1553impl<T> fmt::Debug for SendError<T> {
1554    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1555        write!(f, "SendError(..)")
1556    }
1557}
1558
1559impl<T> fmt::Display for SendError<T> {
1560    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1561        write!(f, "sending into a closed channel")
1562    }
1563}
1564
1565/// An error returned from [`Sender::try_broadcast()`].
1566#[derive(PartialEq, Eq, Clone, Copy)]
1567pub enum TrySendError<T> {
1568    /// The channel is full but not closed.
1569    Full(T),
1570
1571    /// The channel is closed.
1572    Closed(T),
1573
1574    /// There are currently no active receivers, only inactive ones.
1575    Inactive(T),
1576}
1577
1578impl<T> TrySendError<T> {
1579    /// Unwraps the message that couldn't be sent.
1580    pub fn into_inner(self) -> T {
1581        match self {
1582            TrySendError::Full(t) => t,
1583            TrySendError::Closed(t) => t,
1584            TrySendError::Inactive(t) => t,
1585        }
1586    }
1587
1588    /// Returns `true` if the channel is full but not closed.
1589    pub fn is_full(&self) -> bool {
1590        match self {
1591            TrySendError::Full(_) => true,
1592            TrySendError::Closed(_) | TrySendError::Inactive(_) => false,
1593        }
1594    }
1595
1596    /// Returns `true` if the channel is closed.
1597    pub fn is_closed(&self) -> bool {
1598        match self {
1599            TrySendError::Full(_) | TrySendError::Inactive(_) => false,
1600            TrySendError::Closed(_) => true,
1601        }
1602    }
1603
1604    /// Returns `true` if there are currently no active receivers, only inactive ones.
1605    pub fn is_disconnected(&self) -> bool {
1606        match self {
1607            TrySendError::Full(_) | TrySendError::Closed(_) => false,
1608            TrySendError::Inactive(_) => true,
1609        }
1610    }
1611}
1612
1613impl<T> error::Error for TrySendError<T> {}
1614
1615impl<T> fmt::Debug for TrySendError<T> {
1616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1617        match *self {
1618            TrySendError::Full(..) => write!(f, "Full(..)"),
1619            TrySendError::Closed(..) => write!(f, "Closed(..)"),
1620            TrySendError::Inactive(..) => write!(f, "Inactive(..)"),
1621        }
1622    }
1623}
1624
1625impl<T> fmt::Display for TrySendError<T> {
1626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1627        match *self {
1628            TrySendError::Full(..) => write!(f, "sending into a full channel"),
1629            TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
1630            TrySendError::Inactive(..) => write!(f, "sending into the void (no active receivers)"),
1631        }
1632    }
1633}
1634
1635/// An error returned from [`Receiver::recv()`].
1636#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1637pub enum RecvError {
1638    /// The channel has overflowed since the last element was seen.  Future recv operations will
1639    /// succeed, but some messages have been skipped.
1640    ///
1641    /// Contains the number of messages missed.
1642    Overflowed(u64),
1643
1644    /// The channel is empty and closed.
1645    Closed,
1646}
1647
1648impl error::Error for RecvError {}
1649
1650impl fmt::Display for RecvError {
1651    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1652        match self {
1653            Self::Overflowed(n) => write!(f, "receiving skipped {} messages", n),
1654            Self::Closed => write!(f, "receiving from an empty and closed channel"),
1655        }
1656    }
1657}
1658
1659/// An error returned from [`Receiver::try_recv()`].
1660#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1661pub enum TryRecvError {
1662    /// The channel has overflowed since the last element was seen.  Future recv operations will
1663    /// succeed, but some messages have been skipped.
1664    Overflowed(u64),
1665
1666    /// The channel is empty but not closed.
1667    Empty,
1668
1669    /// The channel is empty and closed.
1670    Closed,
1671}
1672
1673impl TryRecvError {
1674    /// Returns `true` if the channel is empty but not closed.
1675    pub fn is_empty(&self) -> bool {
1676        match self {
1677            TryRecvError::Empty => true,
1678            TryRecvError::Closed => false,
1679            TryRecvError::Overflowed(_) => false,
1680        }
1681    }
1682
1683    /// Returns `true` if the channel is empty and closed.
1684    pub fn is_closed(&self) -> bool {
1685        match self {
1686            TryRecvError::Empty => false,
1687            TryRecvError::Closed => true,
1688            TryRecvError::Overflowed(_) => false,
1689        }
1690    }
1691
1692    /// Returns `true` if this error indicates the receiver missed messages.
1693    pub fn is_overflowed(&self) -> bool {
1694        match self {
1695            TryRecvError::Empty => false,
1696            TryRecvError::Closed => false,
1697            TryRecvError::Overflowed(_) => true,
1698        }
1699    }
1700}
1701
1702impl error::Error for TryRecvError {}
1703
1704impl fmt::Display for TryRecvError {
1705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1706        match *self {
1707            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1708            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1709            TryRecvError::Overflowed(n) => {
1710                write!(f, "receiving operation observed {} lost messages", n)
1711            }
1712        }
1713    }
1714}
1715
1716easy_wrapper! {
1717    /// A future returned by [`Sender::broadcast()`].
1718    #[derive(Debug)]
1719    #[must_use = "futures do nothing unless .awaited"]
1720    pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result<Option<T>, SendError<T>>);
1721    #[cfg(not(target_family = "wasm"))]
1722    pub(crate) wait();
1723}
1724
1725pin_project! {
1726    #[derive(Debug)]
1727    struct SendInner<'a, T> {
1728        sender: &'a Sender<T>,
1729        listener: Option<EventListener>,
1730        msg: Option<T>,
1731
1732        // Keeping this type `!Unpin` enables future optimizations.
1733        #[pin]
1734        _pin: PhantomPinned
1735    }
1736}
1737
1738impl<T: Clone> EventListenerFuture for SendInner<'_, T> {
1739    type Output = Result<Option<T>, SendError<T>>;
1740
1741    fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1742        self: Pin<&mut Self>,
1743        strategy: &mut S,
1744        context: &mut S::Context,
1745    ) -> Poll<Self::Output> {
1746        let this = self.project();
1747
1748        loop {
1749            let msg = this.msg.take().unwrap();
1750            let inner = &this.sender.inner;
1751
1752            // Attempt to send a message.
1753            match this.sender.try_broadcast(msg) {
1754                Ok(msg) => {
1755                    let inner = inner.lock().unwrap();
1756
1757                    if inner.queue.len() < inner.capacity {
1758                        // Not full still, so notify the next awaiting sender.
1759                        inner.send_ops.notify(1);
1760                    }
1761
1762                    return Poll::Ready(Ok(msg));
1763                }
1764                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1765                Err(TrySendError::Full(m)) => *this.msg = Some(m),
1766                Err(TrySendError::Inactive(m)) if inner.lock().unwrap().await_active => {
1767                    *this.msg = Some(m)
1768                }
1769                Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
1770            }
1771
1772            // Sending failed - now start listening for notifications or wait for one.
1773            match &this.listener {
1774                None => {
1775                    // Start listening and then try sending again.
1776                    let inner = inner.lock().unwrap();
1777                    *this.listener = Some(inner.send_ops.listen());
1778                }
1779                Some(_) => {
1780                    // Wait for a notification.
1781                    ready!(strategy.poll(this.listener, context));
1782                    *this.listener = None;
1783                }
1784            }
1785        }
1786    }
1787}
1788
1789easy_wrapper! {
1790    /// A future returned by [`Receiver::recv()`].
1791    #[derive(Debug)]
1792    #[must_use = "futures do nothing unless .awaited"]
1793    pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result<T, RecvError>);
1794    #[cfg(not(target_family = "wasm"))]
1795    pub(crate) wait();
1796}
1797
1798pin_project! {
1799    #[derive(Debug)]
1800    struct RecvInner<'a, T> {
1801        receiver: &'a mut Receiver<T>,
1802        listener: Option<EventListener>,
1803
1804        // Keeping this type `!Unpin` enables future optimizations.
1805        #[pin]
1806        _pin: PhantomPinned
1807    }
1808}
1809
1810impl<T: Clone> EventListenerFuture for RecvInner<'_, T> {
1811    type Output = Result<T, RecvError>;
1812
1813    fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1814        self: Pin<&mut Self>,
1815        strategy: &mut S,
1816        context: &mut S::Context,
1817    ) -> Poll<Self::Output> {
1818        let this = self.project();
1819
1820        loop {
1821            // Attempt to receive a message.
1822            match this.receiver.try_recv() {
1823                Ok(msg) => return Poll::Ready(Ok(msg)),
1824                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1825                Err(TryRecvError::Overflowed(n)) => {
1826                    return Poll::Ready(Err(RecvError::Overflowed(n)));
1827                }
1828                Err(TryRecvError::Empty) => {}
1829            }
1830
1831            // Receiving failed - now start listening for notifications or wait for one.
1832            match &this.listener {
1833                None => {
1834                    // Start listening and then try receiving again.
1835                    *this.listener = {
1836                        let inner = this.receiver.inner.lock().unwrap();
1837                        Some(inner.recv_ops.listen())
1838                    };
1839                }
1840                Some(_) => {
1841                    // Wait for a notification.
1842                    ready!(strategy.poll(this.listener, context));
1843                    *this.listener = None;
1844                }
1845            }
1846        }
1847    }
1848}
1849
1850/// An inactive  receiver.
1851///
1852/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
1853/// keeping a channel open even when no associated active receivers exist.
1854#[derive(Debug)]
1855pub struct InactiveReceiver<T> {
1856    inner: Arc<Mutex<Inner<T>>>,
1857}
1858
1859impl<T> InactiveReceiver<T> {
1860    /// Convert to an activate [`Receiver`].
1861    ///
1862    /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1863    ///
1864    /// # Examples
1865    ///
1866    /// ```
1867    /// use async_broadcast::{broadcast, TrySendError};
1868    ///
1869    /// let (s, r) = broadcast(1);
1870    /// let inactive = r.deactivate();
1871    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1872    ///
1873    /// let mut r = inactive.activate();
1874    /// assert_eq!(s.try_broadcast(10), Ok(None));
1875    /// assert_eq!(r.try_recv(), Ok(10));
1876    /// ```
1877    pub fn activate(self) -> Receiver<T> {
1878        self.activate_cloned()
1879    }
1880
1881    /// Create an activate [`Receiver`] for the associated channel.
1882    ///
1883    /// # Examples
1884    ///
1885    /// ```
1886    /// use async_broadcast::{broadcast, TrySendError};
1887    ///
1888    /// let (s, r) = broadcast(1);
1889    /// let inactive = r.deactivate();
1890    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1891    ///
1892    /// let mut r = inactive.activate_cloned();
1893    /// assert_eq!(s.try_broadcast(10), Ok(None));
1894    /// assert_eq!(r.try_recv(), Ok(10));
1895    /// ```
1896    pub fn activate_cloned(&self) -> Receiver<T> {
1897        let mut inner = self.inner.lock().unwrap();
1898        inner.receiver_count += 1;
1899
1900        if inner.receiver_count == 1 {
1901            // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1902            // queue, the notified operation will notify another awaiting sender.
1903            inner.send_ops.notify(1);
1904        }
1905
1906        Receiver {
1907            inner: self.inner.clone(),
1908            pos: inner.head_pos + inner.queue.len() as u64,
1909            listener: None,
1910        }
1911    }
1912
1913    /// Returns the channel capacity.
1914    ///
1915    /// See [`Receiver::capacity`] documentation for examples.
1916    pub fn capacity(&self) -> usize {
1917        self.inner.lock().unwrap().capacity
1918    }
1919
1920    /// Set the channel capacity.
1921    ///
1922    /// There are times when you need to change the channel's capacity after creating it. If the
1923    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1924    /// dropped to shrink the channel.
1925    ///
1926    /// See [`Receiver::set_capacity`] documentation for examples.
1927    pub fn set_capacity(&mut self, new_cap: usize) {
1928        self.inner.lock().unwrap().set_capacity(new_cap);
1929    }
1930
1931    /// If overflow mode is enabled on this channel.
1932    ///
1933    /// See [`Receiver::overflow`] documentation for examples.
1934    pub fn overflow(&self) -> bool {
1935        self.inner.lock().unwrap().overflow
1936    }
1937
1938    /// Set overflow mode on the channel.
1939    ///
1940    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1941    /// full. It achieves that by removing the oldest message from the channel.
1942    ///
1943    /// See [`Receiver::set_overflow`] documentation for examples.
1944    pub fn set_overflow(&mut self, overflow: bool) {
1945        self.inner.lock().unwrap().overflow = overflow;
1946    }
1947
1948    /// If sender will wait for active receivers.
1949    ///
1950    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1951    /// `true`.
1952    ///
1953    /// # Examples
1954    ///
1955    /// ```
1956    /// use async_broadcast::broadcast;
1957    ///
1958    /// let (_, r) = broadcast::<i32>(5);
1959    /// let r = r.deactivate();
1960    /// assert!(r.await_active());
1961    /// ```
1962    pub fn await_active(&self) -> bool {
1963        self.inner.lock().unwrap().await_active
1964    }
1965
1966    /// Specify if sender will wait for active receivers.
1967    ///
1968    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1969    /// `true`.
1970    ///
1971    /// # Examples
1972    ///
1973    /// ```
1974    /// # futures_lite::future::block_on(async {
1975    /// use async_broadcast::broadcast;
1976    ///
1977    /// let (s, r) = broadcast::<i32>(2);
1978    /// s.broadcast(1).await.unwrap();
1979    ///
1980    /// let mut r = r.deactivate();
1981    /// r.set_await_active(false);
1982    /// assert!(s.broadcast(2).await.is_err());
1983    /// # });
1984    /// ```
1985    pub fn set_await_active(&mut self, await_active: bool) {
1986        self.inner.lock().unwrap().await_active = await_active;
1987    }
1988
1989    /// Closes the channel.
1990    ///
1991    /// Returns `true` if this call has closed the channel and it was not closed already.
1992    ///
1993    /// The remaining messages can still be received.
1994    ///
1995    /// See [`Receiver::close`] documentation for examples.
1996    pub fn close(&self) -> bool {
1997        self.inner.lock().unwrap().close()
1998    }
1999
2000    /// Returns `true` if the channel is closed.
2001    ///
2002    /// See [`Receiver::is_closed`] documentation for examples.
2003    pub fn is_closed(&self) -> bool {
2004        self.inner.lock().unwrap().is_closed
2005    }
2006
2007    /// Returns `true` if the channel is empty.
2008    ///
2009    /// See [`Receiver::is_empty`] documentation for examples.
2010    pub fn is_empty(&self) -> bool {
2011        self.inner.lock().unwrap().queue.is_empty()
2012    }
2013
2014    /// Returns `true` if the channel is full.
2015    ///
2016    /// See [`Receiver::is_full`] documentation for examples.
2017    pub fn is_full(&self) -> bool {
2018        let inner = self.inner.lock().unwrap();
2019
2020        inner.queue.len() == inner.capacity
2021    }
2022
2023    /// Returns the number of messages in the channel.
2024    ///
2025    /// See [`Receiver::len`] documentation for examples.
2026    pub fn len(&self) -> usize {
2027        self.inner.lock().unwrap().queue.len()
2028    }
2029
2030    /// Returns the number of receivers for the channel.
2031    ///
2032    /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
2033    /// if you're interested in that.
2034    ///
2035    /// # Examples
2036    ///
2037    /// ```
2038    /// use async_broadcast::broadcast;
2039    ///
2040    /// let (s, r) = broadcast::<()>(1);
2041    /// assert_eq!(s.receiver_count(), 1);
2042    /// let r = r.deactivate();
2043    /// assert_eq!(s.receiver_count(), 0);
2044    ///
2045    /// let r2 = r.activate_cloned();
2046    /// assert_eq!(r.receiver_count(), 1);
2047    /// assert_eq!(r.inactive_receiver_count(), 1);
2048    /// ```
2049    pub fn receiver_count(&self) -> usize {
2050        self.inner.lock().unwrap().receiver_count
2051    }
2052
2053    /// Returns the number of inactive receivers for the channel.
2054    ///
2055    /// # Examples
2056    ///
2057    /// ```
2058    /// use async_broadcast::broadcast;
2059    ///
2060    /// let (s, r) = broadcast::<()>(1);
2061    /// assert_eq!(s.receiver_count(), 1);
2062    /// let r = r.deactivate();
2063    /// assert_eq!(s.receiver_count(), 0);
2064    ///
2065    /// let r2 = r.activate_cloned();
2066    /// assert_eq!(r.receiver_count(), 1);
2067    /// assert_eq!(r.inactive_receiver_count(), 1);
2068    /// ```
2069    pub fn inactive_receiver_count(&self) -> usize {
2070        self.inner.lock().unwrap().inactive_receiver_count
2071    }
2072
2073    /// Returns the number of senders for the channel.
2074    ///
2075    /// See [`Receiver::sender_count`] documentation for examples.
2076    pub fn sender_count(&self) -> usize {
2077        self.inner.lock().unwrap().sender_count
2078    }
2079}
2080
2081impl<T> Clone for InactiveReceiver<T> {
2082    fn clone(&self) -> Self {
2083        self.inner.lock().unwrap().inactive_receiver_count += 1;
2084
2085        InactiveReceiver {
2086            inner: self.inner.clone(),
2087        }
2088    }
2089}
2090
2091impl<T> Drop for InactiveReceiver<T> {
2092    fn drop(&mut self) {
2093        let mut inner = self.inner.lock().unwrap();
2094
2095        inner.inactive_receiver_count -= 1;
2096        inner.close_channel();
2097    }
2098}