broker_tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is also `Send` or `Sync` respectively.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! ## Lagging
22//!
23//! As sent messages must be retained until **all** [`Receiver`] handles receive
24//! a clone, broadcast channels are suspectible to the "slow receiver" problem.
25//! In this case, all but one receiver are able to receive values at the rate
26//! they are sent. Because one receiver is stalled, the channel starts to fill
27//! up.
28//!
29//! This broadcast channel implementation handles this case by setting a hard
30//! upper bound on the number of values the channel may retain at any given
31//! time. This upper bound is passed to the [`channel`] function as an argument.
32//!
33//! If a value is sent when the channel is at capacity, the oldest value
34//! currently held by the channel is released. This frees up space for the new
35//! value. Any receiver that has not yet seen the released value will return
36//! [`RecvError::Lagged`] the next time [`recv`] is called.
37//!
38//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
39//! updated to the oldest value contained by the channel. The next call to
40//! [`recv`] will return this value.
41//!
42//! This behavior enables a receiver to detect when it has lagged so far behind
43//! that data has been dropped. The caller may decide how to respond to this:
44//! either by aborting its task or by tolerating lost messages and resuming
45//! consumption of the channel.
46//!
47//! ## Closing
48//!
49//! When **all** [`Sender`] handles have been dropped, no new values may be
50//! sent. At this point, the channel is "closed". Once a receiver has received
51//! all values retained by the channel, the next call to [`recv`] will return
52//! with [`RecvError::Closed`].
53//!
54//! [`Sender`]: crate::sync::broadcast::Sender
55//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
56//! [`Receiver`]: crate::sync::broadcast::Receiver
57//! [`channel`]: crate::sync::broadcast::channel
58//! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
59//! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
60//! [`recv`]: crate::sync::broadcast::Receiver::recv
61//!
62//! # Examples
63//!
64//! Basic usage
65//!
66//! ```
67//! use tokio::sync::broadcast;
68//!
69//! #[tokio::main]
70//! async fn main() {
71//! let (tx, mut rx1) = broadcast::channel(16);
72//! let mut rx2 = tx.subscribe();
73//!
74//! tokio::spawn(async move {
75//! assert_eq!(rx1.recv().await.unwrap(), 10);
76//! assert_eq!(rx1.recv().await.unwrap(), 20);
77//! });
78//!
79//! tokio::spawn(async move {
80//! assert_eq!(rx2.recv().await.unwrap(), 10);
81//! assert_eq!(rx2.recv().await.unwrap(), 20);
82//! });
83//!
84//! tx.send(10).unwrap();
85//! tx.send(20).unwrap();
86//! }
87//! ```
88//!
89//! Handling lag
90//!
91//! ```
92//! use tokio::sync::broadcast;
93//!
94//! #[tokio::main]
95//! async fn main() {
96//! let (tx, mut rx) = broadcast::channel(2);
97//!
98//! tx.send(10).unwrap();
99//! tx.send(20).unwrap();
100//! tx.send(30).unwrap();
101//!
102//! // The receiver lagged behind
103//! assert!(rx.recv().await.is_err());
104//!
105//! // At this point, we can abort or continue with lost messages
106//!
107//! assert_eq!(20, rx.recv().await.unwrap());
108//! assert_eq!(30, rx.recv().await.unwrap());
109//! }
110
111use crate::loom::cell::CausalCell;
112use crate::loom::future::AtomicWaker;
113use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
114use crate::loom::sync::{Arc, Condvar, Mutex};
115
116use std::fmt;
117use std::ptr;
118use std::sync::atomic::Ordering::SeqCst;
119use std::task::{Context, Poll, Waker};
120use std::usize;
121
122/// Sending-half of the [`broadcast`] channel.
123///
124/// May be used from many threads. Messages can be sent with
125/// [`send`][Sender::send].
126///
127/// # Examples
128///
129/// ```
130/// use tokio::sync::broadcast;
131///
132/// #[tokio::main]
133/// async fn main() {
134/// let (tx, mut rx1) = broadcast::channel(16);
135/// let mut rx2 = tx.subscribe();
136///
137/// tokio::spawn(async move {
138/// assert_eq!(rx1.recv().await.unwrap(), 10);
139/// assert_eq!(rx1.recv().await.unwrap(), 20);
140/// });
141///
142/// tokio::spawn(async move {
143/// assert_eq!(rx2.recv().await.unwrap(), 10);
144/// assert_eq!(rx2.recv().await.unwrap(), 20);
145/// });
146///
147/// tx.send(10).unwrap();
148/// tx.send(20).unwrap();
149/// }
150/// ```
151///
152/// [`broadcast`]: crate::sync::broadcast
153pub struct Sender<T> {
154 shared: Arc<Shared<T>>,
155}
156
157/// Receiving-half of the [`broadcast`] channel.
158///
159/// Must not be used concurrently. Messages may be retrieved using
160/// [`recv`][Receiver::recv].
161///
162/// # Examples
163///
164/// ```
165/// use tokio::sync::broadcast;
166///
167/// #[tokio::main]
168/// async fn main() {
169/// let (tx, mut rx1) = broadcast::channel(16);
170/// let mut rx2 = tx.subscribe();
171///
172/// tokio::spawn(async move {
173/// assert_eq!(rx1.recv().await.unwrap(), 10);
174/// assert_eq!(rx1.recv().await.unwrap(), 20);
175/// });
176///
177/// tokio::spawn(async move {
178/// assert_eq!(rx2.recv().await.unwrap(), 10);
179/// assert_eq!(rx2.recv().await.unwrap(), 20);
180/// });
181///
182/// tx.send(10).unwrap();
183/// tx.send(20).unwrap();
184/// }
185/// ```
186///
187/// [`broadcast`]: crate::sync::broadcast
188pub struct Receiver<T> {
189 /// State shared with all receivers and senders.
190 shared: Arc<Shared<T>>,
191
192 /// Next position to read from
193 next: u64,
194
195 /// Waiter state
196 wait: Arc<WaitNode>,
197}
198
199/// Error returned by [`Sender::send`][Sender::send].
200///
201/// A **send** operation can only fail if there are no active receivers,
202/// implying that the message could never be received. The error contains the
203/// message being sent as a payload so it can be recovered.
204#[derive(Debug)]
205pub struct SendError<T>(pub T);
206
207/// An error returned from the [`recv`] function on a [`Receiver`].
208///
209/// [`recv`]: crate::sync::broadcast::Receiver::recv
210/// [`Receiver`]: crate::sync::broadcast::Receiver
211#[derive(Debug, PartialEq)]
212pub enum RecvError {
213 /// There are no more active senders implying no further messages will ever
214 /// be sent.
215 Closed,
216
217 /// The receiver lagged too far behind. Attempting to receive again will
218 /// return the oldest message still retained by the channel.
219 ///
220 /// Includes the number of skipped messages.
221 Lagged(u64),
222}
223
224/// An error returned from the [`try_recv`] function on a [`Receiver`].
225///
226/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
227/// [`Receiver`]: crate::sync::broadcast::Receiver
228#[derive(Debug, PartialEq)]
229pub enum TryRecvError {
230 /// The channel is currently empty. There are still active
231 /// [`Sender`][Sender] handles, so data may yet become available.
232 Empty,
233
234 /// There are no more active senders implying no further messages will ever
235 /// be sent.
236 Closed,
237
238 /// The receiver lagged too far behind and has been forcibly disconnected.
239 /// Attempting to receive again will return the oldest message still
240 /// retained by the channel.
241 ///
242 /// Includes the number of skipped messages.
243 Lagged(u64),
244}
245
246/// Data shared between senders and receivers
247struct Shared<T> {
248 /// slots in the channel
249 buffer: Box<[Slot<T>]>,
250
251 /// Mask a position -> index
252 mask: usize,
253
254 /// Tail of the queue
255 tail: Mutex<Tail>,
256
257 /// Notifies a sender that the slot is unlocked
258 condvar: Condvar,
259
260 /// Stack of pending waiters
261 wait_stack: AtomicPtr<WaitNode>,
262
263 /// Number of outstanding Sender handles
264 num_tx: AtomicUsize,
265}
266
267/// Next position to write a value
268struct Tail {
269 /// Next position to write to
270 pos: u64,
271
272 /// Number of active receivers
273 rx_cnt: usize,
274}
275
276/// Slot in the buffer
277struct Slot<T> {
278 /// Remaining number of receivers that are expected to see this value.
279 ///
280 /// When this goes to zero, the value is released.
281 rem: AtomicUsize,
282
283 /// Used to lock the `write` field.
284 lock: AtomicUsize,
285
286 /// The value being broadcast
287 ///
288 /// Synchronized by `state`
289 write: Write<T>,
290}
291
292/// A write in the buffer
293struct Write<T> {
294 /// Uniquely identifies this write
295 pos: CausalCell<u64>,
296
297 /// The written value
298 val: CausalCell<Option<T>>,
299}
300
301/// Tracks a waiting receiver
302#[derive(Debug)]
303struct WaitNode {
304 /// True if queued
305 queued: AtomicBool,
306
307 /// Task to wake when a permit is made available.
308 waker: AtomicWaker,
309
310 /// Next pointer in the stack of waiting senders.
311 next: CausalCell<*const WaitNode>,
312}
313
314struct RecvGuard<'a, T> {
315 slot: &'a Slot<T>,
316 tail: &'a Mutex<Tail>,
317 condvar: &'a Condvar,
318}
319
320/// Max number of receivers. Reserve space to lock.
321const MAX_RECEIVERS: usize = usize::MAX >> 1;
322
323/// Create a bounded, multi-producer, multi-consumer channel where each sent
324/// value is broadcasted to all active receivers.
325///
326/// All data sent on [`Sender`] will become available on every active
327/// [`Receiver`] in the same order as it was sent.
328///
329/// The `Sender` can be cloned to `send` to the same channel from multiple
330/// points in the process or it can be used concurrently from an `Arc`. New
331/// `Receiver` handles are created by calling [`Sender::subscribe`].
332///
333/// If all [`Receiver`] handles are dropped, the `send` method will return a
334/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
335/// method will return a [`RecvError`].
336///
337/// [`Sender`]: crate::sync::broadcast::Sender
338/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
339/// [`Receiver`]: crate::sync::broadcast::Receiver
340/// [`recv`]: crate::sync::broadcast::Receiver::recv
341/// [`SendError`]: crate::sync::broadcast::SendError
342/// [`RecvError`]: crate::sync::broadcast::RecvError
343///
344/// # Examples
345///
346/// ```
347/// use tokio::sync::broadcast;
348///
349/// #[tokio::main]
350/// async fn main() {
351/// let (tx, mut rx1) = broadcast::channel(16);
352/// let mut rx2 = tx.subscribe();
353///
354/// tokio::spawn(async move {
355/// assert_eq!(rx1.recv().await.unwrap(), 10);
356/// assert_eq!(rx1.recv().await.unwrap(), 20);
357/// });
358///
359/// tokio::spawn(async move {
360/// assert_eq!(rx2.recv().await.unwrap(), 10);
361/// assert_eq!(rx2.recv().await.unwrap(), 20);
362/// });
363///
364/// tx.send(10).unwrap();
365/// tx.send(20).unwrap();
366/// }
367/// ```
368pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
369 assert!(capacity > 0, "capacity is empty");
370 assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
371
372 // Round to a power of two
373 capacity = capacity.next_power_of_two();
374
375 let mut buffer = Vec::with_capacity(capacity);
376
377 for i in 0..capacity {
378 buffer.push(Slot {
379 rem: AtomicUsize::new(0),
380 lock: AtomicUsize::new(0),
381 write: Write {
382 pos: CausalCell::new((i as u64).wrapping_sub(capacity as u64)),
383 val: CausalCell::new(None),
384 },
385 });
386 }
387
388 let shared = Arc::new(Shared {
389 buffer: buffer.into_boxed_slice(),
390 mask: capacity - 1,
391 tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }),
392 condvar: Condvar::new(),
393 wait_stack: AtomicPtr::new(ptr::null_mut()),
394 num_tx: AtomicUsize::new(1),
395 });
396
397 let rx = Receiver {
398 shared: shared.clone(),
399 next: 0,
400 wait: Arc::new(WaitNode {
401 queued: AtomicBool::new(false),
402 waker: AtomicWaker::new(),
403 next: CausalCell::new(ptr::null()),
404 }),
405 };
406
407 let tx = Sender { shared };
408
409 (tx, rx)
410}
411
412unsafe impl<T: Send> Send for Sender<T> {}
413unsafe impl<T: Send> Sync for Sender<T> {}
414
415unsafe impl<T: Send> Send for Receiver<T> {}
416unsafe impl<T: Send> Sync for Receiver<T> {}
417
418impl<T> Sender<T> {
419 /// Attempts to send a value to all active [`Receiver`] handles, returning
420 /// it back if it could not be sent.
421 ///
422 /// A successful send occurs when there is at least one active [`Receiver`]
423 /// handle. An unsuccessful send would be one where all associated
424 /// [`Receiver`] handles have already been dropped.
425 ///
426 /// # Return
427 ///
428 /// On success, the number of subscribed [`Receiver`] handles is returned.
429 /// This does not mean that this number of receivers will see the message as
430 /// a receiver may drop before receiving the message.
431 ///
432 /// # Note
433 ///
434 /// A return value of `Ok` **does not** mean that the sent value will be
435 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
436 /// handles may be dropped before receiving the sent message.
437 ///
438 /// A return value of `Err` **does not** mean that future calls to `send`
439 /// will fail. New [`Receiver`] handles may be created by calling
440 /// [`subscribe`].
441 ///
442 /// [`Receiver`]: crate::sync::broadcast::Receiver
443 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use tokio::sync::broadcast;
449 ///
450 /// #[tokio::main]
451 /// async fn main() {
452 /// let (tx, mut rx1) = broadcast::channel(16);
453 /// let mut rx2 = tx.subscribe();
454 ///
455 /// tokio::spawn(async move {
456 /// assert_eq!(rx1.recv().await.unwrap(), 10);
457 /// assert_eq!(rx1.recv().await.unwrap(), 20);
458 /// });
459 ///
460 /// tokio::spawn(async move {
461 /// assert_eq!(rx2.recv().await.unwrap(), 10);
462 /// assert_eq!(rx2.recv().await.unwrap(), 20);
463 /// });
464 ///
465 /// tx.send(10).unwrap();
466 /// tx.send(20).unwrap();
467 /// }
468 /// ```
469 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
470 self.send2(Some(value))
471 .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
472 }
473
474 /// Create a new [`Receiver`] handle that will receive values sent **after**
475 /// this call to `subscribe`.
476 ///
477 /// # Examples
478 ///
479 /// ```
480 /// use tokio::sync::broadcast;
481 ///
482 /// #[tokio::main]
483 /// async fn main() {
484 /// let (tx, _rx) = broadcast::channel(16);
485 ///
486 /// // Will not be seen
487 /// tx.send(10).unwrap();
488 ///
489 /// let mut rx = tx.subscribe();
490 ///
491 /// tx.send(20).unwrap();
492 ///
493 /// let value = rx.recv().await.unwrap();
494 /// assert_eq!(20, value);
495 /// }
496 /// ```
497 pub fn subscribe(&self) -> Receiver<T> {
498 let shared = self.shared.clone();
499
500 let mut tail = shared.tail.lock().unwrap();
501
502 if tail.rx_cnt == MAX_RECEIVERS {
503 panic!("max receivers");
504 }
505
506 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
507 let next = tail.pos;
508
509 drop(tail);
510
511 Receiver {
512 shared,
513 next,
514 wait: Arc::new(WaitNode {
515 queued: AtomicBool::new(false),
516 waker: AtomicWaker::new(),
517 next: CausalCell::new(ptr::null()),
518 }),
519 }
520 }
521
522 /// Returns the number of active receivers
523 ///
524 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
525 /// [`subscribe`]. These are the handles that will receive values sent on
526 /// this [`Sender`].
527 ///
528 /// # Note
529 ///
530 /// It is not guaranteed that a sent message will reach this number of
531 /// receivers. Active receivers may never call [`recv`] again before
532 /// dropping.
533 ///
534 /// [`recv`]: crate::sync::broadcast::Receiver::recv
535 /// [`Receiver`]: crate::sync::broadcast::Receiver
536 /// [`Sender`]: crate::sync::broadcast::Sender
537 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
538 /// [`channel`]: crate::sync::broadcast::channel
539 ///
540 /// # Examples
541 ///
542 /// ```
543 /// use tokio::sync::broadcast;
544 ///
545 /// #[tokio::main]
546 /// async fn main() {
547 /// let (tx, _rx1) = broadcast::channel(16);
548 ///
549 /// assert_eq!(1, tx.receiver_count());
550 ///
551 /// let mut _rx2 = tx.subscribe();
552 ///
553 /// assert_eq!(2, tx.receiver_count());
554 ///
555 /// tx.send(10).unwrap();
556 /// }
557 /// ```
558 pub fn receiver_count(&self) -> usize {
559 let tail = self.shared.tail.lock().unwrap();
560 tail.rx_cnt
561 }
562
563 fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
564 let mut tail = self.shared.tail.lock().unwrap();
565
566 if tail.rx_cnt == 0 {
567 return Err(SendError(value));
568 }
569
570 // Position to write into
571 let pos = tail.pos;
572 let rem = tail.rx_cnt;
573 let idx = (pos & self.shared.mask as u64) as usize;
574
575 // Update the tail position
576 tail.pos = tail.pos.wrapping_add(1);
577
578 // Get the slot
579 let slot = &self.shared.buffer[idx];
580
581 // Acquire the write lock
582 let mut prev = slot.lock.fetch_or(1, SeqCst);
583
584 while prev & !1 != 0 {
585 // Concurrent readers, we must go to sleep
586 tail = self.shared.condvar.wait(tail).unwrap();
587
588 prev = slot.lock.load(SeqCst);
589
590 if prev & 1 == 0 {
591 // The writer lock bit was cleared while this thread was
592 // sleeping. This can only happen if a newer write happened on
593 // this slot by another thread. Bail early as an optimization,
594 // there is nothing left to do.
595 return Ok(rem);
596 }
597 }
598
599 if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 {
600 // There is a newer pending write to the same slot.
601 return Ok(rem);
602 }
603
604 // Slot lock acquired
605 slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos });
606 slot.write.val.with_mut(|ptr| unsafe { *ptr = value });
607
608 // Set remaining receivers
609 slot.rem.store(rem, SeqCst);
610
611 // Release the slot lock
612 slot.lock.store(0, SeqCst);
613
614 // Release the mutex. This must happen after the slot lock is released,
615 // otherwise the writer lock bit could be cleared while another thread
616 // is in the critical section.
617 drop(tail);
618
619 // Notify waiting receivers
620 self.notify_rx();
621
622 Ok(rem)
623 }
624
625 fn notify_rx(&self) {
626 let mut curr = self.shared.wait_stack.swap(ptr::null_mut(), SeqCst) as *const WaitNode;
627
628 while !curr.is_null() {
629 let waiter = unsafe { Arc::from_raw(curr) };
630
631 // Update `curr` before toggling `queued` and waking
632 curr = waiter.next.with(|ptr| unsafe { *ptr });
633
634 // Unset queued
635 waiter.queued.store(false, SeqCst);
636
637 // Wake
638 waiter.waker.wake();
639 }
640 }
641}
642
643impl<T> Clone for Sender<T> {
644 fn clone(&self) -> Sender<T> {
645 let shared = self.shared.clone();
646 shared.num_tx.fetch_add(1, SeqCst);
647
648 Sender { shared }
649 }
650}
651
652impl<T> Drop for Sender<T> {
653 fn drop(&mut self) {
654 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
655 let _ = self.send2(None);
656 }
657 }
658}
659
660impl<T> Receiver<T> {
661 /// Lock the next value if there is one.
662 ///
663 /// The caller is responsible for unlocking
664 fn recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError> {
665 let idx = (self.next & self.shared.mask as u64) as usize;
666
667 // The slot holding the next value to read
668 let slot = &self.shared.buffer[idx];
669
670 // Lock the slot
671 if !slot.try_rx_lock() {
672 if spin {
673 while !slot.try_rx_lock() {
674 spin_loop_hint();
675 }
676 } else {
677 return Err(TryRecvError::Empty);
678 }
679 }
680
681 let guard = RecvGuard {
682 slot,
683 tail: &self.shared.tail,
684 condvar: &self.shared.condvar,
685 };
686
687 if guard.pos() != self.next {
688 let pos = guard.pos();
689
690 guard.drop_no_rem_dec();
691
692 if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next {
693 return Err(TryRecvError::Empty);
694 } else {
695 let tail = self.shared.tail.lock().unwrap();
696
697 // `tail.pos` points to the slot the **next** send writes to.
698 // Because a receiver is lagging, this slot also holds the
699 // oldest value. To make the positions match, we subtract the
700 // capacity.
701 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
702 let missed = next.wrapping_sub(self.next);
703
704 self.next = next;
705
706 return Err(TryRecvError::Lagged(missed));
707 }
708 }
709
710 self.next = self.next.wrapping_add(1);
711
712 Ok(guard)
713 }
714}
715
716impl<T> Receiver<T>
717where
718 T: Clone,
719{
720 /// Attempts to return a pending value on this receiver without awaiting.
721 ///
722 /// This is useful for a flavor of "optimistic check" before deciding to
723 /// await on a receiver.
724 ///
725 /// Compared with [`recv`], this function has three failure cases instead of one
726 /// (one for closed, one for an empty buffer, one for a lagging receiver).
727 ///
728 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
729 /// dropped, indicating that no further values can be sent on the channel.
730 ///
731 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
732 /// sent values will overwrite old values. At this point, a call to [`recv`]
733 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
734 /// internal cursor is updated to point to the oldest value still held by
735 /// the channel. A subsequent call to [`try_recv`] will return this value
736 /// **unless** it has been since overwritten. If there are no values to
737 /// receive, `Err(TryRecvError::Empty)` is returned.
738 ///
739 /// [`recv`]: crate::sync::broadcast::Receiver::recv
740 /// [`Receiver`]: crate::sync::broadcast::Receiver
741 ///
742 /// # Examples
743 ///
744 /// ```
745 /// use tokio::sync::broadcast;
746 ///
747 /// #[tokio::main]
748 /// async fn main() {
749 /// let (tx, mut rx) = broadcast::channel(16);
750 ///
751 /// assert!(rx.try_recv().is_err());
752 ///
753 /// tx.send(10).unwrap();
754 ///
755 /// let value = rx.try_recv().unwrap();
756 /// assert_eq!(10, value);
757 /// }
758 /// ```
759 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
760 let guard = self.recv_ref(false)?;
761 guard.clone_value().ok_or(TryRecvError::Closed)
762 }
763
764 #[doc(hidden)] // TODO: document
765 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
766 if let Some(value) = ok_empty(self.try_recv())? {
767 return Poll::Ready(Ok(value));
768 }
769
770 self.register_waker(cx.waker());
771
772 if let Some(value) = ok_empty(self.try_recv())? {
773 Poll::Ready(Ok(value))
774 } else {
775 Poll::Pending
776 }
777 }
778
779 /// Receive the next value for this receiver.
780 ///
781 /// Each [`Receiver`] handle will receive a clone of all values sent
782 /// **after** it has subscribed.
783 ///
784 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
785 /// dropped, indicating that no further values can be sent on the channel.
786 ///
787 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
788 /// sent values will overwrite old values. At this point, a call to [`recv`]
789 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
790 /// internal cursor is updated to point to the oldest value still held by
791 /// the channel. A subsequent call to [`recv`] will return this value
792 /// **unless** it has been since overwritten.
793 ///
794 /// [`Receiver`]: crate::sync::broadcast::Receiver
795 /// [`recv`]: crate::sync::broadcast::Receiver::recv
796 ///
797 /// # Examples
798 ///
799 /// ```
800 /// use tokio::sync::broadcast;
801 ///
802 /// #[tokio::main]
803 /// async fn main() {
804 /// let (tx, mut rx1) = broadcast::channel(16);
805 /// let mut rx2 = tx.subscribe();
806 ///
807 /// tokio::spawn(async move {
808 /// assert_eq!(rx1.recv().await.unwrap(), 10);
809 /// assert_eq!(rx1.recv().await.unwrap(), 20);
810 /// });
811 ///
812 /// tokio::spawn(async move {
813 /// assert_eq!(rx2.recv().await.unwrap(), 10);
814 /// assert_eq!(rx2.recv().await.unwrap(), 20);
815 /// });
816 ///
817 /// tx.send(10).unwrap();
818 /// tx.send(20).unwrap();
819 /// }
820 /// ```
821 ///
822 /// Handling lag
823 ///
824 /// ```
825 /// use tokio::sync::broadcast;
826 ///
827 /// #[tokio::main]
828 /// async fn main() {
829 /// let (tx, mut rx) = broadcast::channel(2);
830 ///
831 /// tx.send(10).unwrap();
832 /// tx.send(20).unwrap();
833 /// tx.send(30).unwrap();
834 ///
835 /// // The receiver lagged behind
836 /// assert!(rx.recv().await.is_err());
837 ///
838 /// // At this point, we can abort or continue with lost messages
839 ///
840 /// assert_eq!(20, rx.recv().await.unwrap());
841 /// assert_eq!(30, rx.recv().await.unwrap());
842 /// }
843 pub async fn recv(&mut self) -> Result<T, RecvError> {
844 use crate::future::poll_fn;
845
846 poll_fn(|cx| self.poll_recv(cx)).await
847 }
848
849 fn register_waker(&self, cx: &Waker) {
850 self.wait.waker.register_by_ref(cx);
851
852 if !self.wait.queued.load(SeqCst) {
853 // Set `queued` before queuing.
854 self.wait.queued.store(true, SeqCst);
855
856 let mut curr = self.shared.wait_stack.load(SeqCst);
857
858 // The ref count is decremented in `notify_rx` when all nodes are
859 // removed from the waiter stack.
860 let node = Arc::into_raw(self.wait.clone()) as *mut _;
861
862 loop {
863 // Safety: `queued == false` means the caller has exclusive
864 // access to `self.wait.next`.
865 self.wait.next.with_mut(|ptr| unsafe { *ptr = curr });
866
867 let res = self
868 .shared
869 .wait_stack
870 .compare_exchange(curr, node, SeqCst, SeqCst);
871
872 match res {
873 Ok(_) => return,
874 Err(actual) => curr = actual,
875 }
876 }
877 }
878 }
879}
880
881#[cfg(feature = "stream")]
882impl<T> crate::stream::Stream for Receiver<T>
883where
884 T: Clone,
885{
886 type Item = Result<T, RecvError>;
887
888 fn poll_next(
889 mut self: std::pin::Pin<&mut Self>,
890 cx: &mut Context<'_>,
891 ) -> Poll<Option<Result<T, RecvError>>> {
892 self.poll_recv(cx).map(|v| match v {
893 Ok(v) => Some(Ok(v)),
894 lag @ Err(RecvError::Lagged(_)) => Some(lag),
895 Err(RecvError::Closed) => None,
896 })
897 }
898}
899
900impl<T> Drop for Receiver<T> {
901 fn drop(&mut self) {
902 let mut tail = self.shared.tail.lock().unwrap();
903
904 tail.rx_cnt -= 1;
905 let until = tail.pos;
906
907 drop(tail);
908
909 while self.next != until {
910 match self.recv_ref(true) {
911 // Ignore the value
912 Ok(_) => {}
913 // The channel is closed
914 Err(TryRecvError::Closed) => break,
915 // Ignore lagging, we will catch up
916 Err(TryRecvError::Lagged(..)) => {}
917 // Can't be empty
918 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
919 }
920 }
921 }
922}
923
924impl<T> Drop for Shared<T> {
925 fn drop(&mut self) {
926 // Clear the wait stack
927 let mut curr = *self.wait_stack.get_mut() as *const WaitNode;
928
929 while !curr.is_null() {
930 let waiter = unsafe { Arc::from_raw(curr) };
931 curr = waiter.next.with(|ptr| unsafe { *ptr });
932 }
933 }
934}
935
936impl<T> fmt::Debug for Sender<T> {
937 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
938 write!(fmt, "broadcast::Sender")
939 }
940}
941
942impl<T> fmt::Debug for Receiver<T> {
943 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
944 write!(fmt, "broadcast::Receiver")
945 }
946}
947
948impl<T> Slot<T> {
949 /// Try to lock the slot for a receiver. If `false`, then a sender holds the
950 /// lock and the calling task will be notified once the sender has released
951 /// the lock.
952 fn try_rx_lock(&self) -> bool {
953 let mut curr = self.lock.load(SeqCst);
954
955 loop {
956 if curr & 1 == 1 {
957 // Locked by sender
958 return false;
959 }
960
961 // Only increment (by 2) if the LSB "lock" bit is not set.
962 let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst);
963
964 match res {
965 Ok(_) => return true,
966 Err(actual) => curr = actual,
967 }
968 }
969 }
970
971 fn rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool) {
972 if rem_dec {
973 // Decrement the remaining counter
974 if 1 == self.rem.fetch_sub(1, SeqCst) {
975 // Last receiver, drop the value
976 self.write.val.with_mut(|ptr| unsafe { *ptr = None });
977 }
978 }
979
980 if 1 == self.lock.fetch_sub(2, SeqCst) - 2 {
981 // First acquire the lock to make sure our sender is waiting on the
982 // condition variable, otherwise the notification could be lost.
983 let _ = tail.lock().unwrap();
984 // Wake up senders
985 condvar.notify_all();
986 }
987 }
988}
989
990impl<'a, T> RecvGuard<'a, T> {
991 fn pos(&self) -> u64 {
992 self.slot.write.pos.with(|ptr| unsafe { *ptr })
993 }
994
995 fn clone_value(&self) -> Option<T>
996 where
997 T: Clone,
998 {
999 self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() })
1000 }
1001
1002 fn drop_no_rem_dec(self) {
1003 use std::mem;
1004
1005 self.slot.rx_unlock(self.tail, self.condvar, false);
1006
1007 mem::forget(self);
1008 }
1009}
1010
1011impl<'a, T> Drop for RecvGuard<'a, T> {
1012 fn drop(&mut self) {
1013 self.slot.rx_unlock(self.tail, self.condvar, true)
1014 }
1015}
1016
1017fn ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError> {
1018 match res {
1019 Ok(value) => Ok(Some(value)),
1020 Err(TryRecvError::Empty) => Ok(None),
1021 Err(TryRecvError::Lagged(n)) => Err(RecvError::Lagged(n)),
1022 Err(TryRecvError::Closed) => Err(RecvError::Closed),
1023 }
1024}
1025
1026impl fmt::Display for RecvError {
1027 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1028 match self {
1029 RecvError::Closed => write!(f, "channel closed"),
1030 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1031 }
1032 }
1033}
1034
1035impl std::error::Error for RecvError {}
1036
1037impl fmt::Display for TryRecvError {
1038 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1039 match self {
1040 TryRecvError::Empty => write!(f, "channel empty"),
1041 TryRecvError::Closed => write!(f, "channel closed"),
1042 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
1043 }
1044 }
1045}
1046
1047impl std::error::Error for TryRecvError {}