madsim_real_tokio/sync/
notify.rs

1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::task::{Context, Poll, Waker};
21
22type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
23type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24
25/// Notifies a single task to wake up.
26///
27/// `Notify` provides a basic mechanism to notify a single task of an event.
28/// `Notify` itself does not carry any data. Instead, it is to be used to signal
29/// another task to perform an operation.
30///
31/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32/// [`notified().await`] method waits for a permit to become available, and
33/// [`notify_one()`] sets a permit **if there currently are no available
34/// permits**.
35///
36/// The synchronization details of `Notify` are similar to
37/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38/// value contains a single permit. [`notified().await`] waits for the permit to
39/// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
40/// the permit, waking a pending task if there is one.
41///
42/// If `notify_one()` is called **before** `notified().await`, then the next
43/// call to `notified().await` will complete immediately, consuming the permit.
44/// Any subsequent calls to `notified().await` will wait for a new permit.
45///
46/// If `notify_one()` is called **multiple** times before `notified().await`,
47/// only a **single** permit is stored. The next call to `notified().await` will
48/// complete immediately, but the one after will wait for a new permit.
49///
50/// # Examples
51///
52/// Basic usage.
53///
54/// ```
55/// use tokio::sync::Notify;
56/// use std::sync::Arc;
57///
58/// #[tokio::main]
59/// async fn main() {
60///     let notify = Arc::new(Notify::new());
61///     let notify2 = notify.clone();
62///
63///     let handle = tokio::spawn(async move {
64///         notify2.notified().await;
65///         println!("received notification");
66///     });
67///
68///     println!("sending notification");
69///     notify.notify_one();
70///
71///     // Wait for task to receive notification.
72///     handle.await.unwrap();
73/// }
74/// ```
75///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77///
78/// No wakeups can be lost when using this channel because the call to
79/// `notify_one()` will store a permit in the `Notify`, which the following call
80/// to `notified()` will consume.
81///
82/// ```
83/// use tokio::sync::Notify;
84///
85/// use std::collections::VecDeque;
86/// use std::sync::Mutex;
87///
88/// struct Channel<T> {
89///     values: Mutex<VecDeque<T>>,
90///     notify: Notify,
91/// }
92///
93/// impl<T> Channel<T> {
94///     pub fn send(&self, value: T) {
95///         self.values.lock().unwrap()
96///             .push_back(value);
97///
98///         // Notify the consumer a value is available
99///         self.notify.notify_one();
100///     }
101///
102///     // This is a single-consumer channel, so several concurrent calls to
103///     // `recv` are not allowed.
104///     pub async fn recv(&self) -> T {
105///         loop {
106///             // Drain values
107///             if let Some(value) = self.values.lock().unwrap().pop_front() {
108///                 return value;
109///             }
110///
111///             // Wait for values to be available
112///             self.notify.notified().await;
113///         }
114///     }
115/// }
116/// ```
117///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119///
120/// The call to [`enable`] is important because otherwise if you have two
121/// calls to `recv` and two calls to `send` in parallel, the following could
122/// happen:
123///
124///  1. Both calls to `try_recv` return `None`.
125///  2. Both new elements are added to the vector.
126///  3. The `notify_one` method is called twice, adding only a single
127///     permit to the `Notify`.
128///  4. Both calls to `recv` reach the `Notified` future. One of them
129///     consumes the permit, and the other sleeps forever.
130///
131/// By adding the `Notified` futures to the list by calling `enable` before
132/// `try_recv`, the `notify_one` calls in step three would remove the
133/// futures from the list and mark them notified instead of adding a permit
134/// to the `Notify`. This ensures that both futures are woken.
135///
136/// Notice that this failure can only happen if there are two concurrent calls
137/// to `recv`. This is why the mpsc example above does not require a call to
138/// `enable`.
139///
140/// ```
141/// use tokio::sync::Notify;
142///
143/// use std::collections::VecDeque;
144/// use std::sync::Mutex;
145///
146/// struct Channel<T> {
147///     messages: Mutex<VecDeque<T>>,
148///     notify_on_sent: Notify,
149/// }
150///
151/// impl<T> Channel<T> {
152///     pub fn send(&self, msg: T) {
153///         let mut locked_queue = self.messages.lock().unwrap();
154///         locked_queue.push_back(msg);
155///         drop(locked_queue);
156///
157///         // Send a notification to one of the calls currently
158///         // waiting in a call to `recv`.
159///         self.notify_on_sent.notify_one();
160///     }
161///
162///     pub fn try_recv(&self) -> Option<T> {
163///         let mut locked_queue = self.messages.lock().unwrap();
164///         locked_queue.pop_front()
165///     }
166///
167///     pub async fn recv(&self) -> T {
168///         let future = self.notify_on_sent.notified();
169///         tokio::pin!(future);
170///
171///         loop {
172///             // Make sure that no wakeup is lost if we get
173///             // `None` from `try_recv`.
174///             future.as_mut().enable();
175///
176///             if let Some(msg) = self.try_recv() {
177///                 return msg;
178///             }
179///
180///             // Wait for a call to `notify_one`.
181///             //
182///             // This uses `.as_mut()` to avoid consuming the future,
183///             // which lets us call `Pin::set` below.
184///             future.as_mut().await;
185///
186///             // Reset the future in case another call to
187///             // `try_recv` got the message before us.
188///             future.set(self.notify_on_sent.notified());
189///         }
190///     }
191/// }
192/// ```
193///
194/// [park]: std::thread::park
195/// [unpark]: std::thread::Thread::unpark
196/// [`notified().await`]: Notify::notified()
197/// [`notify_one()`]: Notify::notify_one()
198/// [`enable`]: Notified::enable()
199/// [`Semaphore`]: crate::sync::Semaphore
200#[derive(Debug)]
201pub struct Notify {
202    // `state` uses 2 bits to store one of `EMPTY`,
203    // `WAITING` or `NOTIFIED`. The rest of the bits
204    // are used to store the number of times `notify_waiters`
205    // was called.
206    //
207    // Throughout the code there are two assumptions:
208    // - state can be transitioned *from* `WAITING` only if
209    //   `waiters` lock is held
210    // - number of times `notify_waiters` was called can
211    //   be modified only if `waiters` lock is held
212    state: AtomicUsize,
213    waiters: Mutex<WaitList>,
214}
215
216#[derive(Debug)]
217struct Waiter {
218    /// Intrusive linked-list pointers.
219    pointers: linked_list::Pointers<Waiter>,
220
221    /// Waiting task's waker. Depending on the value of `notification`,
222    /// this field is either protected by the `waiters` lock in
223    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
224    waker: UnsafeCell<Option<Waker>>,
225
226    /// Notification for this waiter.
227    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
228    /// * if it's `Some`, then `waker` is exclusively owned by the
229    ///   enclosing `Waiter` and can be accessed without locking.
230    notification: AtomicNotification,
231
232    /// Should not be `Unpin`.
233    _p: PhantomPinned,
234}
235
236impl Waiter {
237    fn new() -> Waiter {
238        Waiter {
239            pointers: linked_list::Pointers::new(),
240            waker: UnsafeCell::new(None),
241            notification: AtomicNotification::none(),
242            _p: PhantomPinned,
243        }
244    }
245}
246
247generate_addr_of_methods! {
248    impl<> Waiter {
249        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
250            &self.pointers
251        }
252    }
253}
254
// Raw values stored inside `AtomicNotification`. The two non-zero values
// double as the discriminants of the `Notification` enum.

