solana_accounts_db/
waitable_condvar.rs

1use std::{
2    sync::{Condvar, Mutex},
3    time::Duration,
4};
5
/// Event-style wait/notify primitive built on top of a [`Condvar`].
///
/// A [`Condvar`] can only be waited on while holding a [`Mutex`] guard, so this
/// type carries a `Mutex<()>` that protects no data and exists solely to satisfy
/// that API. It is intended to be shared between threads, typically behind an
/// `Arc`.
///
/// Notifications are not buffered: a `notify_*` call made while no thread is
/// blocked in [`WaitableCondvar::wait_timeout`] is lost.
#[derive(Default, Debug)]
pub struct WaitableCondvar {
    /// Data-less mutex, held only for the duration of a wait.
    pub mutex: Mutex<()>,
    /// Condition variable that waiters block on and notifiers signal.
    pub event: Condvar,
}

impl WaitableCondvar {
    /// Wake up all threads currently waiting on this event.
    pub fn notify_all(&self) {
        self.event.notify_all();
    }

    /// Wake up one thread currently waiting on this event.
    pub fn notify_one(&self) {
        self.event.notify_one();
    }

    /// Block the calling thread until the event is notified or `timeout` elapses.
    ///
    /// Returns `true` if the wait timed out and `false` if the thread was woken
    /// before the timeout. Because the underlying [`Condvar`] is susceptible to
    /// spurious wakeups, a `false` result does not strictly guarantee that a
    /// notification was sent.
    ///
    /// A poisoned mutex is tolerated rather than propagated as a panic: the
    /// mutex guards `()`, so there is no protected state that a panicking
    /// thread could have left inconsistent.
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self
            .mutex
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Keep the returned guard alive until the end of the function, exactly
        // as the lock was held for the whole call before.
        let (_guard, wait_result) = self
            .event
            .wait_timeout(guard, timeout)
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        wait_result.timed_out()
    }
}
34
#[cfg(test)]
pub mod tests {
    use {
        super::*,
        std::{
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            thread::Builder,
        },
    };

    /// Exercises `wait_timeout` / `notify_all` across three threads.
    ///
    /// * The main thread sets a shared flag, waits for an acknowledgement on
    ///   `ack`, then releases the other threads through `release`.
    /// * The responder thread polls `release` with a short timeout; once it
    ///   sees the flag it notifies `ack` a single time per pass, and after
    ///   being released it clears the flag.
    /// * The passive waiter only blocks on `release`, for one pass fewer than
    ///   the others, so `notify_all` is exercised with two waiters and with one.
    ///
    /// NOTE(review): presumably `#[ignore]`d because the hand-off depends on
    /// thread timing (notifications sent while nobody is waiting are lost) —
    /// confirm before re-enabling.
    #[ignore]
    #[test]
    fn test_waitable_condvar() {
        let short_wait = Duration::from_millis(1);
        // long enough to not be intermittent, short enough to fail if a
        // notification never arrives
        let long_wait = Duration::from_millis(10000);
        // several passes to check that re-notification works
        let passes = 3;

        let flag = Arc::new(AtomicBool::new(false));
        let ack = Arc::new(WaitableCondvar::default());
        let release = Arc::new(WaitableCondvar::default());

        let responder = {
            let flag = Arc::clone(&flag);
            let ack = Arc::clone(&ack);
            let release = Arc::clone(&release);
            Builder::new().spawn(move || {
                for _ in 0..passes {
                    let mut acked = false;
                    // keep polling for as long as the wait times out
                    while release.wait_timeout(short_wait) {
                        if !acked && flag.load(Ordering::Relaxed) {
                            acked = true;
                            ack.notify_all();
                        }
                    }
                    // released: the flag must have been set during this pass
                    assert!(flag.swap(false, Ordering::Relaxed));
                }
            })
        };

        // just wait, but 1 less pass
        let passive_waiter = {
            let release = Arc::clone(&release);
            Builder::new().spawn(move || {
                for _ in 0..(passes - 1) {
                    let timed_out = release.wait_timeout(long_wait);
                    assert!(!timed_out);
                }
            })
        };

        for _ in 0..passes {
            // nobody has notified `ack` yet, so this wait must time out
            let timed_out = ack.wait_timeout(short_wait);
            assert!(timed_out);

            let previously_set = flag.swap(true, Ordering::Relaxed);
            assert!(!previously_set);

            // should barely wait, but a generous timeout avoids intermittent failures
            let timed_out = ack.wait_timeout(long_wait);
            assert!(!timed_out);

            release.notify_all();
        }

        assert!(responder.unwrap().join().is_ok());
        assert!(passive_waiter.unwrap().join().is_ok());
    }
}