async_broadcast/lib.rs
1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//! let (s1, mut r1) = broadcast(2);
23//! let s2 = s1.clone();
24//! let mut r2 = r1.clone();
25//!
26//! // Send 2 messages from two different senders.
27//! s1.broadcast(7).await.unwrap();
28//! s2.broadcast(8).await.unwrap();
29//!
30//! // Channel is now at capacity so sending more messages will result in an error.
31//! assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//! assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
//! // We can use either the `recv` method or the `Stream` implementation to receive messages.
35//! assert_eq!(r1.next().await.unwrap(), 7);
36//! assert_eq!(r1.recv().await.unwrap(), 8);
37//! assert_eq!(r2.next().await.unwrap(), 7);
38//! assert_eq!(r2.recv().await.unwrap(), 8);
39//!
//! // All receivers got all messages so the channel is now empty.
41//! assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//! assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//! // Drop both senders, which closes the channel.
45//! drop(s1);
46//! drop(s2);
47//!
48//! assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//! assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
//! This crate is similar to [`async-channel`] in that they both provide an MPMC channel. The
//! main difference is that in `async-channel`, each message sent on the channel is only received
//! by one of the receivers. `async-broadcast`, on the other hand, delivers each message to every
//! receiver (i.e. it broadcasts) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
//!   receiver split and both sides use clones of the same BroadcastChannel instance. The messages
//!   are sent to all channel clones. While this can work for many cases, the lack of a sender and
//!   receiver split means that oftentimes you'll find yourself having to drain the channel on
//!   the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//! it:
72//! - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//! - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//! inactive receiver blocking the whole channel is not a solvable problem.
75//! - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//! channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//! here are:
80//! - While this implementation does provide [overflow mode][tom], it is the default behavior and not
81//! opt-in.
82//! - There is no equivalent of inactive receivers.
83//! - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//! you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code)]
95#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
96#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]
97#![doc(
98 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
99)]
100#![doc(
101 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
102)]
103
// Only compiled when rustdoc collects doctests (`cfg(doctest)`): hands the crate README to the
// `doc_comment::doctest!` macro so the README is included in the doctest run.
#[cfg(doctest)]
mod doctests {
    doc_comment::doctest!("../README.md");
}
108
109use std::collections::VecDeque;
110use std::convert::TryInto;
111use std::error;
112use std::fmt;
113use std::future::Future;
114use std::marker::PhantomPinned;
115use std::pin::Pin;
116use std::sync::{Arc, Mutex};
117use std::task::{Context, Poll};
118
119use event_listener::{Event, EventListener};
120use event_listener_strategy::{easy_wrapper, EventListenerFuture};
121use futures_core::{ready, stream::Stream};
122use pin_project_lite::pin_project;
123
124/// Create a new broadcast channel.
125///
126/// The created channel has space to hold at most `cap` messages at a time.
127///
128/// # Panics
129///
130/// Capacity must be a positive number. If `cap` is zero, this function will panic.
131///
132/// # Examples
133///
134/// ```
135/// # futures_lite::future::block_on(async {
136/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
137///
138/// let (s, mut r1) = broadcast(1);
139/// let mut r2 = r1.clone();
140///
141/// assert_eq!(s.broadcast(10).await, Ok(None));
142/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
143///
144/// assert_eq!(r1.recv().await, Ok(10));
145/// assert_eq!(r2.recv().await, Ok(10));
146/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
147/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
148/// # });
149/// ```
150pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
151 assert!(cap > 0, "capacity cannot be zero");
152
153 let inner = Arc::new(Mutex::new(Inner {
154 queue: VecDeque::with_capacity(cap),
155 capacity: cap,
156 overflow: false,
157 await_active: true,
158 receiver_count: 1,
159 inactive_receiver_count: 0,
160 sender_count: 1,
161 head_pos: 0,
162 is_closed: false,
163 send_ops: Event::new(),
164 recv_ops: Event::new(),
165 }));
166
167 let s = Sender {
168 inner: inner.clone(),
169 };
170 let r = Receiver {
171 inner,
172 pos: 0,
173 listener: None,
174 };
175
176 (s, r)
177}
178
/// Channel state shared by all senders and receivers of one channel. `broadcast` creates it
/// behind an `Arc<Mutex<_>>`.
#[derive(Debug)]
struct Inner<T> {
    /// Buffered messages, oldest at the front. Each message is paired with the number of
    /// receivers that still have to read it (set to `receiver_count` when the message is pushed,
    /// decremented on every read).
    queue: VecDeque<(T, usize)>,
    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
    capacity: usize,
    /// Number of active receivers (inactive receivers are counted separately).
    receiver_count: usize,
    /// Number of inactive receivers.
    inactive_receiver_count: usize,
    /// Number of live `Sender` handles; the channel is closed when it drops to zero.
    sender_count: usize,
    /// Send sequence number of the front of the queue
    head_pos: u64,
    /// Overflow mode: when set, a broadcast on a full channel evicts the oldest message instead
    /// of failing.
    overflow: bool,
    /// Whether senders wait for active receivers (see `Sender::set_await_active`).
    await_active: bool,

    /// Set once by `close()`; a closed channel rejects new messages.
    is_closed: bool,

    /// Send operations waiting while the channel is full.
    send_ops: Event,