// No notification.
const NOTIFICATION_NONE: usize = 0;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 1;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 2;
263
264/// Notification for a `Waiter`.
265/// This struct is equivalent to `Option<Notification>`, but uses
266/// `AtomicUsize` inside for atomic operations.
267#[derive(Debug)]
268struct AtomicNotification(AtomicUsize);
269
270impl AtomicNotification {
271    fn none() -> Self {
272        AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
273    }
274
275    /// Store-release a notification.
276    /// This method should be called exactly once.
277    fn store_release(&self, notification: Notification) {
278        self.0.store(notification as usize, Release);
279    }
280
281    fn load(&self, ordering: Ordering) -> Option<Notification> {
282        match self.0.load(ordering) {
283            NOTIFICATION_NONE => None,
284            NOTIFICATION_ONE => Some(Notification::One),
285            NOTIFICATION_ALL => Some(Notification::All),
286            _ => unreachable!(),
287        }
288    }
289
290    /// Clears the notification.
291    /// This method is used by a `Notified` future to consume the
292    /// notification. It uses relaxed ordering and should be only
293    /// used once the atomic notification is no longer shared.
294    fn clear(&self) {
295        self.0.store(NOTIFICATION_NONE, Relaxed);
296    }
297}
298
299#[derive(Debug, PartialEq, Eq)]
300#[repr(usize)]
301enum Notification {
302    One = NOTIFICATION_ONE,
303    All = NOTIFICATION_ALL,
304}
305
306/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
307/// and gates the access to it on `notify.waiters` mutex. It also empties
308/// the list on drop.
309struct NotifyWaitersList<'a> {
310    list: GuardedWaitList,
311    is_empty: bool,
312    notify: &'a Notify,
313}
314
315impl<'a> NotifyWaitersList<'a> {
316    fn new(
317        unguarded_list: WaitList,
318        guard: Pin<&'a Waiter>,
319        notify: &'a Notify,
320    ) -> NotifyWaitersList<'a> {
321        let guard_ptr = NonNull::from(guard.get_ref());
322        let list = unguarded_list.into_guarded(guard_ptr);
323        NotifyWaitersList {
324            list,
325            is_empty: false,
326            notify,
327        }
328    }
329
330    /// Removes the last element from the guarded list. Modifying this list
331    /// requires an exclusive access to the main list in `Notify`.
332    fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
333        let result = self.list.pop_back();
334        if result.is_none() {
335            // Save information about emptiness to avoid waiting for lock
336            // in the destructor.
337            self.is_empty = true;
338        }
339        result
340    }
341}
342
343impl Drop for NotifyWaitersList<'_> {
344    fn drop(&mut self) {
345        // If the list is not empty, we unlink all waiters from it.
346        // We do not wake the waiters to avoid double panics.
347        if !self.is_empty {
348            let _lock_guard = self.notify.waiters.lock();
349            while let Some(waiter) = self.list.pop_back() {
350                // Safety: we never make mutable references to waiters.
351                let waiter = unsafe { waiter.as_ref() };
352                waiter.notification.store_release(Notification::All);
353            }
354        }
355    }
356}
357
358/// Future returned from [`Notify::notified()`].
359///
360/// This future is fused, so once it has completed, any future calls to poll
361/// will immediately return `Poll::Ready`.
362#[derive(Debug)]
363pub struct Notified<'a> {
364    /// The `Notify` being received on.
365    notify: &'a Notify,
366
367    /// The current state of the receiving process.
368    state: State,
369
370    /// Number of calls to `notify_waiters` at the time of creation.
371    notify_waiters_calls: usize,
372
373    /// Entry in the waiter `LinkedList`.
374    waiter: Waiter,
375}
376
377unsafe impl<'a> Send for Notified<'a> {}
378unsafe impl<'a> Sync for Notified<'a> {}
379
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Freshly created; `poll_notified` starts here and first tries to
    /// consume a pending permit.
    Init,
    /// NOTE(review): presumably the waiter is queued in `Notify::waiters`
    /// while in this state — confirm against `poll_notified`.
    Waiting,
    /// The notification has been received.
    Done,
}
386
/// Number of low bits of `Notify::state` reserved for the waiter state.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the waiter-state bits (`EMPTY`, `WAITING` or `NOTIFIED`).
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter stored above the state bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;

