broker_tokio/sync/broadcast.rs

//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
//! all consumers.
//!
//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
//! values. [`Sender`] handles are clone-able, allowing concurrent send and
//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
//! long as `T` is `Send`.
//!
//! When a value is sent, **all** [`Receiver`] handles are notified and will
//! receive the value. The value is stored once inside the channel and cloned on
//! demand for each receiver. Once all receivers have received a clone of the
//! value, the value is released from the channel.
//!
//! A channel is created by calling [`channel`], specifying the maximum number
//! of messages the channel can retain at any given time.
//!
//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
//! returned [`Receiver`] will receive values sent **after** the call to
//! `subscribe`.
//!
//! ## Lagging
//!
//! As sent messages must be retained until **all** [`Receiver`] handles receive
//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
//! In this scenario, all but one receiver are able to receive values at the
//! rate they are sent. Because one receiver is stalled, the channel starts to
//! fill up.
//!
//! This broadcast channel implementation handles this case by setting a hard
//! upper bound on the number of values the channel may retain at any given
//! time. This upper bound is passed to the [`channel`] function as an argument.
//!
//! If a value is sent when the channel is at capacity, the oldest value
//! currently held by the channel is released. This frees up space for the new
//! value. Any receiver that has not yet seen the released value will return
//! [`RecvError::Lagged`] the next time [`recv`] is called.
//!
//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
//! updated to the oldest value contained by the channel. The next call to
//! [`recv`] will return this value.
//!
//! This behavior enables a receiver to detect when it has lagged so far behind
//! that data has been dropped. The caller may decide how to respond to this:
//! either by aborting its task or by tolerating lost messages and resuming
//! consumption of the channel.
//!
//! ## Closing
//!
//! When **all** [`Sender`] handles have been dropped, no new values may be
//! sent. At this point, the channel is "closed". Once a receiver has received
//! all values retained by the channel, the next call to [`recv`] will return
//! with [`RecvError::Closed`].
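//!
//! For example, a receiver can observe the channel closing once the last
//! [`Sender`] is dropped (a minimal sketch of the behavior described above):
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = broadcast::channel(16);
//!
//!     tx.send(10).unwrap();
//!     drop(tx);
//!
//!     // The value sent before the drop is still delivered...
//!     assert_eq!(10, rx.recv().await.unwrap());
//!     // ...after which the channel reports itself as closed.
//!     assert_eq!(Err(broadcast::RecvError::Closed), rx.recv().await);
//! }
//! ```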
//!
//! [`Sender`]: crate::sync::broadcast::Sender
//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
//! [`Receiver`]: crate::sync::broadcast::Receiver
//! [`channel`]: crate::sync::broadcast::channel
//! [`RecvError::Lagged`]: crate::sync::broadcast::RecvError::Lagged
//! [`RecvError::Closed`]: crate::sync::broadcast::RecvError::Closed
//! [`recv`]: crate::sync::broadcast::Receiver::recv
//!
//! # Examples
//!
//! Basic usage
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx1) = broadcast::channel(16);
//!     let mut rx2 = tx.subscribe();
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx1.recv().await.unwrap(), 10);
//!         assert_eq!(rx1.recv().await.unwrap(), 20);
//!     });
//!
//!     tokio::spawn(async move {
//!         assert_eq!(rx2.recv().await.unwrap(), 10);
//!         assert_eq!(rx2.recv().await.unwrap(), 20);
//!     });
//!
//!     tx.send(10).unwrap();
//!     tx.send(20).unwrap();
//! }
//! ```
//!
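//! Multiple producers, relying on [`Sender`] being `Clone` (a minimal sketch;
//! the two sends may be observed in either order):
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = broadcast::channel(16);
//!     let tx2 = tx.clone();
//!
//!     tokio::spawn(async move {
//!         tx.send(10).unwrap();
//!     });
//!
//!     tokio::spawn(async move {
//!         tx2.send(20).unwrap();
//!     });
//!
//!     // Both values arrive, though their relative order is not guaranteed.
//!     let mut sum = 0;
//!     sum += rx.recv().await.unwrap();
//!     sum += rx.recv().await.unwrap();
//!     assert_eq!(30, sum);
//! }
//! ```
//!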
//! Handling lag
//!
//! ```
//! use tokio::sync::broadcast;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = broadcast::channel(2);
//!
//!     tx.send(10).unwrap();
//!     tx.send(20).unwrap();
//!     tx.send(30).unwrap();
//!
//!     // The receiver lagged behind
//!     assert!(rx.recv().await.is_err());
//!
//!     // At this point, we can abort or continue with lost messages
//!
//!     assert_eq!(20, rx.recv().await.unwrap());
//!     assert_eq!(30, rx.recv().await.unwrap());
//! }
//! ```
use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::{spin_loop_hint, AtomicBool, AtomicPtr, AtomicUsize};
use crate::loom::sync::{Arc, Condvar, Mutex};

use std::fmt;
use std::ptr;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
use std::usize;

/// Sending-half of the [`broadcast`] channel.
///
/// May be used from many threads. Messages can be sent with
/// [`send`][Sender::send].
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx1) = broadcast::channel(16);
///     let mut rx2 = tx.subscribe();
///
///     tokio::spawn(async move {
///         assert_eq!(rx1.recv().await.unwrap(), 10);
///         assert_eq!(rx1.recv().await.unwrap(), 20);
///     });
///
///     tokio::spawn(async move {
///         assert_eq!(rx2.recv().await.unwrap(), 10);
///         assert_eq!(rx2.recv().await.unwrap(), 20);
///     });
///
///     tx.send(10).unwrap();
///     tx.send(20).unwrap();
/// }
/// ```
///
/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    shared: Arc<Shared<T>>,
}

/// Receiving-half of the [`broadcast`] channel.
///
/// Must not be used concurrently. Messages may be retrieved using
/// [`recv`][Receiver::recv].
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx1) = broadcast::channel(16);
///     let mut rx2 = tx.subscribe();
///
///     tokio::spawn(async move {
///         assert_eq!(rx1.recv().await.unwrap(), 10);
///         assert_eq!(rx1.recv().await.unwrap(), 20);
///     });
///
///     tokio::spawn(async move {
///         assert_eq!(rx2.recv().await.unwrap(), 10);
///         assert_eq!(rx2.recv().await.unwrap(), 20);
///     });
///
///     tx.send(10).unwrap();
///     tx.send(20).unwrap();
/// }
/// ```
///
/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from
    next: u64,

    /// Waiter state
    wait: Arc<WaitNode>,
}

/// Error returned by [`Sender::send`][Sender::send].
///
/// A **send** operation can only fail if there are no active receivers,
/// implying that the message could never be received. The error contains the
/// message being sent as a payload so it can be recovered.
#[derive(Debug)]
pub struct SendError<T>(pub T);

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum RecvError {
    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. Attempting to receive again will
    /// return the oldest message still retained by the channel.
    ///
    /// Includes the number of skipped messages.
    Lagged(u64),
}

/// An error returned from the [`try_recv`] function on a [`Receiver`].
///
/// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
/// [`Receiver`]: crate::sync::broadcast::Receiver
#[derive(Debug, PartialEq)]
pub enum TryRecvError {
    /// The channel is currently empty. There are still active
    /// [`Sender`][Sender] handles, so data may yet become available.
    Empty,

    /// There are no more active senders implying no further messages will ever
    /// be sent.
    Closed,

    /// The receiver lagged too far behind. Attempting to receive again will
    /// return the oldest message still retained by the channel.
    ///
    /// Includes the number of skipped messages.
    Lagged(u64),
}

/// Data shared between senders and receivers
struct Shared<T> {
    /// slots in the channel
    buffer: Box<[Slot<T>]>,

    /// Mask a position -> index
    mask: usize,

    /// Tail of the queue
    tail: Mutex<Tail>,

    /// Notifies a sender that the slot is unlocked
    condvar: Condvar,

    /// Stack of pending waiters
    wait_stack: AtomicPtr<WaitNode>,

    /// Number of outstanding Sender handles
    num_tx: AtomicUsize,
}

/// Next position to write a value
struct Tail {
    /// Next position to write to
    pos: u64,

    /// Number of active receivers
    rx_cnt: usize,
}

/// Slot in the buffer
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    rem: AtomicUsize,

    /// Used to lock the `write` field.
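    ///
    /// The least-significant bit is set while a sender holds (or is waiting
    /// for) the write lock; the remaining bits count the receivers currently
    /// reading the slot, each reader adding two.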
    lock: AtomicUsize,

    /// The value being broadcast
    ///
    /// Synchronized by `lock`
    write: Write<T>,
}

/// A write in the buffer
struct Write<T> {
    /// Uniquely identifies this write
    pos: CausalCell<u64>,

    /// The written value
    val: CausalCell<Option<T>>,
}

/// Tracks a waiting receiver
#[derive(Debug)]
struct WaitNode {
    /// True if queued
    queued: AtomicBool,

    /// Task to wake when a permit is made available.
    waker: AtomicWaker,

    /// Next pointer in the stack of waiting senders.
    next: CausalCell<*const WaitNode>,
}

struct RecvGuard<'a, T> {
    slot: &'a Slot<T>,
    tail: &'a Mutex<Tail>,
    condvar: &'a Condvar,
}

/// Max number of receivers. Reserve space to lock.
const MAX_RECEIVERS: usize = usize::MAX >> 1;

/// Create a bounded, multi-producer, multi-consumer channel where each sent
/// value is broadcasted to all active receivers.
///
/// All data sent on [`Sender`] will become available on every active
/// [`Receiver`] in the same order as it was sent.
///
/// The `Sender` can be cloned to `send` to the same channel from multiple
/// points in the process or it can be used concurrently from an `Arc`. New
/// `Receiver` handles are created by calling [`Sender::subscribe`].
///
/// If all [`Receiver`] handles are dropped, the `send` method will return a
/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
/// method will return a [`RecvError`].
///
/// [`Sender`]: crate::sync::broadcast::Sender
/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
/// [`Receiver`]: crate::sync::broadcast::Receiver
/// [`recv`]: crate::sync::broadcast::Receiver::recv
/// [`SendError`]: crate::sync::broadcast::SendError
/// [`RecvError`]: crate::sync::broadcast::RecvError
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx1) = broadcast::channel(16);
///     let mut rx2 = tx.subscribe();
///
///     tokio::spawn(async move {
///         assert_eq!(rx1.recv().await.unwrap(), 10);
///         assert_eq!(rx1.recv().await.unwrap(), 20);
///     });
///
///     tokio::spawn(async move {
///         assert_eq!(rx2.recv().await.unwrap(), 10);
///         assert_eq!(rx2.recv().await.unwrap(), 20);
///     });
///
///     tx.send(10).unwrap();
///     tx.send(20).unwrap();
/// }
/// ```
pub fn channel<T>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
    assert!(capacity > 0, "capacity is empty");
    assert!(capacity <= usize::MAX >> 1, "requested capacity too large");

    // Round to a power of two
    capacity = capacity.next_power_of_two();

    let mut buffer = Vec::with_capacity(capacity);

    for i in 0..capacity {
        buffer.push(Slot {
            rem: AtomicUsize::new(0),
            lock: AtomicUsize::new(0),
            write: Write {
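                // Start each slot one "lap" behind its index so that, until a
                // value is written here, `recv_ref` reports the slot as empty.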
                pos: CausalCell::new((i as u64).wrapping_sub(capacity as u64)),
                val: CausalCell::new(None),
            },
        });
    }

    let shared = Arc::new(Shared {
        buffer: buffer.into_boxed_slice(),
        mask: capacity - 1,
        tail: Mutex::new(Tail { pos: 0, rx_cnt: 1 }),
        condvar: Condvar::new(),
        wait_stack: AtomicPtr::new(ptr::null_mut()),
        num_tx: AtomicUsize::new(1),
    });

    let rx = Receiver {
        shared: shared.clone(),
        next: 0,
        wait: Arc::new(WaitNode {
            queued: AtomicBool::new(false),
            waker: AtomicWaker::new(),
            next: CausalCell::new(ptr::null()),
        }),
    };

    let tx = Sender { shared };

    (tx, rx)
}

unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> Sender<T> {
    /// Attempts to send a value to all active [`Receiver`] handles, returning
    /// it back if it could not be sent.
    ///
    /// A successful send occurs when there is at least one active [`Receiver`]
    /// handle. An unsuccessful send would be one where all associated
    /// [`Receiver`] handles have already been dropped.
    ///
    /// # Return
    ///
    /// On success, the number of subscribed [`Receiver`] handles is returned.
    /// This does not mean that this number of receivers will see the message,
    /// since a receiver may drop before receiving it.
    ///
    /// # Note
    ///
    /// A return value of `Ok` **does not** mean that the sent value will be
    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
    /// handles may be dropped before receiving the sent message.
    ///
    /// A return value of `Err` **does not** mean that future calls to `send`
    /// will fail. New [`Receiver`] handles may be created by calling
    /// [`subscribe`].
    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx1) = broadcast::channel(16);
    ///     let mut rx2 = tx.subscribe();
    ///
    ///     tokio::spawn(async move {
    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
    ///     });
    ///
    ///     tokio::spawn(async move {
    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
    ///     });
    ///
    ///     tx.send(10).unwrap();
    ///     tx.send(20).unwrap();
    /// }
    /// ```
    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
        self.send2(Some(value))
            .map_err(|SendError(maybe_v)| SendError(maybe_v.unwrap()))
    }

    /// Create a new [`Receiver`] handle that will receive values sent **after**
    /// this call to `subscribe`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, _rx) = broadcast::channel(16);
    ///
    ///     // Will not be seen
    ///     tx.send(10).unwrap();
    ///
    ///     let mut rx = tx.subscribe();
    ///
    ///     tx.send(20).unwrap();
    ///
    ///     let value = rx.recv().await.unwrap();
    ///     assert_eq!(20, value);
    /// }
    /// ```
    pub fn subscribe(&self) -> Receiver<T> {
        let shared = self.shared.clone();

        let mut tail = shared.tail.lock().unwrap();

        if tail.rx_cnt == MAX_RECEIVERS {
            panic!("max receivers");
        }

        tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
        let next = tail.pos;

        drop(tail);

        Receiver {
            shared,
            next,
            wait: Arc::new(WaitNode {
                queued: AtomicBool::new(false),
                waker: AtomicWaker::new(),
                next: CausalCell::new(ptr::null()),
            }),
        }
    }

    /// Returns the number of active receivers
    ///
    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
    /// [`subscribe`]. These are the handles that will receive values sent on
    /// this [`Sender`].
    ///
    /// # Note
    ///
    /// It is not guaranteed that a sent message will reach this number of
    /// receivers: an active receiver may be dropped without ever calling
    /// [`recv`] again.
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    /// [`Sender`]: crate::sync::broadcast::Sender
    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
    /// [`channel`]: crate::sync::broadcast::channel
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, _rx1) = broadcast::channel(16);
    ///
    ///     assert_eq!(1, tx.receiver_count());
    ///
    ///     let mut _rx2 = tx.subscribe();
    ///
    ///     assert_eq!(2, tx.receiver_count());
    ///
    ///     tx.send(10).unwrap();
    /// }
    /// ```
    pub fn receiver_count(&self) -> usize {
        let tail = self.shared.tail.lock().unwrap();
        tail.rx_cnt
    }

    fn send2(&self, value: Option<T>) -> Result<usize, SendError<Option<T>>> {
        let mut tail = self.shared.tail.lock().unwrap();

        if tail.rx_cnt == 0 {
            return Err(SendError(value));
        }

        // Position to write into
        let pos = tail.pos;
        let rem = tail.rx_cnt;
        let idx = (pos & self.shared.mask as u64) as usize;

        // Update the tail position
        tail.pos = tail.pos.wrapping_add(1);

        // Get the slot
        let slot = &self.shared.buffer[idx];

        // Acquire the write lock
        let mut prev = slot.lock.fetch_or(1, SeqCst);

        while prev & !1 != 0 {
            // Concurrent readers, we must go to sleep
            tail = self.shared.condvar.wait(tail).unwrap();

            prev = slot.lock.load(SeqCst);

            if prev & 1 == 0 {
                // The writer lock bit was cleared while this thread was
                // sleeping. This can only happen if a newer write happened on
                // this slot by another thread. Bail early as an optimization,
                // there is nothing left to do.
                return Ok(rem);
            }
        }

        if tail.pos.wrapping_sub(pos) > self.shared.buffer.len() as u64 {
            // There is a newer pending write to the same slot.
            return Ok(rem);
        }

        // Slot lock acquired
        slot.write.pos.with_mut(|ptr| unsafe { *ptr = pos });
        slot.write.val.with_mut(|ptr| unsafe { *ptr = value });

        // Set remaining receivers
        slot.rem.store(rem, SeqCst);

        // Release the slot lock
        slot.lock.store(0, SeqCst);

        // Release the mutex. This must happen after the slot lock is released,
        // otherwise the writer lock bit could be cleared while another thread
        // is in the critical section.
        drop(tail);

        // Notify waiting receivers
        self.notify_rx();

        Ok(rem)
    }

    fn notify_rx(&self) {
        let mut curr = self.shared.wait_stack.swap(ptr::null_mut(), SeqCst) as *const WaitNode;

        while !curr.is_null() {
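            // Reclaim the reference that `register_waker` leaked with
            // `Arc::into_raw` when it pushed this node onto the stack.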
            let waiter = unsafe { Arc::from_raw(curr) };

            // Update `curr` before toggling `queued` and waking
            curr = waiter.next.with(|ptr| unsafe { *ptr });

            // Unset queued
            waiter.queued.store(false, SeqCst);

            // Wake
            waiter.waker.wake();
        }
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        let shared = self.shared.clone();
        shared.num_tx.fetch_add(1, SeqCst);

        Sender { shared }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
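        // The last `Sender` to be dropped pushes a final `None` value into the
        // channel; receivers observe that `None` as the channel being closed.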
        if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
            let _ = self.send2(None);
        }
    }
}

impl<T> Receiver<T> {
    /// Lock the next value if there is one.
    ///
    /// The caller is responsible for unlocking
    fn recv_ref(&mut self, spin: bool) -> Result<RecvGuard<'_, T>, TryRecvError> {
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let slot = &self.shared.buffer[idx];

        // Lock the slot
        if !slot.try_rx_lock() {
            if spin {
                while !slot.try_rx_lock() {
                    spin_loop_hint();
                }
            } else {
                return Err(TryRecvError::Empty);
            }
        }

        let guard = RecvGuard {
            slot,
            tail: &self.shared.tail,
            condvar: &self.shared.condvar,
        };

        if guard.pos() != self.next {
            let pos = guard.pos();

            guard.drop_no_rem_dec();

            if pos.wrapping_add(self.shared.buffer.len() as u64) == self.next {
                return Err(TryRecvError::Empty);
            } else {
                let tail = self.shared.tail.lock().unwrap();

                // `tail.pos` points to the slot the **next** send writes to.
                // Because a receiver is lagging, this slot also holds the
                // oldest value. To make the positions match, we subtract the
                // capacity.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
                let missed = next.wrapping_sub(self.next);

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        self.next = self.next.wrapping_add(1);

        Ok(guard)
    }
}

impl<T> Receiver<T>
where
    T: Clone,
{
    /// Attempts to return a pending value on this receiver without awaiting.
    ///
    /// This is useful for a flavor of "optimistic check" before deciding to
    /// await on a receiver.
    ///
    /// Compared with [`recv`], this function has three failure cases instead
    /// of two (one for closed, one for an empty buffer, one for a lagging
    /// receiver).
    ///
    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
    /// dropped, indicating that no further values can be sent on the channel.
    ///
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
    /// sent values will overwrite old values. At this point, a call to
    /// [`try_recv`] will return with `Err(TryRecvError::Lagged)` and the
    /// [`Receiver`]'s internal cursor is updated to point to the oldest value
    /// still held by the channel. A subsequent call to [`try_recv`] will return
    /// this value **unless** it has since been overwritten. If there are no
    /// values to receive, `Err(TryRecvError::Empty)` is returned.
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = broadcast::channel(16);
    ///
    ///     assert!(rx.try_recv().is_err());
    ///
    ///     tx.send(10).unwrap();
    ///
    ///     let value = rx.try_recv().unwrap();
    ///     assert_eq!(10, value);
    /// }
    /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let guard = self.recv_ref(false)?;
        guard.clone_value().ok_or(TryRecvError::Closed)
    }

    #[doc(hidden)] // TODO: document
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
        if let Some(value) = ok_empty(self.try_recv())? {
            return Poll::Ready(Ok(value));
        }

        self.register_waker(cx.waker());
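        // A value may have been sent between the first `try_recv` and the
        // waker registration above; the second `try_recv` below picks such a
        // value up so it is not missed.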

        if let Some(value) = ok_empty(self.try_recv())? {
            Poll::Ready(Ok(value))
        } else {
            Poll::Pending
        }
    }

    /// Receive the next value for this receiver.
    ///
    /// Each [`Receiver`] handle will receive a clone of all values sent
    /// **after** it has subscribed.
    ///
    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
    /// dropped, indicating that no further values can be sent on the channel.
    ///
    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
    /// sent values will overwrite old values. At this point, a call to [`recv`]
    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
    /// internal cursor is updated to point to the oldest value still held by
    /// the channel. A subsequent call to [`recv`] will return this value
    /// **unless** it has since been overwritten.
    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx1) = broadcast::channel(16);
    ///     let mut rx2 = tx.subscribe();
    ///
    ///     tokio::spawn(async move {
    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
    ///     });
    ///
    ///     tokio::spawn(async move {
    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
    ///     });
    ///
    ///     tx.send(10).unwrap();
    ///     tx.send(20).unwrap();
    /// }
    /// ```
    ///
    /// Handling lag
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, mut rx) = broadcast::channel(2);
    ///
    ///     tx.send(10).unwrap();
    ///     tx.send(20).unwrap();
    ///     tx.send(30).unwrap();
    ///
    ///     // The receiver lagged behind
    ///     assert!(rx.recv().await.is_err());
    ///
    ///     // At this point, we can abort or continue with lost messages
    ///
    ///     assert_eq!(20, rx.recv().await.unwrap());
    ///     assert_eq!(30, rx.recv().await.unwrap());
    /// }
    /// ```
    pub async fn recv(&mut self) -> Result<T, RecvError> {
        use crate::future::poll_fn;

        poll_fn(|cx| self.poll_recv(cx)).await
    }

    fn register_waker(&self, cx: &Waker) {
        self.wait.waker.register_by_ref(cx);

        if !self.wait.queued.load(SeqCst) {
            // Set `queued` before queuing.
            self.wait.queued.store(true, SeqCst);

            let mut curr = self.shared.wait_stack.load(SeqCst);

            // The ref count is decremented in `notify_rx` when all nodes are
            // removed from the waiter stack.
            let node = Arc::into_raw(self.wait.clone()) as *mut _;

            loop {
                // Safety: `queued == false` means the caller has exclusive
                // access to `self.wait.next`.
                self.wait.next.with_mut(|ptr| unsafe { *ptr = curr });

                let res = self
                    .shared
                    .wait_stack
                    .compare_exchange(curr, node, SeqCst, SeqCst);

                match res {
                    Ok(_) => return,
                    Err(actual) => curr = actual,
                }
            }
        }
    }
}

#[cfg(feature = "stream")]
impl<T> crate::stream::Stream for Receiver<T>
where
    T: Clone,
{
    type Item = Result<T, RecvError>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<T, RecvError>>> {
        self.poll_recv(cx).map(|v| match v {
            Ok(v) => Some(Ok(v)),
            lag @ Err(RecvError::Lagged(_)) => Some(lag),
            Err(RecvError::Closed) => None,
        })
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        let mut tail = self.shared.tail.lock().unwrap();

        tail.rx_cnt -= 1;
        let until = tail.pos;

        drop(tail);
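        // The loop below releases this receiver's claim on every value it has
        // not yet observed, so the per-slot `rem` counters can reach zero and
        // the retained values be dropped.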

        while self.next != until {
            match self.recv_ref(true) {
                // Ignore the value
                Ok(_) => {}
                // The channel is closed
                Err(TryRecvError::Closed) => break,
                // Ignore lagging, we will catch up
                Err(TryRecvError::Lagged(..)) => {}
                // Can't be empty
                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
            }
        }
    }
}

impl<T> Drop for Shared<T> {
    fn drop(&mut self) {
        // Clear the wait stack
        let mut curr = *self.wait_stack.get_mut() as *const WaitNode;

        while !curr.is_null() {
            let waiter = unsafe { Arc::from_raw(curr) };
            curr = waiter.next.with(|ptr| unsafe { *ptr });
        }
    }
}

impl<T> fmt::Debug for Sender<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "broadcast::Sender")
    }
}

impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "broadcast::Receiver")
    }
}

impl<T> Slot<T> {
    /// Try to lock the slot for a receiver. If `false`, then a sender holds the
    /// lock and the calling task will be notified once the sender has released
    /// the lock.
    fn try_rx_lock(&self) -> bool {
        let mut curr = self.lock.load(SeqCst);

        loop {
            if curr & 1 == 1 {
                // Locked by sender
                return false;
            }

            // Only increment (by 2) if the LSB "lock" bit is not set.
            let res = self.lock.compare_exchange(curr, curr + 2, SeqCst, SeqCst);

            match res {
                Ok(_) => return true,
                Err(actual) => curr = actual,
            }
        }
    }

    fn rx_unlock(&self, tail: &Mutex<Tail>, condvar: &Condvar, rem_dec: bool) {
        if rem_dec {
            // Decrement the remaining counter
            if 1 == self.rem.fetch_sub(1, SeqCst) {
                // Last receiver, drop the value
                self.write.val.with_mut(|ptr| unsafe { *ptr = None });
            }
        }

        if 1 == self.lock.fetch_sub(2, SeqCst) - 2 {
            // First acquire the lock to make sure our sender is waiting on the
            // condition variable, otherwise the notification could be lost.
            let _ = tail.lock().unwrap();
            // Wake up senders
            condvar.notify_all();
        }
    }
}

impl<'a, T> RecvGuard<'a, T> {
    fn pos(&self) -> u64 {
        self.slot.write.pos.with(|ptr| unsafe { *ptr })
    }

    fn clone_value(&self) -> Option<T>
    where
        T: Clone,
    {
        self.slot.write.val.with(|ptr| unsafe { (*ptr).clone() })
    }

    fn drop_no_rem_dec(self) {
        use std::mem;

        self.slot.rx_unlock(self.tail, self.condvar, false);

        mem::forget(self);
    }
}

impl<'a, T> Drop for RecvGuard<'a, T> {
    fn drop(&mut self) {
        self.slot.rx_unlock(self.tail, self.condvar, true)
    }
}

fn ok_empty<T>(res: Result<T, TryRecvError>) -> Result<Option<T>, RecvError> {
    match res {
        Ok(value) => Ok(Some(value)),
        Err(TryRecvError::Empty) => Ok(None),
        Err(TryRecvError::Lagged(n)) => Err(RecvError::Lagged(n)),
        Err(TryRecvError::Closed) => Err(RecvError::Closed),
    }
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RecvError::Closed => write!(f, "channel closed"),
            RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
        }
    }
}

impl std::error::Error for RecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryRecvError::Empty => write!(f, "channel empty"),
            TryRecvError::Closed => write!(f, "channel closed"),
            TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
        }
    }
}

impl std::error::Error for TryRecvError {}