    /// Receive operations waiting while the channel is empty and not closed.
    recv_ops: Event,
}
201
202impl<T> Inner<T> {
203 /// Try receiving at the given position, returning either the element or a reference to it.
204 ///
205 /// Result is used here instead of Cow because we don't have a Clone bound on T.
206 fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
207 let i = match pos.checked_sub(self.head_pos) {
208 Some(i) => i
209 .try_into()
210 .expect("Head position more than usize::MAX behind a receiver"),
211 None => {
212 let count = self.head_pos - *pos;
213 *pos = self.head_pos;
214 return Err(TryRecvError::Overflowed(count));
215 }
216 };
217
218 let last_waiter;
219 if let Some((_elt, waiters)) = self.queue.get_mut(i) {
220 *pos += 1;
221 *waiters -= 1;
222 last_waiter = *waiters == 0;
223 } else {
224 debug_assert_eq!(i, self.queue.len());
225 if self.is_closed {
226 return Err(TryRecvError::Closed);
227 } else {
228 return Err(TryRecvError::Empty);
229 }
230 }
231
232 // If we read from the front of the queue and this is the last receiver reading it
233 // we can pop the queue instead of cloning the message
234 if last_waiter {
235 // Only the first element of the queue should have 0 waiters
236 assert_eq!(i, 0);
237
238 // Remove the element from the queue, adjust space, and notify senders
239 let elt = self.queue.pop_front().unwrap().0;
240 self.head_pos += 1;
241 if !self.overflow {
242 // Notify 1 awaiting senders that there is now room. If there is still room in the
243 // queue, the notified operation will notify another awaiting sender.
244 self.send_ops.notify(1);
245 }
246
247 Ok(Ok(elt))
248 } else {
249 Ok(Err(&self.queue[i].0))
250 }
251 }
252
253 /// Closes the channel and notifies all waiting operations.
254 ///
255 /// Returns `true` if this call has closed the channel and it was not closed already.
256 fn close(&mut self) -> bool {
257 if self.is_closed {
258 return false;
259 }
260
261 self.is_closed = true;
262 // Notify all waiting senders and receivers.
263 self.send_ops.notify(usize::MAX);
264 self.recv_ops.notify(usize::MAX);
265
266 true
267 }
268
269 /// Set the channel capacity.
270 ///
271 /// There are times when you need to change the channel's capacity after creating it. If the
272 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
273 /// dropped to shrink the channel.
274 fn set_capacity(&mut self, new_cap: usize) {
275 self.capacity = new_cap;
276 if new_cap > self.queue.capacity() {
277 let diff = new_cap - self.queue.capacity();
278 self.queue.reserve(diff);
279 }
280
281 // Ensure queue doesn't have more than `new_cap` messages.
282 if new_cap < self.queue.len() {
283 let diff = self.queue.len() - new_cap;
284 self.queue.drain(0..diff);
285 self.head_pos += diff as u64;
286 }
287 }
288
289 /// Close the channel if there aren't any receivers present anymore
290 fn close_channel(&mut self) {
291 if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
292 self.close();
293 }
294 }
295}
296
297/// The sending side of the broadcast channel.
298///
299/// Senders can be cloned and shared among threads. When all senders associated with a channel are
300/// dropped, the channel becomes closed.
301///
302/// The channel can also be closed manually by calling [`Sender::close()`].
#[derive(Debug)]
pub struct Sender<T> {
    /// Channel state, shared with the other `Sender` and `Receiver` handles of this channel.
    inner: Arc<Mutex<Inner<T>>>,
}
307
308impl<T> Sender<T> {
309 /// Returns the channel capacity.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use async_broadcast::broadcast;
315 ///
316 /// let (s, r) = broadcast::<i32>(5);
317 /// assert_eq!(s.capacity(), 5);
318 /// ```
319 pub fn capacity(&self) -> usize {
320 self.inner.lock().unwrap().capacity
321 }
322
323 /// Set the channel capacity.
324 ///
325 /// There are times when you need to change the channel's capacity after creating it. If the
326 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
327 /// dropped to shrink the channel.
328 ///
329 /// # Examples
330 ///
331 /// ```
332 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
333 ///
334 /// let (mut s, mut r) = broadcast::<i32>(3);
335 /// assert_eq!(s.capacity(), 3);
336 /// s.try_broadcast(1).unwrap();
337 /// s.try_broadcast(2).unwrap();
338 /// s.try_broadcast(3).unwrap();
339 ///
340 /// s.set_capacity(1);
341 /// assert_eq!(s.capacity(), 1);
342 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
343 /// assert_eq!(r.try_recv().unwrap(), 3);
344 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
345 /// s.try_broadcast(1).unwrap();
346 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
347 ///
348 /// s.set_capacity(2);
349 /// assert_eq!(s.capacity(), 2);
350 /// s.try_broadcast(2).unwrap();
351 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
352 /// ```
353 pub fn set_capacity(&mut self, new_cap: usize) {
354 self.inner.lock().unwrap().set_capacity(new_cap);
355 }
356
357 /// If overflow mode is enabled on this channel.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// use async_broadcast::broadcast;
363 ///
364 /// let (s, r) = broadcast::<i32>(5);
365 /// assert!(!s.overflow());
366 /// ```
367 pub fn overflow(&self) -> bool {
368 self.inner.lock().unwrap().overflow
369 }
370
371 /// Set overflow mode on the channel.
372 ///
373 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
374 /// full. It achieves that by removing the oldest message from the channel.
375 ///
376 /// # Examples
377 ///
378 /// ```
379 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
380 ///
381 /// let (mut s, mut r) = broadcast::<i32>(2);
382 /// s.try_broadcast(1).unwrap();
383 /// s.try_broadcast(2).unwrap();
384 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
385 /// s.set_overflow(true);
386 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
387 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
388 ///
389 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
390 /// assert_eq!(r.try_recv().unwrap(), 3);
391 /// assert_eq!(r.try_recv().unwrap(), 4);
392 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
393 /// ```
394 pub fn set_overflow(&mut self, overflow: bool) {
395 self.inner.lock().unwrap().overflow = overflow;
396 }
397
398 /// If sender will wait for active receivers.
399 ///
400 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
401 /// `true`.
402 ///
403 /// # Examples
404 ///
405 /// ```
406 /// use async_broadcast::broadcast;
407 ///
408 /// let (s, _) = broadcast::<i32>(5);
409 /// assert!(s.await_active());
410 /// ```
411 pub fn await_active(&self) -> bool {
412 self.inner.lock().unwrap().await_active
413 }
414
415 /// Specify if sender will wait for active receivers.
416 ///
417 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
418 /// `true`.
419 ///
420 /// # Examples
421 ///
422 /// ```
423 /// # futures_lite::future::block_on(async {
424 /// use async_broadcast::broadcast;
425 ///
426 /// let (mut s, mut r) = broadcast::<i32>(2);
427 /// s.broadcast(1).await.unwrap();
428 ///
429 /// let _ = r.deactivate();
430 /// s.set_await_active(false);
431 /// assert!(s.broadcast(2).await.is_err());
432 /// # });
433 /// ```
434 pub fn set_await_active(&mut self, await_active: bool) {
435 self.inner.lock().unwrap().await_active = await_active;
436 }
437
438 /// Closes the channel.
439 ///
440 /// Returns `true` if this call has closed the channel and it was not closed already.
441 ///
442 /// The remaining messages can still be received.
443 ///
444 /// # Examples
445 ///
446 /// ```
447 /// # futures_lite::future::block_on(async {
448 /// use async_broadcast::{broadcast, RecvError};
449 ///
450 /// let (s, mut r) = broadcast(1);
451 /// s.broadcast(1).await.unwrap();
452 /// assert!(s.close());
453 ///
454 /// assert_eq!(r.recv().await.unwrap(), 1);
455 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
456 /// # });
457 /// ```
458 pub fn close(&self) -> bool {
459 self.inner.lock().unwrap().close()
460 }
461
462 /// Returns `true` if the channel is closed.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// # futures_lite::future::block_on(async {
468 /// use async_broadcast::{broadcast, RecvError};
469 ///
470 /// let (s, r) = broadcast::<()>(1);
471 /// assert!(!s.is_closed());
472 ///
473 /// drop(r);
474 /// assert!(s.is_closed());
475 /// # });
476 /// ```
477 pub fn is_closed(&self) -> bool {
478 self.inner.lock().unwrap().is_closed
479 }
480
481 /// Returns `true` if the channel is empty.
482 ///
483 /// # Examples
484 ///
485 /// ```
486 /// # futures_lite::future::block_on(async {
487 /// use async_broadcast::broadcast;
488 ///
489 /// let (s, r) = broadcast(1);
490 ///
491 /// assert!(s.is_empty());
492 /// s.broadcast(1).await;
493 /// assert!(!s.is_empty());
494 /// # });
495 /// ```
496 pub fn is_empty(&self) -> bool {
497 self.inner.lock().unwrap().queue.is_empty()
498 }
499
500 /// Returns `true` if the channel is full.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # futures_lite::future::block_on(async {
506 /// use async_broadcast::broadcast;
507 ///
508 /// let (s, r) = broadcast(1);
509 ///
510 /// assert!(!s.is_full());
511 /// s.broadcast(1).await;
512 /// assert!(s.is_full());
513 /// # });
514 /// ```
515 pub fn is_full(&self) -> bool {
516 let inner = self.inner.lock().unwrap();
517
518 inner.queue.len() == inner.capacity
519 }
520
521 /// Returns the number of messages in the channel.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// # futures_lite::future::block_on(async {
527 /// use async_broadcast::broadcast;
528 ///
529 /// let (s, r) = broadcast(2);
530 /// assert_eq!(s.len(), 0);
531 ///
532 /// s.broadcast(1).await;
533 /// s.broadcast(2).await;
534 /// assert_eq!(s.len(), 2);
535 /// # });
536 /// ```
537 pub fn len(&self) -> usize {
538 self.inner.lock().unwrap().queue.len()
539 }
540
541 /// Returns the number of receivers for the channel.
542 ///
543 /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
544 /// are interested in that.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// use async_broadcast::broadcast;
550 ///
551 /// let (s, r) = broadcast::<()>(1);
552 /// assert_eq!(s.receiver_count(), 1);
553 /// let r = r.deactivate();
554 /// assert_eq!(s.receiver_count(), 0);
555 ///
556 /// let r2 = r.activate_cloned();
557 /// assert_eq!(r.receiver_count(), 1);
558 /// assert_eq!(r.inactive_receiver_count(), 1);
559 /// ```
560 pub fn receiver_count(&self) -> usize {
561 self.inner.lock().unwrap().receiver_count
562 }
563
564 /// Returns the number of inactive receivers for the channel.
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// use async_broadcast::broadcast;
570 ///
571 /// let (s, r) = broadcast::<()>(1);
572 /// assert_eq!(s.receiver_count(), 1);
573 /// let r = r.deactivate();
574 /// assert_eq!(s.receiver_count(), 0);
575 ///
576 /// let r2 = r.activate_cloned();
577 /// assert_eq!(r.receiver_count(), 1);
578 /// assert_eq!(r.inactive_receiver_count(), 1);
579 /// ```
580 pub fn inactive_receiver_count(&self) -> usize {
581 self.inner.lock().unwrap().inactive_receiver_count
582 }
583
584 /// Returns the number of senders for the channel.
585 ///
586 /// # Examples
587 ///
588 /// ```
589 /// # futures_lite::future::block_on(async {
590 /// use async_broadcast::broadcast;
591 ///
592 /// let (s, r) = broadcast::<()>(1);
593 /// assert_eq!(s.sender_count(), 1);
594 ///
595 /// let s2 = s.clone();
596 /// assert_eq!(s.sender_count(), 2);
597 /// # });
598 /// ```
599 pub fn sender_count(&self) -> usize {
600 self.inner.lock().unwrap().sender_count
601 }
602
603 /// Produce a new Receiver for this channel.
604 ///
605 /// The new receiver starts with zero messages available. This will not re-open the channel if
606 /// it was closed due to all receivers being dropped.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// # futures_lite::future::block_on(async {
612 /// use async_broadcast::{broadcast, RecvError};
613 ///
614 /// let (s, mut r1) = broadcast(2);
615 ///
616 /// assert_eq!(s.broadcast(1).await, Ok(None));
617 ///
618 /// let mut r2 = s.new_receiver();
619 ///
620 /// assert_eq!(s.broadcast(2).await, Ok(None));
621 /// drop(s);
622 ///
623 /// assert_eq!(r1.recv().await, Ok(1));
624 /// assert_eq!(r1.recv().await, Ok(2));
625 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
626 ///
627 /// assert_eq!(r2.recv().await, Ok(2));
628 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
629 /// # });
630 /// ```
631 pub fn new_receiver(&self) -> Receiver<T> {
632 let mut inner = self.inner.lock().unwrap();
633 inner.receiver_count += 1;
634 Receiver {
635 inner: self.inner.clone(),
636 pos: inner.head_pos + inner.queue.len() as u64,
637 listener: None,
638 }
639 }
640}
641
642impl<T: Clone> Sender<T> {
643 /// Broadcasts a message on the channel.
644 ///
645 /// If the channel is full, this method waits until there is space for a message unless:
646 ///
647 /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
648 /// the oldest message from the channel to make room for the new message. The removed message
649 /// is returned to the caller.
650 /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
651 /// [`SendError`] immediately.
652 ///
653 /// If the channel is closed, this method returns an error.
654 ///
/// The future returned by this function is pinned to the heap. If the future being `Unpin` is
/// not important to you, or if you just `.await` this future, use the
/// [`broadcast_direct`](Self::broadcast_direct) method instead.
658 ///
659 /// # Examples
660 ///
661 /// ```
662 /// # futures_lite::future::block_on(async {
663 /// use async_broadcast::{broadcast, SendError};
664 ///
665 /// let (s, r) = broadcast(1);
666 ///
667 /// assert_eq!(s.broadcast(1).await, Ok(None));
668 /// drop(r);
669 /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
670 /// # });
671 /// ```
672 pub fn broadcast(&self, msg: T) -> Pin<Box<Send<'_, T>>> {
673 Box::pin(self.broadcast_direct(msg))
674 }
675
676 /// Broadcasts a message on the channel without pinning the future to the heap.
677 ///
/// The future returned by this method is not `Unpin` and must be pinned before use. This is
/// the desired behavior if you just `.await` on the future. For other use cases, use the
/// [`broadcast`](Self::broadcast) method instead.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// # futures_lite::future::block_on(async {
686 /// use async_broadcast::{broadcast, SendError};
687 ///
688 /// let (s, r) = broadcast(1);
689 ///
690 /// assert_eq!(s.broadcast_direct(1).await, Ok(None));
691 /// drop(r);
692 /// assert_eq!(s.broadcast_direct(2).await, Err(SendError(2)));
693 /// # });
694 /// ```
695 pub fn broadcast_direct(&self, msg: T) -> Send<'_, T> {
696 Send::_new(SendInner {
697 sender: self,
698 listener: None,
699 msg: Some(msg),
700 _pin: PhantomPinned,
701 })
702 }
703
704 /// Attempts to broadcast a message on the channel.
705 ///
706 /// If the channel is full, this method returns an error unless overflow mode (set through
707 /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
708 /// oldest message from the channel to make room for the new message. The removed message
709 /// is returned to the caller.
710 ///
711 /// If the channel is closed, this method returns an error.
712 ///
713 /// # Examples
714 ///
715 /// ```
716 /// use async_broadcast::{broadcast, TrySendError};
717 ///
718 /// let (s, r) = broadcast(1);
719 ///
720 /// assert_eq!(s.try_broadcast(1), Ok(None));
721 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
722 ///
723 /// drop(r);
724 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
725 /// ```
726 pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
727 let mut ret = None;
728 let mut inner = self.inner.lock().unwrap();
729
730 if inner.is_closed {
731 return Err(TrySendError::Closed(msg));
732 } else if inner.receiver_count == 0 {
733 assert!(inner.inactive_receiver_count != 0);
734
735 return Err(TrySendError::Inactive(msg));
736 } else if inner.queue.len() == inner.capacity {
737 if inner.overflow {
738 // Make room by popping a message.
739 ret = inner.queue.pop_front().map(|(m, _)| m);
740 } else {
741 return Err(TrySendError::Full(msg));
742 }
743 }
744 let receiver_count = inner.receiver_count;
745 inner.queue.push_back((msg, receiver_count));
746 if ret.is_some() {
747 inner.head_pos += 1;
748 }
749
750 // Notify all awaiting receive operations.
751 inner.recv_ops.notify(usize::MAX);
752
753 Ok(ret)
754 }
755
756 /// Broadcasts a message on the channel using the blocking strategy.
757 ///
758 /// If the channel is full, this method will block until there is room.
759 ///
760 /// If the channel is closed, this method returns an error.
761 ///
762 /// # Blocking
763 ///
/// Rather than using asynchronous waiting, like the [`broadcast`](Self::broadcast) method,
/// this method will block the current thread until the message is sent.
766 ///
767 /// This method should not be used in an asynchronous context. It is intended
768 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
769 /// Calling this method in an asynchronous context may result in deadlocks.
770 ///
771 /// # Examples
772 ///
773 /// ```
774 /// use async_broadcast::{broadcast, SendError};
775 ///
776 /// let (s, r) = broadcast(1);
777 ///
778 /// assert_eq!(s.broadcast_blocking(1), Ok(None));
779 /// drop(r);
780 /// assert_eq!(s.broadcast_blocking(2), Err(SendError(2)));
781 /// ```
782 #[cfg(not(target_family = "wasm"))]
783 pub fn broadcast_blocking(&self, msg: T) -> Result<Option<T>, SendError<T>> {
784 self.broadcast_direct(msg).wait()
785 }
786}
787
788impl<T> Drop for Sender<T> {
789 fn drop(&mut self) {
790 let mut inner = self.inner.lock().unwrap();
791
792 inner.sender_count -= 1;
793
794 if inner.sender_count == 0 {
795 inner.close();
796 }
797 }
798}
799
800impl<T> Clone for Sender<T> {
801 fn clone(&self) -> Self {
802 self.inner.lock().unwrap().sender_count += 1;
803
804 Sender {
805 inner: self.inner.clone(),
806 }
807 }
808}
809
810/// The receiving side of a channel.
811///
812/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
813/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
814/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
815/// receivers around.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Channel state, shared with the other `Sender` and `Receiver` handles of this channel.
    inner: Arc<Mutex<Inner<T>>>,
    /// Position of this receiver in the channel's send sequence, in the same numbering as
    /// `Inner::head_pos` (0 for the first receiver of a fresh channel).
    pos: u64,

    /// Listens for a send or close event to unblock this stream.
    listener: Option<EventListener>,
}
824
825impl<T> Receiver<T> {
826 /// Returns the channel capacity.
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// use async_broadcast::broadcast;
832 ///
833 /// let (_s, r) = broadcast::<i32>(5);
834 /// assert_eq!(r.capacity(), 5);
835 /// ```
836 pub fn capacity(&self) -> usize {
837 self.inner.lock().unwrap().capacity
838 }
839
840 /// Set the channel capacity.
841 ///
842 /// There are times when you need to change the channel's capacity after creating it. If the
843 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
844 /// dropped to shrink the channel.
845 ///
846 /// # Examples
847 ///
848 /// ```
849 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
850 ///
851 /// let (s, mut r) = broadcast::<i32>(3);
852 /// assert_eq!(r.capacity(), 3);
853 /// s.try_broadcast(1).unwrap();
854 /// s.try_broadcast(2).unwrap();
855 /// s.try_broadcast(3).unwrap();
856 ///
857 /// r.set_capacity(1);
858 /// assert_eq!(r.capacity(), 1);
859 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
860 /// assert_eq!(r.try_recv().unwrap(), 3);
861 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
862 /// s.try_broadcast(1).unwrap();
863 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
864 ///
865 /// r.set_capacity(2);
866 /// assert_eq!(r.capacity(), 2);
867 /// s.try_broadcast(2).unwrap();
868 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
869 /// ```
870 pub fn set_capacity(&mut self, new_cap: usize) {
871 self.inner.lock().unwrap().set_capacity(new_cap);
872 }
873
874 /// If overflow mode is enabled on this channel.
875 ///
876 /// # Examples
877 ///
878 /// ```
879 /// use async_broadcast::broadcast;
880 ///
881 /// let (_s, r) = broadcast::<i32>(5);
882 /// assert!(!r.overflow());
883 /// ```
884 pub fn overflow(&self) -> bool {
885 self.inner.lock().unwrap().overflow
886 }
887
888 /// Set overflow mode on the channel.
889 ///
890 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
891 /// full. It achieves that by removing the oldest message from the channel.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
897 ///
898 /// let (s, mut r) = broadcast::<i32>(2);
899 /// s.try_broadcast(1).unwrap();
900 /// s.try_broadcast(2).unwrap();
901 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
902 /// r.set_overflow(true);
903 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
904 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
905 ///
906 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
907 /// assert_eq!(r.try_recv().unwrap(), 3);
908 /// assert_eq!(r.try_recv().unwrap(), 4);
909 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
910 /// ```
911 pub fn set_overflow(&mut self, overflow: bool) {
912 self.inner.lock().unwrap().overflow = overflow;
913 }
914
915 /// If sender will wait for active receivers.
916 ///
917 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
918 /// `true`.
919 ///
920 /// # Examples
921 ///
922 /// ```
923 /// use async_broadcast::broadcast;
924 ///
925 /// let (_, r) = broadcast::<i32>(5);
926 /// assert!(r.await_active());
927 /// ```
928 pub fn await_active(&self) -> bool {
929 self.inner.lock().unwrap().await_active
930 }
931
932 /// Specify if sender will wait for active receivers.
933 ///
934 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
935 /// `true`.
936 ///
937 /// # Examples
938 ///
939 /// ```
940 /// # futures_lite::future::block_on(async {
941 /// use async_broadcast::broadcast;
942 ///
943 /// let (s, mut r) = broadcast::<i32>(2);
944 /// s.broadcast(1).await.unwrap();
945 ///
946 /// r.set_await_active(false);
947 /// let _ = r.deactivate();
948 /// assert!(s.broadcast(2).await.is_err());
949 /// # });
950 /// ```
951 pub fn set_await_active(&mut self, await_active: bool) {
952 self.inner.lock().unwrap().await_active = await_active;
953 }
954
955 /// Closes the channel.
956 ///
957 /// Returns `true` if this call has closed the channel and it was not closed already.
958 ///
959 /// The remaining messages can still be received.
960 ///
961 /// # Examples
962 ///
963 /// ```
964 /// # futures_lite::future::block_on(async {
965 /// use async_broadcast::{broadcast, RecvError};
966 ///
967 /// let (s, mut r) = broadcast(1);
968 /// s.broadcast(1).await.unwrap();
/// assert!(r.close());
970 ///
971 /// assert_eq!(r.recv().await.unwrap(), 1);
972 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
973 /// # });
974 /// ```
975 pub fn close(&self) -> bool {
976 self.inner.lock().unwrap().close()
977 }
978
979 /// Returns `true` if the channel is closed.
980 ///
981 /// # Examples
982 ///
983 /// ```
984 /// # futures_lite::future::block_on(async {
985 /// use async_broadcast::{broadcast, RecvError};
986 ///
987 /// let (s, r) = broadcast::<()>(1);
988 /// assert!(!s.is_closed());
989 ///
990 /// drop(r);
991 /// assert!(s.is_closed());
992 /// # });
993 /// ```
994 pub fn is_closed(&self) -> bool {
995 self.inner.lock().unwrap().is_closed
996 }
997
998 /// Returns `true` if the channel is empty.
999 ///
1000 /// # Examples
1001 ///
1002 /// ```
1003 /// # futures_lite::future::block_on(async {
1004 /// use async_broadcast::broadcast;
1005 ///
1006 /// let (s, r) = broadcast(1);
1007 ///
1008 /// assert!(s.is_empty());
1009 /// s.broadcast(1).await;
1010 /// assert!(!s.is_empty());
1011 /// # });
1012 /// ```
1013 pub fn is_empty(&self) -> bool {
1014 self.inner.lock().unwrap().queue.is_empty()
1015 }
1016
1017 /// Returns `true` if the channel is full.
1018 ///
1019 /// # Examples
1020 ///
1021 /// ```
1022 /// # futures_lite::future::block_on(async {
1023 /// use async_broadcast::broadcast;
1024 ///
1025 /// let (s, r) = broadcast(1);
1026 ///
1027 /// assert!(!s.is_full());
1028 /// s.broadcast(1).await;
1029 /// assert!(s.is_full());
1030 /// # });
1031 /// ```
1032 pub fn is_full(&self) -> bool {
1033 let inner = self.inner.lock().unwrap();
1034
1035 inner.queue.len() == inner.capacity
1036 }
1037
1038 /// Returns the number of messages in the channel.
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```
1043 /// # futures_lite::future::block_on(async {
1044 /// use async_broadcast::broadcast;
1045 ///
1046 /// let (s, r) = broadcast(2);
1047 /// assert_eq!(s.len(), 0);
1048 ///
1049 /// s.broadcast(1).await;
1050 /// s.broadcast(2).await;
1051 /// assert_eq!(s.len(), 2);
1052 /// # });
1053 /// ```
1054 pub fn len(&self) -> usize {
1055 self.inner.lock().unwrap().queue.len()
1056 }
1057
1058 /// Returns the number of receivers for the channel.
1059 ///
1060 /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
1061 /// are interested in that.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```
1066 /// use async_broadcast::broadcast;
1067 ///
1068 /// let (s, r) = broadcast::<()>(1);
1069 /// assert_eq!(s.receiver_count(), 1);
1070 /// let r = r.deactivate();
1071 /// assert_eq!(s.receiver_count(), 0);
1072 ///
1073 /// let r2 = r.activate_cloned();
1074 /// assert_eq!(r.receiver_count(), 1);
1075 /// assert_eq!(r.inactive_receiver_count(), 1);
1076 /// ```
1077 pub fn receiver_count(&self) -> usize {
1078 self.inner.lock().unwrap().receiver_count
1079 }
1080
1081 /// Returns the number of inactive receivers for the channel.
1082 ///
1083 /// # Examples
1084 ///
1085 /// ```
1086 /// use async_broadcast::broadcast;
1087 ///
1088 /// let (s, r) = broadcast::<()>(1);
1089 /// assert_eq!(s.receiver_count(), 1);
1090 /// let r = r.deactivate();
1091 /// assert_eq!(s.receiver_count(), 0);
1092 ///
1093 /// let r2 = r.activate_cloned();
1094 /// assert_eq!(r.receiver_count(), 1);
1095 /// assert_eq!(r.inactive_receiver_count(), 1);
1096 /// ```
1097 pub fn inactive_receiver_count(&self) -> usize {
1098 self.inner.lock().unwrap().inactive_receiver_count
1099 }
1100
1101 /// Returns the number of senders for the channel.
1102 ///
1103 /// # Examples
1104 ///
1105 /// ```
1106 /// # futures_lite::future::block_on(async {
1107 /// use async_broadcast::broadcast;
1108 ///
1109 /// let (s, r) = broadcast::<()>(1);
1110 /// assert_eq!(s.sender_count(), 1);
1111 ///
1112 /// let s2 = s.clone();
1113 /// assert_eq!(s.sender_count(), 2);
1114 /// # });
1115 /// ```
1116 pub fn sender_count(&self) -> usize {
1117 self.inner.lock().unwrap().sender_count
1118 }
1119
1120 /// Downgrade to a [`InactiveReceiver`].
1121 ///
1122 /// An inactive receiver is one that can not and does not receive any messages. Its only purpose
/// is to keep the associated channel open even when there are no (active) receivers. An inactive
1124 /// receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`] or
1125 /// [`InactiveReceiver::activate_cloned`].
1126 ///
1127 /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
/// receivers exist for the associated channel and [`Sender::broadcast`] will wait until an
1129 /// active receiver is available.
1130 ///
1131 /// # Examples
1132 ///
1133 /// ```
1134 /// # futures_lite::future::block_on(async {
1135 /// use async_broadcast::{broadcast, TrySendError};
1136 ///
1137 /// let (s, r) = broadcast(1);
1138 /// let inactive = r.deactivate();
1139 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1140 ///
1141 /// let mut r = inactive.activate();
1142 /// assert_eq!(s.broadcast(10).await, Ok(None));
1143 /// assert_eq!(r.recv().await, Ok(10));
1144 /// # });
1145 /// ```
1146 pub fn deactivate(self) -> InactiveReceiver<T> {
1147 // Drop::drop impl of Receiver will take care of `receiver_count`.
1148 self.inner.lock().unwrap().inactive_receiver_count += 1;
1149
1150 InactiveReceiver {
1151 inner: self.inner.clone(),
1152 }
1153 }
1154}
1155
1156impl<T: Clone> Receiver<T> {
1157 /// Receives a message from the channel.
1158 ///
1159 /// If the channel is empty, this method waits until there is a message.
1160 ///
1161 /// If the channel is closed, this method receives a message or returns an error if there are
1162 /// no more messages.
1163 ///
1164 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1165 /// this method returns an error and readjusts its cursor to point to the first available
1166 /// message.
1167 ///
1168 /// The future returned by this function is pinned to the heap. If the future being `Unpin` is
/// not important to you, or if you just `.await` this future, use the
/// [`recv_direct`](Self::recv_direct) method instead.
1171 ///
1172 /// # Examples
1173 ///
1174 /// ```
1175 /// # futures_lite::future::block_on(async {
1176 /// use async_broadcast::{broadcast, RecvError};
1177 ///
1178 /// let (s, mut r1) = broadcast(1);
1179 /// let mut r2 = r1.clone();
1180 ///
1181 /// assert_eq!(s.broadcast(1).await, Ok(None));
1182 /// drop(s);
1183 ///
1184 /// assert_eq!(r1.recv().await, Ok(1));
1185 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1186 /// assert_eq!(r2.recv().await, Ok(1));
1187 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1188 /// # });
1189 /// ```
1190 pub fn recv(&mut self) -> Pin<Box<Recv<'_, T>>> {
1191 Box::pin(self.recv_direct())
1192 }
1193
1194 /// Receives a message from the channel without pinning the future to the heap.
1195 ///
1196 /// The future returned by this method is not `Unpin` and must be pinned before use. This is
/// the desired behavior if you just `.await` on the future. For other use cases, use the
/// [`recv`](Self::recv) method instead.
1199 ///
1200 /// # Examples
1201 ///
1202 /// ```
1203 /// # futures_lite::future::block_on(async {
1204 /// use async_broadcast::{broadcast, RecvError};
1205 ///
1206 /// let (s, mut r1) = broadcast(1);
1207 /// let mut r2 = r1.clone();
1208 ///
1209 /// assert_eq!(s.broadcast(1).await, Ok(None));
1210 /// drop(s);
1211 ///
1212 /// assert_eq!(r1.recv_direct().await, Ok(1));
1213 /// assert_eq!(r1.recv_direct().await, Err(RecvError::Closed));
1214 /// assert_eq!(r2.recv_direct().await, Ok(1));
1215 /// assert_eq!(r2.recv_direct().await, Err(RecvError::Closed));
1216 /// # });
1217 /// ```
1218 pub fn recv_direct(&mut self) -> Recv<'_, T> {
1219 Recv::_new(RecvInner {
1220 receiver: self,
1221 listener: None,
1222 _pin: PhantomPinned,
1223 })
1224 }
1225
1226 /// Attempts to receive a message from the channel.
1227 ///
1228 /// If the channel is empty or closed, this method returns an error.
1229 ///
1230 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1231 /// this method returns an error and readjusts its cursor to point to the first available
1232 /// message.
1233 ///
1234 /// # Examples
1235 ///
1236 /// ```
1237 /// # futures_lite::future::block_on(async {
1238 /// use async_broadcast::{broadcast, TryRecvError};
1239 ///
1240 /// let (s, mut r1) = broadcast(1);
1241 /// let mut r2 = r1.clone();
1242 /// assert_eq!(s.broadcast(1).await, Ok(None));
1243 ///
1244 /// assert_eq!(r1.try_recv(), Ok(1));
1245 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1246 /// assert_eq!(r2.try_recv(), Ok(1));
1247 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1248 ///
1249 /// drop(s);
1250 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1251 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1252 /// # });
1253 /// ```
1254 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1255 self.inner
1256 .lock()
1257 .unwrap()
1258 .try_recv_at(&mut self.pos)
1259 .map(|cow| cow.unwrap_or_else(T::clone))
1260 }
1261
1262 /// Receives a message from the channel using the blocking strategy.
1263 ///
1264 /// If the channel is empty, this method will block until there is a message.
1265 ///
1266 /// If the channel is closed, this method receives a message or returns an error if there are
1267 /// no more messages.
1268 ///
1269 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1270 /// this method returns an error and readjusts its cursor to point to the first available
1271 /// message.
1272 ///
1273 /// # Blocking
1274 ///
1275 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
/// this method will block the current thread until a message is received.
1277 ///
1278 /// This method should not be used in an asynchronous context. It is intended
1279 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
1280 /// Calling this method in an asynchronous context may result in deadlocks.
1281 ///
1282 /// # Examples
1283 ///
1284 /// ```
1285 /// use async_broadcast::{broadcast, RecvError};
1286 ///
1287 /// let (s, mut r) = broadcast(1);
1288 ///
1289 /// assert_eq!(s.broadcast_blocking(1), Ok(None));
1290 /// drop(s);
1291 ///
1292 /// assert_eq!(r.recv_blocking(), Ok(1));
1293 /// assert_eq!(r.recv_blocking(), Err(RecvError::Closed));
1294 /// ```
1295 #[cfg(not(target_family = "wasm"))]
1296 pub fn recv_blocking(&mut self) -> Result<T, RecvError> {
1297 self.recv_direct().wait()
1298 }
1299
1300 /// Produce a new Sender for this channel.
1301 ///
1302 /// This will not re-open the channel if it was closed due to all senders being dropped.
1303 ///
1304 /// # Examples
1305 ///
1306 /// ```
1307 /// # futures_lite::future::block_on(async {
1308 /// use async_broadcast::{broadcast, RecvError};
1309 ///
1310 /// let (s1, mut r) = broadcast(2);
1311 ///
1312 /// assert_eq!(s1.broadcast(1).await, Ok(None));
1313 ///
1314 /// let mut s2 = r.new_sender();
1315 ///
1316 /// assert_eq!(s2.broadcast(2).await, Ok(None));
1317 /// drop(s1);
1318 /// drop(s2);
1319 ///
1320 /// assert_eq!(r.recv().await, Ok(1));
1321 /// assert_eq!(r.recv().await, Ok(2));
1322 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1323 /// # });
1324 /// ```
1325 pub fn new_sender(&self) -> Sender<T> {
1326 self.inner.lock().unwrap().sender_count += 1;
1327
1328 Sender {
1329 inner: self.inner.clone(),
1330 }
1331 }
1332
1333 /// Produce a new Receiver for this channel.
1334 ///
1335 /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1336 /// messages available. This is slightly faster than a real clone.
1337 ///
1338 /// # Examples
1339 ///
1340 /// ```
1341 /// # futures_lite::future::block_on(async {
1342 /// use async_broadcast::{broadcast, RecvError};
1343 ///
1344 /// let (s, mut r1) = broadcast(2);
1345 ///
1346 /// assert_eq!(s.broadcast(1).await, Ok(None));
1347 ///
1348 /// let mut r2 = r1.new_receiver();
1349 ///
1350 /// assert_eq!(s.broadcast(2).await, Ok(None));
1351 /// drop(s);
1352 ///
1353 /// assert_eq!(r1.recv().await, Ok(1));
1354 /// assert_eq!(r1.recv().await, Ok(2));
1355 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1356 ///
1357 /// assert_eq!(r2.recv().await, Ok(2));
1358 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1359 /// # });
1360 /// ```
1361 pub fn new_receiver(&self) -> Self {
1362 let mut inner = self.inner.lock().unwrap();
1363 inner.receiver_count += 1;
1364 Receiver {
1365 inner: self.inner.clone(),
1366 pos: inner.head_pos + inner.queue.len() as u64,
1367 listener: None,
1368 }
1369 }
1370
1371 /// A low level poll method that is similar to [`Receiver::recv()`] or
1372 /// [`Receiver::recv_direct()`], and can be useful for building stream implementations which
1373 /// use a [`Receiver`] under the hood and want to know if the stream has overflowed.
1374 ///
1375 /// Prefer to use [`Receiver::recv()`] or [`Receiver::recv_direct()`] when otherwise possible.
1376 ///
1377 /// # Errors
1378 ///
1379 /// If the number of messages that have been sent has overflowed the channel capacity, a
1380 /// [`RecvError::Overflowed`] variant is returned containing the number of items that
1381 /// overflowed and were lost.
1382 ///
1383 /// # Examples
1384 ///
1385 /// This example shows how the [`Receiver::poll_recv`] method can be used to allow a custom
1386 /// stream implementation to internally make use of a [`Receiver`]. This example implementation
1387 /// differs from the stream implementation of [`Receiver`] because it returns an error if
1388 /// the channel capacity overflows, which the built in [`Receiver`] stream doesn't do.
1389 ///
1390 /// ```
1391 /// use futures_core::Stream;
1392 /// use async_broadcast::{Receiver, RecvError};
1393 /// use std::{pin::Pin, task::{Poll, Context}};
1394 ///
1395 /// struct MyStream(Receiver<i32>);
1396 ///
1397 /// impl futures_core::Stream for MyStream {
1398 /// type Item = Result<i32, RecvError>;
1399 /// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1400 /// Pin::new(&mut self.0).poll_recv(cx)
1401 /// }
1402 /// }
1403 /// ```
1404 pub fn poll_recv(
1405 mut self: Pin<&mut Self>,
1406 cx: &mut Context<'_>,
1407 ) -> Poll<Option<Result<T, RecvError>>> {
1408 loop {
1409 // If this stream is listening for events, first wait for a notification.
1410 if let Some(listener) = self.listener.as_mut() {
1411 ready!(Pin::new(listener).poll(cx));
1412 self.listener = None;
1413 }
1414
1415 loop {
1416 // Attempt to receive a message.
1417 match self.try_recv() {
1418 Ok(msg) => {
1419 // The stream is not blocked on an event - drop the listener.
1420 self.listener = None;
1421 return Poll::Ready(Some(Ok(msg)));
1422 }
1423 Err(TryRecvError::Closed) => {
1424 // The stream is not blocked on an event - drop the listener.
1425 self.listener = None;
1426 return Poll::Ready(None);
1427 }
1428 Err(TryRecvError::Overflowed(n)) => {
1429 // The stream is not blocked on an event - drop the listener.
1430 self.listener = None;
1431 return Poll::Ready(Some(Err(RecvError::Overflowed(n))));
1432 }
1433 Err(TryRecvError::Empty) => {}
1434 }
1435
1436 // Receiving failed - now start listening for notifications or wait for one.
1437 match self.listener.as_mut() {
1438 None => {
1439 // Start listening and then try receiving again.
1440 self.listener = {
1441 let inner = self.inner.lock().unwrap();
1442 Some(inner.recv_ops.listen())
1443 };
1444 }
1445 Some(_) => {
1446 // Go back to the outer loop to poll the listener.
1447 break;
1448 }
1449 }
1450 }
1451 }
1452 }
1453}
1454
1455impl<T> Drop for Receiver<T> {
1456 fn drop(&mut self) {
1457 let mut inner = self.inner.lock().unwrap();
1458
1459 // Remove ourself from each item's counter
1460 loop {
1461 match inner.try_recv_at(&mut self.pos) {
1462 Ok(_) => continue,
1463 Err(TryRecvError::Overflowed(_)) => continue,
1464 Err(TryRecvError::Closed) => break,
1465 Err(TryRecvError::Empty) => break,
1466 }
1467 }
1468
1469 inner.receiver_count -= 1;
1470
1471 inner.close_channel();
1472 }
1473}
1474
1475impl<T> Clone for Receiver<T> {
1476 /// Produce a clone of this Receiver that has the same messages queued.
1477 ///
1478 /// # Examples
1479 ///
1480 /// ```
1481 /// # futures_lite::future::block_on(async {
1482 /// use async_broadcast::{broadcast, RecvError};
1483 ///
1484 /// let (s, mut r1) = broadcast(1);
1485 ///
1486 /// assert_eq!(s.broadcast(1).await, Ok(None));
1487 /// drop(s);
1488 ///
1489 /// let mut r2 = r1.clone();
1490 ///
1491 /// assert_eq!(r1.recv().await, Ok(1));
1492 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1493 /// assert_eq!(r2.recv().await, Ok(1));
1494 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1495 /// # });
1496 /// ```
1497 fn clone(&self) -> Self {
1498 let mut inner = self.inner.lock().unwrap();
1499 inner.receiver_count += 1;
1500 // increment the waiter count on all items not yet received by this object
1501 let n = self.pos.saturating_sub(inner.head_pos) as usize;
1502 for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1503 *waiters += 1;
1504 }
1505 Receiver {
1506 inner: self.inner.clone(),
1507 pos: self.pos,
1508 listener: None,
1509 }
1510 }
1511}
1512
1513impl<T: Clone> Stream for Receiver<T> {
1514 type Item = T;
1515
1516 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1517 loop {
1518 match ready!(self.as_mut().poll_recv(cx)) {
1519 Some(Ok(val)) => return Poll::Ready(Some(val)),
1520 // If overflowed, we expect future operations to succeed so try again.
1521 Some(Err(RecvError::Overflowed(_))) => continue,
1522 // RecvError::Closed should never appear here, but handle it anyway.
1523 None | Some(Err(RecvError::Closed)) => return Poll::Ready(None),
1524 }
1525 }
1526 }
1527}
1528
1529impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1530 fn is_terminated(&self) -> bool {
1531 let inner = self.inner.lock().unwrap();
1532
1533 inner.is_closed && inner.queue.is_empty()
1534 }
1535}
1536
/// The error produced when [`Sender::broadcast()`] fails.
///
/// It is returned because the channel is closed, or because no active receivers were present
/// while `await-active` was set to `false` (see [`Sender::set_await_active`] for details). The
/// message that could not be sent is carried inside the error.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The payload is elided, so `T` does not need to implement `Debug`.
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1564
/// The error produced when [`Sender::try_broadcast()`] fails.
///
/// Every variant carries the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),

    /// There are currently no active receivers, only inactive ones.
    Inactive(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(msg) | TrySendError::Closed(msg) | TrySendError::Inactive(msg) => {
                msg
            }
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }

    /// Returns `true` if there are currently no active receivers, only inactive ones.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, TrySendError::Inactive(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the variant name is printed, so `T` does not need to implement `Debug`.
        let label = match self {
            TrySendError::Full(..) => "Full(..)",
            TrySendError::Closed(..) => "Closed(..)",
            TrySendError::Inactive(..) => "Inactive(..)",
        };

        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(..) => "sending into a full channel",
            TrySendError::Closed(..) => "sending into a closed channel",
            TrySendError::Inactive(..) => "sending into the void (no active receivers)",
        };

        f.write_str(description)
    }
}
1634
/// The error produced when [`Receiver::recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty and closed.
    Closed,
}

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvError::Closed => f.write_str("receiving from an empty and closed channel"),
            RecvError::Overflowed(missed) => {
                write!(f, "receiving skipped {} messages", missed)
            }
        }
    }
}
1658
/// The error produced when [`Receiver::try_recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, TryRecvError::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TryRecvError::Closed)
    }

    /// Returns `true` if this error indicates the receiver missed messages.
    pub fn is_overflowed(&self) -> bool {
        matches!(self, TryRecvError::Overflowed(_))
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryRecvError::Overflowed(lost) => {
                write!(f, "receiving operation observed {} lost messages", lost)
            }
            TryRecvError::Empty => f.write_str("receiving from an empty channel"),
            TryRecvError::Closed => f.write_str("receiving from an empty and closed channel"),
        }
    }
}
1715
easy_wrapper! {
    /// A future returned by [`Sender::broadcast()`].
    ///
    /// Resolves to `Ok(..)` once the channel has accepted the message, or to a [`SendError`]
    /// holding the unsent message if the channel is closed, or if there are no active
    /// receivers while `await_active` is `false`.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result<Option<T>, SendError<T>>);
    // `wait()` is not generated on wasm targets. Presumably it is the blocking driver behind
    // `Sender::broadcast_blocking`, mirroring how `Receiver::recv_blocking` uses `Recv::wait()`
    // - TODO confirm against the sender code.
    #[cfg(not(target_family = "wasm"))]
    pub(crate) wait();
}
1724
pin_project! {
    // State behind the `Send` future; polled through `EventListenerFuture`.
    #[derive(Debug)]
    struct SendInner<'a, T> {
        // Sender the message is broadcast through.
        sender: &'a Sender<T>,
        // Listener on the channel's `send_ops` event; set only while waiting for a
        // notification after a failed send attempt.
        listener: Option<EventListener>,
        // The message still to be sent. It is taken for every attempt and put back when
        // the attempt has to be retried.
        msg: Option<T>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1737
1738impl<T: Clone> EventListenerFuture for SendInner<'_, T> {
1739 type Output = Result<Option<T>, SendError<T>>;
1740
1741 fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1742 self: Pin<&mut Self>,
1743 strategy: &mut S,
1744 context: &mut S::Context,
1745 ) -> Poll<Self::Output> {
1746 let this = self.project();
1747
1748 loop {
1749 let msg = this.msg.take().unwrap();
1750 let inner = &this.sender.inner;
1751
1752 // Attempt to send a message.
1753 match this.sender.try_broadcast(msg) {
1754 Ok(msg) => {
1755 let inner = inner.lock().unwrap();
1756
1757 if inner.queue.len() < inner.capacity {
1758 // Not full still, so notify the next awaiting sender.
1759 inner.send_ops.notify(1);
1760 }
1761
1762 return Poll::Ready(Ok(msg));
1763 }
1764 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1765 Err(TrySendError::Full(m)) => *this.msg = Some(m),
1766 Err(TrySendError::Inactive(m)) if inner.lock().unwrap().await_active => {
1767 *this.msg = Some(m)
1768 }
1769 Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
1770 }
1771
1772 // Sending failed - now start listening for notifications or wait for one.
1773 match &this.listener {
1774 None => {
1775 // Start listening and then try sending again.
1776 let inner = inner.lock().unwrap();
1777 *this.listener = Some(inner.send_ops.listen());
1778 }
1779 Some(_) => {
1780 // Wait for a notification.
1781 ready!(strategy.poll(this.listener, context));
1782 *this.listener = None;
1783 }
1784 }
1785 }
1786 }
1787}
1788
easy_wrapper! {
    /// A future returned by [`Receiver::recv()`].
    ///
    /// Resolves to the next message, or to a [`RecvError`] if the channel is empty and closed
    /// or if this receiver has missed messages because the channel overflowed.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless .awaited"]
    pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result<T, RecvError>);
    // `wait()` is what `Receiver::recv_blocking` calls to drive this future; it is not
    // generated on wasm targets.
    #[cfg(not(target_family = "wasm"))]
    pub(crate) wait();
}
1797
pin_project! {
    // State behind the `Recv` future; polled through `EventListenerFuture`.
    #[derive(Debug)]
    struct RecvInner<'a, T> {
        // Receiver whose position is advanced when a message is received.
        receiver: &'a mut Receiver<T>,
        // Listener on the channel's `recv_ops` event; set only while waiting for a
        // notification after the channel was found empty.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1809
1810impl<T: Clone> EventListenerFuture for RecvInner<'_, T> {
1811 type Output = Result<T, RecvError>;
1812
1813 fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
1814 self: Pin<&mut Self>,
1815 strategy: &mut S,
1816 context: &mut S::Context,
1817 ) -> Poll<Self::Output> {
1818 let this = self.project();
1819
1820 loop {
1821 // Attempt to receive a message.
1822 match this.receiver.try_recv() {
1823 Ok(msg) => return Poll::Ready(Ok(msg)),
1824 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1825 Err(TryRecvError::Overflowed(n)) => {
1826 return Poll::Ready(Err(RecvError::Overflowed(n)));
1827 }
1828 Err(TryRecvError::Empty) => {}
1829 }
1830
1831 // Receiving failed - now start listening for notifications or wait for one.
1832 match &this.listener {
1833 None => {
1834 // Start listening and then try receiving again.
1835 *this.listener = {
1836 let inner = this.receiver.inner.lock().unwrap();
1837 Some(inner.recv_ops.listen())
1838 };
1839 }
1840 Some(_) => {
1841 // Wait for a notification.
1842 ready!(strategy.poll(this.listener, context));
1843 *this.listener = None;
1844 }
1845 }
1846 }
1847 }
1848}
1849
/// An inactive receiver.
///
/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
/// keeping a channel open even when no associated active receivers exist.
///
/// It is obtained from [`Receiver::deactivate`] and can be turned back into an active
/// [`Receiver`] with [`InactiveReceiver::activate`] or [`InactiveReceiver::activate_cloned`].
#[derive(Debug)]
pub struct InactiveReceiver<T> {
    // Channel state shared with the senders and receivers of the same channel.
    inner: Arc<Mutex<Inner<T>>>,
}
1858
1859impl<T> InactiveReceiver<T> {
/// Convert to an active [`Receiver`].
1861 ///
1862 /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1863 ///
1864 /// # Examples
1865 ///
1866 /// ```
1867 /// use async_broadcast::{broadcast, TrySendError};
1868 ///
1869 /// let (s, r) = broadcast(1);
1870 /// let inactive = r.deactivate();
1871 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1872 ///
1873 /// let mut r = inactive.activate();
1874 /// assert_eq!(s.try_broadcast(10), Ok(None));
1875 /// assert_eq!(r.try_recv(), Ok(10));
1876 /// ```
1877 pub fn activate(self) -> Receiver<T> {
1878 self.activate_cloned()
1879 }
1880
/// Create an active [`Receiver`] for the associated channel.
1882 ///
1883 /// # Examples
1884 ///
1885 /// ```
1886 /// use async_broadcast::{broadcast, TrySendError};
1887 ///
1888 /// let (s, r) = broadcast(1);
1889 /// let inactive = r.deactivate();
1890 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1891 ///
1892 /// let mut r = inactive.activate_cloned();
1893 /// assert_eq!(s.try_broadcast(10), Ok(None));
1894 /// assert_eq!(r.try_recv(), Ok(10));
1895 /// ```
1896 pub fn activate_cloned(&self) -> Receiver<T> {
1897 let mut inner = self.inner.lock().unwrap();
1898 inner.receiver_count += 1;
1899
1900 if inner.receiver_count == 1 {
1901 // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1902 // queue, the notified operation will notify another awaiting sender.
1903 inner.send_ops.notify(1);
1904 }
1905
1906 Receiver {
1907 inner: self.inner.clone(),
1908 pos: inner.head_pos + inner.queue.len() as u64,
1909 listener: None,
1910 }
1911 }
1912
1913 /// Returns the channel capacity.
1914 ///
1915 /// See [`Receiver::capacity`] documentation for examples.
1916 pub fn capacity(&self) -> usize {
1917 self.inner.lock().unwrap().capacity
1918 }
1919
1920 /// Set the channel capacity.
1921 ///
1922 /// There are times when you need to change the channel's capacity after creating it. If the
1923 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1924 /// dropped to shrink the channel.
1925 ///
1926 /// See [`Receiver::set_capacity`] documentation for examples.
1927 pub fn set_capacity(&mut self, new_cap: usize) {
1928 self.inner.lock().unwrap().set_capacity(new_cap);
1929 }
1930
1931 /// If overflow mode is enabled on this channel.
1932 ///
1933 /// See [`Receiver::overflow`] documentation for examples.
1934 pub fn overflow(&self) -> bool {
1935 self.inner.lock().unwrap().overflow
1936 }
1937
1938 /// Set overflow mode on the channel.
1939 ///
1940 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1941 /// full. It achieves that by removing the oldest message from the channel.
1942 ///
1943 /// See [`Receiver::set_overflow`] documentation for examples.
1944 pub fn set_overflow(&mut self, overflow: bool) {
1945 self.inner.lock().unwrap().overflow = overflow;
1946 }
1947
1948 /// If sender will wait for active receivers.
1949 ///
1950 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1951 /// `true`.
1952 ///
1953 /// # Examples
1954 ///
1955 /// ```
1956 /// use async_broadcast::broadcast;
1957 ///
1958 /// let (_, r) = broadcast::<i32>(5);
1959 /// let r = r.deactivate();
1960 /// assert!(r.await_active());
1961 /// ```
1962 pub fn await_active(&self) -> bool {
1963 self.inner.lock().unwrap().await_active
1964 }
1965
1966 /// Specify if sender will wait for active receivers.
1967 ///
1968 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1969 /// `true`.
1970 ///
1971 /// # Examples
1972 ///
1973 /// ```
1974 /// # futures_lite::future::block_on(async {
1975 /// use async_broadcast::broadcast;
1976 ///
1977 /// let (s, r) = broadcast::<i32>(2);
1978 /// s.broadcast(1).await.unwrap();
1979 ///
1980 /// let mut r = r.deactivate();
1981 /// r.set_await_active(false);
1982 /// assert!(s.broadcast(2).await.is_err());
1983 /// # });
1984 /// ```
1985 pub fn set_await_active(&mut self, await_active: bool) {
1986 self.inner.lock().unwrap().await_active = await_active;
1987 }
1988
1989 /// Closes the channel.
1990 ///
1991 /// Returns `true` if this call has closed the channel and it was not closed already.
1992 ///
1993 /// The remaining messages can still be received.
1994 ///
1995 /// See [`Receiver::close`] documentation for examples.
1996 pub fn close(&self) -> bool {
1997 self.inner.lock().unwrap().close()
1998 }
1999
2000 /// Returns `true` if the channel is closed.
2001 ///
2002 /// See [`Receiver::is_closed`] documentation for examples.
2003 pub fn is_closed(&self) -> bool {
2004 self.inner.lock().unwrap().is_closed
2005 }
2006
2007 /// Returns `true` if the channel is empty.
2008 ///
2009 /// See [`Receiver::is_empty`] documentation for examples.
2010 pub fn is_empty(&self) -> bool {
2011 self.inner.lock().unwrap().queue.is_empty()
2012 }
2013
2014 /// Returns `true` if the channel is full.
2015 ///
2016 /// See [`Receiver::is_full`] documentation for examples.
2017 pub fn is_full(&self) -> bool {
2018 let inner = self.inner.lock().unwrap();
2019
2020 inner.queue.len() == inner.capacity
2021 }
2022
2023 /// Returns the number of messages in the channel.
2024 ///
2025 /// See [`Receiver::len`] documentation for examples.
2026 pub fn len(&self) -> usize {
2027 self.inner.lock().unwrap().queue.len()
2028 }
2029
2030 /// Returns the number of receivers for the channel.
2031 ///
2032 /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
2033 /// if you're interested in that.
2034 ///
2035 /// # Examples
2036 ///
2037 /// ```
2038 /// use async_broadcast::broadcast;
2039 ///
2040 /// let (s, r) = broadcast::<()>(1);
2041 /// assert_eq!(s.receiver_count(), 1);
2042 /// let r = r.deactivate();
2043 /// assert_eq!(s.receiver_count(), 0);
2044 ///
2045 /// let r2 = r.activate_cloned();
2046 /// assert_eq!(r.receiver_count(), 1);
2047 /// assert_eq!(r.inactive_receiver_count(), 1);
2048 /// ```
2049 pub fn receiver_count(&self) -> usize {
2050 self.inner.lock().unwrap().receiver_count
2051 }
2052
2053 /// Returns the number of inactive receivers for the channel.
2054 ///
2055 /// # Examples
2056 ///
2057 /// ```
2058 /// use async_broadcast::broadcast;
2059 ///
2060 /// let (s, r) = broadcast::<()>(1);
2061 /// assert_eq!(s.receiver_count(), 1);
2062 /// let r = r.deactivate();
2063 /// assert_eq!(s.receiver_count(), 0);
2064 ///
2065 /// let r2 = r.activate_cloned();
2066 /// assert_eq!(r.receiver_count(), 1);
2067 /// assert_eq!(r.inactive_receiver_count(), 1);
2068 /// ```
2069 pub fn inactive_receiver_count(&self) -> usize {
2070 self.inner.lock().unwrap().inactive_receiver_count
2071 }
2072
2073 /// Returns the number of senders for the channel.
2074 ///
2075 /// See [`Receiver::sender_count`] documentation for examples.
2076 pub fn sender_count(&self) -> usize {
2077 self.inner.lock().unwrap().sender_count
2078 }
2079}
2080
2081impl<T> Clone for InactiveReceiver<T> {
2082 fn clone(&self) -> Self {
2083 self.inner.lock().unwrap().inactive_receiver_count += 1;
2084
2085 InactiveReceiver {
2086 inner: self.inner.clone(),
2087 }
2088 }
2089}
2090
2091impl<T> Drop for InactiveReceiver<T> {
2092 fn drop(&mut self) {
2093 let mut inner = self.inner.lock().unwrap();
2094
2095 inner.inactive_receiver_count -= 1;
2096 inner.close_channel();
2097 }
2098}