fedimint_core/db/
notifications.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use bitvec::vec::BitVec;
5use tokio::sync::futures::Notified;
6use tokio::sync::Notify;
7
/// How many notification buckets a `Notifications` instance holds.
///
/// Keys are hashed onto this many slots, so distinct keys can share a slot.
const NOTIFY_BUCKETS: usize = 32;
10
11/// The state of Notification.
12///
13/// This stores `NOTIFY_BUCKETS` number of `Notifies`.
14/// Each key is assigned a bucket based on its hash value.
15/// This will cause some false positives.
16#[derive(Debug)]
17pub struct Notifications {
18    buckets: Vec<Notify>,
19}
20
21impl Default for Notifications {
22    fn default() -> Self {
23        Self {
24            buckets: (0..NOTIFY_BUCKETS).map(|_| Notify::new()).collect(),
25        }
26    }
27}
28
29fn slot_index_for_hash(hash_value: u64) -> usize {
30    (hash_value % (NOTIFY_BUCKETS as u64)) as usize
31}
32
33fn slot_index_for_key<K: Hash>(key: K) -> usize {
34    let mut hasher = DefaultHasher::new();
35    key.hash(&mut hasher);
36    let hash_value = hasher.finish();
37    slot_index_for_hash(hash_value)
38}
39
40impl Notifications {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// This registers for notification when called.
46    ///
47    /// Then waits for the notification when .awaited.
48    ///
49    /// NOTE: This may some false positives.
50    pub fn register<K>(&self, key: K) -> Notified
51    where
52        K: Hash,
53    {
54        self.buckets[slot_index_for_key(key)].notified()
55    }
56
57    /// Notify a key.
58    ///
59    /// All the waiters for this keys will be notified.
60    pub fn notify<K>(&self, key: K)
61    where
62        K: Hash,
63    {
64        self.buckets[slot_index_for_key(key)].notify_waiters();
65    }
66
67    /// Notifies the waiters about the notifications recorded in `NotifyQueue`.
68    pub fn submit_queue(&self, queue: &NotifyQueue) {
69        for bucket in queue.buckets.iter_ones() {
70            self.buckets[bucket].notify_waiters();
71        }
72    }
73}
74
75/// Save notifications to be sent after transaction is complete.
76#[derive(Debug)]
77pub struct NotifyQueue {
78    buckets: BitVec,
79}
80
81impl Default for NotifyQueue {
82    fn default() -> Self {
83        Self {
84            buckets: BitVec::repeat(false, NOTIFY_BUCKETS),
85        }
86    }
87}
88
89impl NotifyQueue {
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    pub fn add<K>(&mut self, key: &K)
95    where
96        K: Hash,
97    {
98        self.buckets.set(slot_index_for_key(key), true);
99    }
100}
101
#[cfg(test)]
mod tests {
    use fedimint_core::db::test_utils::future_returns_shortly;

    use super::*;

    /// A subscription registered before `notify` completes shortly afterwards.
    #[tokio::test]
    async fn test_notification_after_notify() {
        let notifs = Notifications::new();
        let key = 1;
        let sub = notifs.register(key);
        notifs.notify(key);
        let outcome = future_returns_shortly(sub).await;
        assert!(outcome.is_some(), "should notify");
    }

    /// Without any `notify` call the subscription does not complete shortly.
    #[tokio::test]
    async fn test_no_notification_without_notify() {
        let notifs = Notifications::new();
        let key = 1;
        let sub = notifs.register(key);
        let outcome = future_returns_shortly(sub).await;
        assert!(outcome.is_none(), "should not notify");
    }

    /// A single `notify` wakes every subscription registered for the same key.
    #[tokio::test]
    async fn test_multi_one() {
        let notifs = Notifications::new();
        let key1 = 1;
        let subs = [
            notifs.register(key1),
            notifs.register(key1),
            notifs.register(key1),
            notifs.register(key1),
        ];
        notifs.notify(key1);
        for sub in subs {
            assert!(future_returns_shortly(sub).await.is_some(), "should notify");
        }
    }

    /// Subscriptions for two keys both complete after each key is notified.
    #[tokio::test]
    async fn test_multi() {
        let notifs = Notifications::new();
        let key1 = 1;
        let key2 = 2;
        let sub1 = notifs.register(key1);
        let sub2 = notifs.register(key2);
        notifs.notify(key1);
        notifs.notify(key2);
        for sub in [sub1, sub2] {
            assert!(future_returns_shortly(sub).await.is_some(), "should notify");
        }
    }

    /// Keys added to a `NotifyQueue` are notified when the queue is submitted.
    #[tokio::test]
    async fn test_notify_queue() {
        let notifs = Notifications::new();
        let key1 = 1;
        let key2 = 2;
        let sub1 = notifs.register(key1);
        let sub2 = notifs.register(key2);
        let mut queue = NotifyQueue::new();
        queue.add(&key1);
        queue.add(&key2);
        notifs.submit_queue(&queue);
        for sub in [sub1, sub2] {
            assert!(future_returns_shortly(sub).await.is_some(), "should notify");
        }
    }
}