embassy_sync/waitqueue/multi_waker.rs
1use core::task::Waker;
2
3use heapless::Vec;
4
5/// Utility struct to register and wake multiple wakers.
6pub struct MultiWakerRegistration<const N: usize> {
7 wakers: Vec<Waker, N>,
8}
9
10impl<const N: usize> MultiWakerRegistration<N> {
11 /// Create a new empty instance
12 pub const fn new() -> Self {
13 Self { wakers: Vec::new() }
14 }
15
16 /// Register a waker. If the buffer is full the function returns it in the error
17 pub fn register(&mut self, w: &Waker) {
18 // If we already have some waker that wakes the same task as `w`, do nothing.
19 // This avoids cloning wakers, and avoids unnecessary mass-wakes.
20 for w2 in &self.wakers {
21 if w.will_wake(w2) {
22 return;
23 }
24 }
25
26 if self.wakers.is_full() {
27 // All waker slots were full. It's a bit inefficient, but we can wake everything.
28 // Any future that is still active will simply reregister.
29 // This won't happen a lot, so it's ok.
30 self.wake();
31 }
32
33 if self.wakers.push(w.clone()).is_err() {
34 // This can't happen unless N=0
35 // (Either `wakers` wasn't full, or it was in which case `wake()` empied it)
36 panic!("tried to push a waker to a zero-length MultiWakerRegistration")
37 }
38 }
39
40 /// Wake all registered wakers. This clears the buffer
41 pub fn wake(&mut self) {
42 // heapless::Vec has no `drain()`, do it unsafely ourselves...
43
44 // First set length to 0, without dropping the contents.
45 // This is necessary for soundness: if wake() panics and we're using panic=unwind.
46 // Setting len=0 upfront ensures other code can't observe the vec in an inconsistent state.
47 // (it'll leak wakers, but that's not UB)
48 let len = self.wakers.len();
49 unsafe { self.wakers.set_len(0) }
50
51 for i in 0..len {
52 // Move a waker out of the vec.
53 let waker = unsafe { self.wakers.as_mut_ptr().add(i).read() };
54 // Wake it by value, which consumes (drops) it.
55 waker.wake();
56 }
57 }
58}