fedimint_core/db/
notifications.rs1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use bitvec::vec::BitVec;
5use tokio::sync::futures::Notified;
6use tokio::sync::Notify;
7
/// Number of `Notify` buckets that keys are hashed into.
///
/// Both `Notifications` and `NotifyQueue` size their storage from this value,
/// so a bucket index computed for one is valid for the other.
const NOTIFY_BUCKETS: usize = 32;
10
11#[derive(Debug)]
17pub struct Notifications {
18 buckets: Vec<Notify>,
19}
20
21impl Default for Notifications {
22 fn default() -> Self {
23 Self {
24 buckets: (0..NOTIFY_BUCKETS).map(|_| Notify::new()).collect(),
25 }
26 }
27}
28
29fn slot_index_for_hash(hash_value: u64) -> usize {
30 (hash_value % (NOTIFY_BUCKETS as u64)) as usize
31}
32
33fn slot_index_for_key<K: Hash>(key: K) -> usize {
34 let mut hasher = DefaultHasher::new();
35 key.hash(&mut hasher);
36 let hash_value = hasher.finish();
37 slot_index_for_hash(hash_value)
38}
39
40impl Notifications {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn register<K>(&self, key: K) -> Notified
51 where
52 K: Hash,
53 {
54 self.buckets[slot_index_for_key(key)].notified()
55 }
56
57 pub fn notify<K>(&self, key: K)
61 where
62 K: Hash,
63 {
64 self.buckets[slot_index_for_key(key)].notify_waiters();
65 }
66
67 pub fn submit_queue(&self, queue: &NotifyQueue) {
69 for bucket in queue.buckets.iter_ones() {
70 self.buckets[bucket].notify_waiters();
71 }
72 }
73}
74
75#[derive(Debug)]
77pub struct NotifyQueue {
78 buckets: BitVec,
79}
80
81impl Default for NotifyQueue {
82 fn default() -> Self {
83 Self {
84 buckets: BitVec::repeat(false, NOTIFY_BUCKETS),
85 }
86 }
87}
88
89impl NotifyQueue {
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn add<K>(&mut self, key: &K)
95 where
96 K: Hash,
97 {
98 self.buckets.set(slot_index_for_key(key), true);
99 }
100}
101
#[cfg(test)]
mod tests {
    use fedimint_core::db::test_utils::future_returns_shortly;

    use super::*;

    /// Asserts that `future_returns_shortly` yields `Some` for `sub`.
    async fn assert_notified(sub: Notified<'_>) {
        assert!(future_returns_shortly(sub).await.is_some(), "should notify");
    }

    /// A subscription registered before `notify` on the same key completes.
    #[tokio::test]
    async fn test_notification_after_notify() {
        let notifs = Notifications::new();
        let key = 1;
        let sub = notifs.register(key);
        notifs.notify(key);
        assert_notified(sub).await;
    }

    /// Without any `notify` call the subscription does not complete.
    #[tokio::test]
    async fn test_no_notification_without_notify() {
        let notifs = Notifications::new();
        let key = 1;
        let sub = notifs.register(key);
        let outcome = future_returns_shortly(sub).await;
        assert!(outcome.is_none(), "should not notify");
    }

    /// One `notify` call completes every subscription registered for the key.
    #[tokio::test]
    async fn test_multi_one() {
        let notifs = Notifications::new();
        let key1 = 1;
        let subs: Vec<_> = (0..4).map(|_| notifs.register(key1)).collect();
        notifs.notify(key1);
        // Checked one after another, in registration order.
        for sub in subs {
            assert_notified(sub).await;
        }
    }

    /// Notifying two keys completes the subscription of each.
    #[tokio::test]
    async fn test_multi() {
        let notifs = Notifications::new();
        let (key1, key2) = (1, 2);
        let sub1 = notifs.register(key1);
        let sub2 = notifs.register(key2);
        notifs.notify(key1);
        notifs.notify(key2);
        assert_notified(sub1).await;
        assert_notified(sub2).await;
    }

    /// Keys batched in a `NotifyQueue` are notified by `submit_queue`.
    #[tokio::test]
    async fn test_notify_queue() {
        let notifs = Notifications::new();
        let (key1, key2) = (1, 2);
        let sub1 = notifs.register(key1);
        let sub2 = notifs.register(key2);

        let mut queue = NotifyQueue::new();
        queue.add(&key1);
        queue.add(&key2);
        notifs.submit_queue(&queue);

        assert_notified(sub1).await;
        assert_notified(sub2).await;
    }
}