/// Returns `data` with its state bits replaced by `state`; the
/// `notify_waiters` call counter is left untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

/// Extracts the state bits (`EMPTY`, `WAITING` or `NOTIFIED`) from `data`.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Extracts the `notify_waiters` call counter from `data`.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

/// Returns `data` with the `notify_waiters` call counter incremented by one.
///
/// The counter wraps around to zero on overflow instead of panicking in
/// debug builds. This matches `AtomicUsize::fetch_add`, which is used for the
/// same increment in `atomic_inc_num_notify_waiters_calls`. The state bits
/// are never affected because the addend is a multiple of
/// `1 << NOTIFY_WAITERS_SHIFT`.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}
415
416fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
417    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
418}
419
420impl Notify {
421    /// Create a new `Notify`, initialized without a permit.
422    ///
423    /// # Examples
424    ///
425    /// ```
426    /// use tokio::sync::Notify;
427    ///
428    /// let notify = Notify::new();
429    /// ```
430    pub fn new() -> Notify {
431        Notify {
432            state: AtomicUsize::new(0),
433            waiters: Mutex::new(LinkedList::new()),
434        }
435    }
436
437    /// Create a new `Notify`, initialized without a permit.
438    ///
439    /// When using the `tracing` [unstable feature], a `Notify` created with
440    /// `const_new` will not be instrumented. As such, it will not be visible
441    /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
442    /// an instrumented object if that is needed.
443    ///
444    /// # Examples
445    ///
446    /// ```
447    /// use tokio::sync::Notify;
448    ///
449    /// static NOTIFY: Notify = Notify::const_new();
450    /// ```
451    ///
452    /// [`tokio-console`]: https://github.com/tokio-rs/console
453    /// [unstable feature]: crate#unstable-features
454    #[cfg(not(all(loom, test)))]
455    pub const fn const_new() -> Notify {
456        Notify {
457            state: AtomicUsize::new(0),
458            waiters: Mutex::const_new(LinkedList::new()),
459        }
460    }
461
462    /// Wait for a notification.
463    ///
464    /// Equivalent to:
465    ///
466    /// ```ignore
467    /// async fn notified(&self);
468    /// ```
469    ///
470    /// Each `Notify` value holds a single permit. If a permit is available from
471    /// an earlier call to [`notify_one()`], then `notified().await` will complete
472    /// immediately, consuming that permit. Otherwise, `notified().await` waits
473    /// for a permit to be made available by the next call to `notify_one()`.
474    ///
475    /// The `Notified` future is not guaranteed to receive wakeups from calls to
476    /// `notify_one()` if it has not yet been polled. See the documentation for
477    /// [`Notified::enable()`] for more details.
478    ///
479    /// The `Notified` future is guaranteed to receive wakeups from
480    /// `notify_waiters()` as soon as it has been created, even if it has not
481    /// yet been polled.
482    ///
483    /// [`notify_one()`]: Notify::notify_one
484    /// [`Notified::enable()`]: Notified::enable
485    ///
486    /// # Cancel safety
487    ///
488    /// This method uses a queue to fairly distribute notifications in the order
489    /// they were requested. Cancelling a call to `notified` makes you lose your
490    /// place in the queue.
491    ///
492    /// # Examples
493    ///
494    /// ```
495    /// use tokio::sync::Notify;
496    /// use std::sync::Arc;
497    ///
498    /// #[tokio::main]
499    /// async fn main() {
500    ///     let notify = Arc::new(Notify::new());
501    ///     let notify2 = notify.clone();
502    ///
503    ///     tokio::spawn(async move {
504    ///         notify2.notified().await;
505    ///         println!("received notification");
506    ///     });
507    ///
508    ///     println!("sending notification");
509    ///     notify.notify_one();
510    /// }
511    /// ```
512    pub fn notified(&self) -> Notified<'_> {
513        // we load the number of times notify_waiters
514        // was called and store that in the future.
515        let state = self.state.load(SeqCst);
516        Notified {
517            notify: self,
518            state: State::Init,
519            notify_waiters_calls: get_num_notify_waiters_calls(state),
520            waiter: Waiter::new(),
521        }
522    }
523
524    /// Notifies a waiting task.
525    ///
526    /// If a task is currently waiting, that task is notified. Otherwise, a
527    /// permit is stored in this `Notify` value and the **next** call to
528    /// [`notified().await`] will complete immediately consuming the permit made
529    /// available by this call to `notify_one()`.
530    ///
531    /// At most one permit may be stored by `Notify`. Many sequential calls to
532    /// `notify_one` will result in a single permit being stored. The next call to
533    /// `notified().await` will complete immediately, but the one after that
534    /// will wait.
535    ///
536    /// [`notified().await`]: Notify::notified()
537    ///
538    /// # Examples
539    ///
540    /// ```
541    /// use tokio::sync::Notify;
542    /// use std::sync::Arc;
543    ///
544    /// #[tokio::main]
545    /// async fn main() {
546    ///     let notify = Arc::new(Notify::new());
547    ///     let notify2 = notify.clone();
548    ///
549    ///     tokio::spawn(async move {
550    ///         notify2.notified().await;
551    ///         println!("received notification");
552    ///     });
553    ///
554    ///     println!("sending notification");
555    ///     notify.notify_one();
556    /// }
557    /// ```
558    // Alias for old name in 0.x
559    #[cfg_attr(docsrs, doc(alias = "notify"))]
560    pub fn notify_one(&self) {
561        // Load the current state
562        let mut curr = self.state.load(SeqCst);
563
564        // If the state is `EMPTY`, transition to `NOTIFIED` and return.
565        while let EMPTY | NOTIFIED = get_state(curr) {
566            // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
567            // happens-before synchronization must happen between this atomic
568            // operation and a task calling `notified().await`.
569            let new = set_state(curr, NOTIFIED);
570            let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
571
572            match res {
573                // No waiters, no further work to do
574                Ok(_) => return,
575                Err(actual) => {
576                    curr = actual;
577                }
578            }
579        }
580
581        // There are waiters, the lock must be acquired to notify.
582        let mut waiters = self.waiters.lock();
583
584        // The state must be reloaded while the lock is held. The state may only
585        // transition out of WAITING while the lock is held.
586        curr = self.state.load(SeqCst);
587
588        if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
589            drop(waiters);
590            waker.wake();
591        }
592    }
593
594    /// Notifies all waiting tasks.
595    ///
596    /// If a task is currently waiting, that task is notified. Unlike with
597    /// `notify_one()`, no permit is stored to be used by the next call to
598    /// `notified().await`. The purpose of this method is to notify all
599    /// already registered waiters. Registering for notification is done by
600    /// acquiring an instance of the `Notified` future via calling `notified()`.
601    ///
602    /// # Examples
603    ///
604    /// ```
605    /// use tokio::sync::Notify;
606    /// use std::sync::Arc;
607    ///
608    /// #[tokio::main]
609    /// async fn main() {
610    ///     let notify = Arc::new(Notify::new());
611    ///     let notify2 = notify.clone();
612    ///
613    ///     let notified1 = notify.notified();
614    ///     let notified2 = notify.notified();
615    ///
616    ///     let handle = tokio::spawn(async move {
617    ///         println!("sending notifications");
618    ///         notify2.notify_waiters();
619    ///     });
620    ///
621    ///     notified1.await;
622    ///     notified2.await;
623    ///     println!("received notifications");
624    /// }
625    /// ```
626    pub fn notify_waiters(&self) {
627        let mut waiters = self.waiters.lock();
628
629        // The state must be loaded while the lock is held. The state may only
630        // transition out of WAITING while the lock is held.
631        let curr = self.state.load(SeqCst);
632
633        if matches!(get_state(curr), EMPTY | NOTIFIED) {
634            // There are no waiting tasks. All we need to do is increment the
635            // number of times this method was called.
636            atomic_inc_num_notify_waiters_calls(&self.state);
637            return;
638        }
639
640        // Increment the number of times this method was called
641        // and transition to empty.
642        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
643        self.state.store(new_state, SeqCst);
644
645        // It is critical for `GuardedLinkedList` safety that the guard node is
646        // pinned in memory and is not dropped until the guarded list is dropped.
647        let guard = Waiter::new();
648        pin!(guard);
649
650        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
651        // underneath to allow every waiter to safely remove itself from it.
652        //
653        // * This list will be still guarded by the `waiters` lock.
654        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
655        // * This wrapper will empty the list on drop. It is critical for safety
656        //   that we will not leave any list entry with a pointer to the local
657        //   guard node after this function returns / panics.
658        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
659
660        let mut wakers = WakeList::new();
661        'outer: loop {
662            while wakers.can_push() {
663                match list.pop_back_locked(&mut waiters) {
664                    Some(waiter) => {
665                        // Safety: we never make mutable references to waiters.
666                        let waiter = unsafe { waiter.as_ref() };
667
668                        // Safety: we hold the lock, so we can access the waker.
669                        if let Some(waker) =
670                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
671                        {
672                            wakers.push(waker);
673                        }
674
675                        // This waiter is unlinked and will not be shared ever again, release it.
676                        waiter.notification.store_release(Notification::All);
677                    }
678                    None => {
679                        break 'outer;
680                    }
681                }
682            }
683
684            // Release the lock before notifying.
685            drop(waiters);
686
687            // One of the wakers may panic, but the remaining waiters will still
688            // be unlinked from the list in `NotifyWaitersList` destructor.
689            wakers.wake_all();
690
691            // Acquire the lock again.
692            waiters = self.waiters.lock();
693        }
694
695        // Release the lock before notifying
696        drop(waiters);
697
698        wakers.wake_all();
699    }
700}
701
702impl Default for Notify {
703    fn default() -> Notify {
704        Notify::new()
705    }
706}
707
708impl UnwindSafe for Notify {}
709impl RefUnwindSafe for Notify {}
710
711fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
712    match get_state(curr) {
713        EMPTY | NOTIFIED => {
714            let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
715
716            match res {
717                Ok(_) => None,
718                Err(actual) => {
719                    let actual_state = get_state(actual);
720                    assert!(actual_state == EMPTY || actual_state == NOTIFIED);
721                    state.store(set_state(actual, NOTIFIED), SeqCst);
722                    None
723                }
724            }
725        }
726        WAITING => {
727            // At this point, it is guaranteed that the state will not
728            // concurrently change as holding the lock is required to
729            // transition **out** of `WAITING`.
730            //
731            // Get a pending waiter
732            let waiter = waiters.pop_back().unwrap();
733
734            // Safety: we never make mutable references to waiters.
735            let waiter = unsafe { waiter.as_ref() };
736
737            // Safety: we hold the lock, so we can access the waker.
738            let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
739
740            // This waiter is unlinked and will not be shared ever again, release it.
741            waiter.notification.store_release(Notification::One);
742
743            if waiters.is_empty() {
744                // As this the **final** waiter in the list, the state
745                // must be transitioned to `EMPTY`. As transitioning
746                // **from** `WAITING` requires the lock to be held, a
747                // `store` is sufficient.
748                state.store(set_state(curr, EMPTY), SeqCst);
749            }
750            waker
751        }
752        _ => unreachable!(),
753    }
754}
755
756// ===== impl Notified =====
757
758impl Notified<'_> {
759    /// Adds this future to the list of futures that are ready to receive
760    /// wakeups from calls to [`notify_one`].
761    ///
762    /// Polling the future also adds it to the list, so this method should only
763    /// be used if you want to add the future to the list before the first call
764    /// to `poll`. (In fact, this method is equivalent to calling `poll` except
765    /// that no `Waker` is registered.)
766    ///
767    /// This has no effect on notifications sent using [`notify_waiters`], which
768    /// are received as long as they happen after the creation of the `Notified`
769    /// regardless of whether `enable` or `poll` has been called.
770    ///
771    /// This method returns true if the `Notified` is ready. This happens in the
772    /// following situations:
773    ///
774    ///  1. The `notify_waiters` method was called between the creation of the
775    ///     `Notified` and the call to this method.
776    ///  2. This is the first call to `enable` or `poll` on this future, and the
777    ///     `Notify` was holding a permit from a previous call to `notify_one`.
778    ///     The call consumes the permit in that case.
779    ///  3. The future has previously been enabled or polled, and it has since
780    ///     then been marked ready by either consuming a permit from the
781    ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
782    ///     removed it from the list of futures ready to receive wakeups.
783    ///
784    /// If this method returns true, any future calls to poll on the same future
785    /// will immediately return `Poll::Ready`.
786    ///
787    /// # Examples
788    ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
790    ///
791    /// The call to `enable` is important because otherwise if you have two
792    /// calls to `recv` and two calls to `send` in parallel, the following could
793    /// happen:
794    ///
795    ///  1. Both calls to `try_recv` return `None`.
796    ///  2. Both new elements are added to the vector.
797    ///  3. The `notify_one` method is called twice, adding only a single
798    ///     permit to the `Notify`.
799    ///  4. Both calls to `recv` reach the `Notified` future. One of them
800    ///     consumes the permit, and the other sleeps forever.
801    ///
802    /// By adding the `Notified` futures to the list by calling `enable` before
803    /// `try_recv`, the `notify_one` calls in step three would remove the
804    /// futures from the list and mark them notified instead of adding a permit
805    /// to the `Notify`. This ensures that both futures are woken.
806    ///
807    /// ```
808    /// use tokio::sync::Notify;
809    ///
810    /// use std::collections::VecDeque;
811    /// use std::sync::Mutex;
812    ///
813    /// struct Channel<T> {
814    ///     messages: Mutex<VecDeque<T>>,
815    ///     notify_on_sent: Notify,
816    /// }
817    ///
818    /// impl<T> Channel<T> {
819    ///     pub fn send(&self, msg: T) {
820    ///         let mut locked_queue = self.messages.lock().unwrap();
821    ///         locked_queue.push_back(msg);
822    ///         drop(locked_queue);
823    ///
824    ///         // Send a notification to one of the calls currently
825    ///         // waiting in a call to `recv`.
826    ///         self.notify_on_sent.notify_one();
827    ///     }
828    ///
829    ///     pub fn try_recv(&self) -> Option<T> {
830    ///         let mut locked_queue = self.messages.lock().unwrap();
831    ///         locked_queue.pop_front()
832    ///     }
833    ///
834    ///     pub async fn recv(&self) -> T {
835    ///         let future = self.notify_on_sent.notified();
836    ///         tokio::pin!(future);
837    ///
838    ///         loop {
839    ///             // Make sure that no wakeup is lost if we get
840    ///             // `None` from `try_recv`.
841    ///             future.as_mut().enable();
842    ///
843    ///             if let Some(msg) = self.try_recv() {
844    ///                 return msg;
845    ///             }
846    ///
847    ///             // Wait for a call to `notify_one`.
848    ///             //
849    ///             // This uses `.as_mut()` to avoid consuming the future,
850    ///             // which lets us call `Pin::set` below.
851    ///             future.as_mut().await;
852    ///
853    ///             // Reset the future in case another call to
854    ///             // `try_recv` got the message before us.
855    ///             future.set(self.notify_on_sent.notified());
856    ///         }
857    ///     }
858    /// }
859    /// ```
860    ///
861    /// [`notify_one`]: Notify::notify_one()
862    /// [`notify_waiters`]: Notify::notify_waiters()
863    pub fn enable(self: Pin<&mut Self>) -> bool {
864        self.poll_notified(None).is_ready()
865    }
866
867    /// A custom `project` implementation is used in place of `pin-project-lite`
868    /// as a custom drop implementation is needed.
869    fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
870        unsafe {
871            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
872
873            is_unpin::<&Notify>();
874            is_unpin::<State>();
875            is_unpin::<usize>();
876
877            let me = self.get_unchecked_mut();
878            (
879                me.notify,
880                &mut me.state,
881                &me.notify_waiters_calls,
882                &me.waiter,
883            )
884        }
885    }
886
    /// Advances the `Notified` state machine, shared by `poll` and `enable`.
    ///
    /// `waker` is the waker to store for a later wakeup. When it is `None`
    /// (as passed by `enable`), the waiter is still inserted into the wait
    /// list, but no waker is stored or updated.
    ///
    /// Returns `Poll::Ready(())` once the future has reached `State::Done`,
    /// and `Poll::Pending` after the waiter has been queued or while it is
    /// still waiting without having been notified.
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
        let (notify, state, notify_waiters_calls, waiter) = self.project();

        // Each iteration handles the current state; arms that move to
        // `State::Done` loop around so the `Done` arm produces the result.
        'outer_loop: loop {
            match *state {
                State::Init => {
                    let curr = notify.state.load(SeqCst);

                    // Optimistically try acquiring a pending notification
                    let res = notify.state.compare_exchange(
                        set_state(curr, NOTIFIED),
                        set_state(curr, EMPTY),
                        SeqCst,
                        SeqCst,
                    );

                    if res.is_ok() {
                        // Acquired the notification
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Clone the waker before locking, because cloning a waker
                    // can run arbitrary code.
                    let waker = waker.cloned();

                    // Acquire the lock and attempt to transition to the waiting
                    // state.
                    let mut waiters = notify.waiters.lock();

                    // Reload the state with the lock held
                    let mut curr = notify.state.load(SeqCst);

                    // if notify_waiters has been called after the future
                    // was created, then we are done
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Transition the state to WAITING.
                    //
                    // The lock is held, but the state word can still change
                    // underneath us, so retry with the freshly observed value
                    // whenever a compare-exchange fails.
                    loop {
                        match get_state(curr) {
                            EMPTY => {
                                // Transition to WAITING
                                let res = notify.state.compare_exchange(
                                    set_state(curr, EMPTY),
                                    set_state(curr, WAITING),
                                    SeqCst,
                                    SeqCst,
                                );

                                if let Err(actual) = res {
                                    // The only expected competing change is a
                                    // notification being stored.
                                    assert_eq!(get_state(actual), NOTIFIED);
                                    curr = actual;
                                } else {
                                    break;
                                }
                            }
                            // Already WAITING: nothing to change in the state word.
                            WAITING => break,
                            NOTIFIED => {
                                // Try consuming the notification
                                let res = notify.state.compare_exchange(
                                    set_state(curr, NOTIFIED),
                                    set_state(curr, EMPTY),
                                    SeqCst,
                                    SeqCst,
                                );

                                match res {
                                    Ok(_) => {
                                        // Acquired the notification
                                        *state = State::Done;
                                        continue 'outer_loop;
                                    }
                                    Err(actual) => {
                                        // The only expected competing change is
                                        // the notification being consumed.
                                        assert_eq!(get_state(actual), EMPTY);
                                        curr = actual;
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }

                    let mut old_waker = None;
                    if waker.is_some() {
                        // Safety: called while locked.
                        //
                        // The use of `old_waker` here is not necessary, as the field is always
                        // None when we reach this line.
                        unsafe {
                            old_waker =
                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
                        }
                    }

                    // Insert the waiter into the linked list
                    waiters.push_front(NonNull::from(waiter));

                    *state = State::Waiting;

                    // Release the lock before dropping the previous waker.
                    drop(waiters);
                    drop(old_waker);

                    return Poll::Pending;
                }
                State::Waiting => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        ready!(crate::trace::trace_leaf(&mut ctx));
                    }

                    // Fast path: check for a notification without taking the lock.
                    if waiter.notification.load(Acquire).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });

                        waiter.notification.clear();
                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Our waiter was not notified, implying it is still stored in a waiter
                    // list (guarded by `notify.waiters`). In order to access the waker
                    // fields, we must acquire the lock.

                    let mut old_waker = None;
                    let mut waiters = notify.waiters.lock();

                    // We hold the lock and notifications are set only with the lock held,
                    // so this can be relaxed, because the happens-before relationship is
                    // established through the mutex.
                    if waiter.notification.load(Relaxed).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        waiter.notification.clear();

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Load the state with the lock held.
                    let curr = notify.state.load(SeqCst);

                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        // Before we add a waiter to the list we check if these numbers are
                        // different while holding the lock. If these numbers are different now,
                        // it means that there is a call to `notify_waiters` in progress and this
                        // waiter must be contained by a guarded list used in `notify_waiters`.
                        // We can treat the waiter as notified and remove it from the list, as
                        // it would have been notified in the `notify_waiters` call anyways.

                        // Safety: we hold the lock, so we can modify the waker.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        // Safety: we hold the lock, so we have an exclusive access to the list.
                        // The list is used in `notify_waiters`, so it must be guarded.
                        unsafe { waiters.remove(NonNull::from(waiter)) };

                        *state = State::Done;
                    } else {
                        // Still waiting: refresh the stored waker if the caller supplied
                        // one that would not wake the same task as the current one.
                        //
                        // Safety: we hold the lock, so we can modify the waker.
                        unsafe {
                            waiter.waker.with_mut(|v| {
                                if let Some(waker) = waker {
                                    let should_update = match &*v {
                                        Some(current_waker) => !current_waker.will_wake(waker),
                                        None => true,
                                    };
                                    if should_update {
                                        old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
                                    }
                                }
                            });
                        }

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);

                    // Drop the old waker after releasing the lock.
                    drop(old_waker);

                    // `state` is now `Done`; the next loop iteration returns
                    // `Poll::Ready(())` from the `Done` arm.
                }
                State::Done => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        ready!(crate::trace::trace_leaf(&mut ctx));
                    }
                    return Poll::Ready(());
                }
            }
        }
    }
}
1099}
1100
1101impl Future for Notified<'_> {
1102    type Output = ();
1103
1104    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1105        self.poll_notified(Some(cx.waker()))
1106    }
1107}
1108
1109impl Drop for Notified<'_> {
1110    fn drop(&mut self) {
1111        // Safety: The type only transitions to a "Waiting" state when pinned.
1112        let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1113
1114        // This is where we ensure safety. The `Notified` value is being
1115        // dropped, which means we must ensure that the waiter entry is no
1116        // longer stored in the linked list.
1117        if matches!(*state, State::Waiting) {
1118            let mut waiters = notify.waiters.lock();
1119            let mut notify_state = notify.state.load(SeqCst);
1120
1121            // We hold the lock, so this field is not concurrently accessed by
1122            // `notify_*` functions and we can use the relaxed ordering.
1123            let notification = waiter.notification.load(Relaxed);
1124
1125            // remove the entry from the list (if not already removed)
1126            //
1127            // Safety: we hold the lock, so we have an exclusive access to every list the
1128            // waiter may be contained in. If the node is not contained in the `waiters`
1129            // list, then it is contained by a guarded list used by `notify_waiters`.
1130            unsafe { waiters.remove(NonNull::from(waiter)) };
1131
1132            if waiters.is_empty() && get_state(notify_state) == WAITING {
1133                notify_state = set_state(notify_state, EMPTY);
1134                notify.state.store(notify_state, SeqCst);
1135            }
1136
1137            // See if the node was notified but not received. In this case, if
1138            // the notification was triggered via `notify_one`, it must be sent
1139            // to the next waiter.
1140            if notification == Some(Notification::One) {
1141                if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
1142                    drop(waiters);
1143                    waker.wake();
1144                }
1145            }
1146        }
1147    }
1148}
1149
1150/// # Safety
1151///
1152/// `Waiter` is forced to be !Unpin.
1153unsafe impl linked_list::Link for Waiter {
1154    type Handle = NonNull<Waiter>;
1155    type Target = Waiter;
1156
1157    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1158        *handle
1159    }
1160
1161    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1162        ptr
1163    }
1164
1165    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1166        Waiter::addr_of_pointers(target)
1167    }
1168}
1169
/// Compile-time assertion that `T` implements `Unpin`.
///
/// The body is empty; calling it only type-checks when the bound holds.
fn is_unpin<T>()
where
    T: Unpin,
{
}