compio_runtime/
event.rs

1//! Asynchronous events.
2
3use std::{
4    pin::Pin,
5    sync::{
6        Arc,
7        atomic::{AtomicBool, Ordering},
8    },
9    task::{Context, Poll},
10};
11
12use futures_util::{Future, task::AtomicWaker};
13
14#[derive(Debug)]
15struct Inner {
16    waker: AtomicWaker,
17    set: AtomicBool,
18}
19
20#[derive(Debug, Clone)]
21struct Flag(Arc<Inner>);
22
23impl Flag {
24    pub fn new() -> Self {
25        Self(Arc::new(Inner {
26            waker: AtomicWaker::new(),
27            set: AtomicBool::new(false),
28        }))
29    }
30
31    pub fn notify(&self) {
32        self.0.set.store(true, Ordering::Relaxed);
33        self.0.waker.wake();
34    }
35
36    pub fn notified(&self) -> bool {
37        self.0.set.load(Ordering::Relaxed)
38    }
39}
40
41impl Future for Flag {
42    type Output = ();
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
45        // quick check to avoid registration if already done.
46        if self.0.set.load(Ordering::Relaxed) {
47            return Poll::Ready(());
48        }
49
50        self.0.waker.register(cx.waker());
51
52        // Need to check condition **after** `register` to avoid a race
53        // condition that would result in lost notifications.
54        if self.0.set.load(Ordering::Relaxed) {
55            Poll::Ready(())
56        } else {
57            Poll::Pending
58        }
59    }
60}
61
62/// An event that won't wake until [`EventHandle::notify`] is called
63/// successfully.
64#[derive(Debug)]
65pub struct Event {
66    flag: Flag,
67}
68
69impl Default for Event {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl Event {
76    /// Create [`Event`].
77    pub fn new() -> Self {
78        Self { flag: Flag::new() }
79    }
80
81    /// Get a notify handle.
82    pub fn handle(&self) -> EventHandle {
83        EventHandle::new(self.flag.clone())
84    }
85
86    /// Get if the event has been notified.
87    pub fn notified(&self) -> bool {
88        self.flag.notified()
89    }
90
91    /// Wait for [`EventHandle::notify`] called.
92    pub async fn wait(self) {
93        self.flag.await
94    }
95}
96
97/// A wake up handle to [`Event`].
98pub struct EventHandle {
99    flag: Flag,
100}
101
102impl EventHandle {
103    fn new(flag: Flag) -> Self {
104        Self { flag }
105    }
106
107    /// Notify the event.
108    pub fn notify(self) {
109        self.flag.notify()
110    }
111}