embassy_sync/
watch.rs

1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2
3use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
/// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await
/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
/// value when a new one is sent, without waiting for all receivers to read the previous value.
///
/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
/// always provided with the latest value.
///
/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
/// and [`DynAnonReceiver`] are also available. These do not count towards the receiver limit `N`,
/// so creating them cannot fail and no unwrapping is required, but they cannot `.await` the channel.
///
/// # Examples
///
/// ```
///
/// use futures_executor::block_on;
/// use embassy_sync::watch::Watch;
/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
///
/// let f = async {
///
/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
///
/// // Obtain receivers and sender
/// let mut rcv0 = WATCH.receiver().unwrap();
/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
/// let mut snd = WATCH.sender();
///
/// // No more receivers, and no update
/// assert!(WATCH.receiver().is_none());
/// assert_eq!(rcv1.try_changed(), None);
///
/// snd.send(10);
///
/// // Receive the new value (async or try)
/// assert_eq!(rcv0.changed().await, 10);
/// assert_eq!(rcv1.try_changed(), Some(10));
///
/// // No update
/// assert_eq!(rcv0.try_changed(), None);
/// assert_eq!(rcv1.try_changed(), None);
///
/// snd.send(20);
///
/// // Using `get` marks the value as seen
/// assert_eq!(rcv1.get().await, 20);
/// assert_eq!(rcv1.try_changed(), None);
///
/// // But `get` also returns when unchanged
/// assert_eq!(rcv1.get().await, 20);
/// assert_eq!(rcv1.get().await, 20);
///
/// };
/// block_on(f);
/// ```
pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
    /// The complete channel state, guarded by the blocking mutex `M`.
    mutex: Mutex<M, RefCell<WatchState<T, N>>>,
}
71
/// Internal state of a [`Watch`], only ever accessed while its mutex is held.
struct WatchState<T: Clone, const N: usize> {
    /// The latest value; `None` while the `Watch` is uninitialized or after it has been cleared.
    data: Option<T>,
    /// Id of the latest message. It is incremented on every send/modify and compared
    /// against the id each receiver last saw in order to detect changes.
    current_id: u64,
    /// Wakers registered by pending polls; all of them are woken when a new value is published.
    wakers: MultiWakerRegistration<N>,
    /// Number of live counted (non-anonymous) receivers; never allowed to exceed `N`.
    receiver_count: usize,
}
78
/// Module-private operations of a `Watch`.
///
/// This is a supertrait of the public [`WatchBehavior`], so the sender and receiver
/// types in this module can use these methods while the trait itself stays private.
trait SealedWatchBehavior<T> {
    /// Poll the `Watch` for the current value, marking it as seen.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for the value if it matches the predicate function
    /// `f`, marking it as seen.
    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for a changed value, marking it as seen.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
    fn try_changed(&self, id: &mut u64) -> Option<T>;

    /// Poll the `Watch` for a changed value that matches the predicate function
    /// `f`, marking it as seen.
    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Tries to retrieve the value of the `Watch` if it has changed and matches the
    /// predicate function `f`, marking it as seen.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Used when a receiver is dropped to decrement the receiver count.
    ///
    /// ## This method should not be called by the user.
    fn drop_receiver(&self);

    /// Clears the value of the `Watch`.
    fn clear(&self);

    /// Sends a new value to the `Watch`.
    fn send(&self, val: T);

    /// Modify the value of the `Watch` using a closure. The closure receives the
    /// whole slot (`None` if the `Watch` holds no value) and the receivers are
    /// always notified afterwards.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));

    /// Modify the value of the `Watch` using a closure. The receivers are only
    /// notified if the closure returns `true`.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
}
120
/// A trait representing the 'inner' behavior of the `Watch`.
///
/// It is implemented by [`Watch`] and serves as the trait object behind the
/// `Dyn*` sender and receiver types.
// The private supertrait is intentional: it keeps the remaining operations out of the public API.
#[allow(private_bounds)]
pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
    /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
    fn try_get(&self, id: Option<&mut u64>) -> Option<T>;

    /// Tries to get the value of the `Watch` if it matches the predicate function
    /// `f`, marking it as seen, if an id is given.
    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Checks if the `Watch` has been initialized with a value.
    fn contains_value(&self) -> bool;
}
134
135impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
136    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
137        self.mutex.lock(|state| {
138            let mut s = state.borrow_mut();
139            match &s.data {
140                Some(data) => {
141                    *id = s.current_id;
142                    Poll::Ready(data.clone())
143                }
144                None => {
145                    s.wakers.register(cx.waker());
146                    Poll::Pending
147                }
148            }
149        })
150    }
151
152    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
153        self.mutex.lock(|state| {
154            let mut s = state.borrow_mut();
155            match s.data {
156                Some(ref data) if f(data) => {
157                    *id = s.current_id;
158                    Poll::Ready(data.clone())
159                }
160                _ => {
161                    s.wakers.register(cx.waker());
162                    Poll::Pending
163                }
164            }
165        })
166    }
167
168    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
169        self.mutex.lock(|state| {
170            let mut s = state.borrow_mut();
171            match (&s.data, s.current_id > *id) {
172                (Some(data), true) => {
173                    *id = s.current_id;
174                    Poll::Ready(data.clone())
175                }
176                _ => {
177                    s.wakers.register(cx.waker());
178                    Poll::Pending
179                }
180            }
181        })
182    }
183
184    fn try_changed(&self, id: &mut u64) -> Option<T> {
185        self.mutex.lock(|state| {
186            let s = state.borrow();
187            match s.current_id > *id {
188                true => {
189                    *id = s.current_id;
190                    s.data.clone()
191                }
192                false => None,
193            }
194        })
195    }
196
197    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
198        self.mutex.lock(|state| {
199            let mut s = state.borrow_mut();
200            match (&s.data, s.current_id > *id) {
201                (Some(data), true) if f(data) => {
202                    *id = s.current_id;
203                    Poll::Ready(data.clone())
204                }
205                _ => {
206                    s.wakers.register(cx.waker());
207                    Poll::Pending
208                }
209            }
210        })
211    }
212
213    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
214        self.mutex.lock(|state| {
215            let s = state.borrow();
216            match (&s.data, s.current_id > *id) {
217                (Some(data), true) if f(data) => {
218                    *id = s.current_id;
219                    s.data.clone()
220                }
221                _ => None,
222            }
223        })
224    }
225
226    fn drop_receiver(&self) {
227        self.mutex.lock(|state| {
228            let mut s = state.borrow_mut();
229            s.receiver_count -= 1;
230        })
231    }
232
233    fn clear(&self) {
234        self.mutex.lock(|state| {
235            let mut s = state.borrow_mut();
236            s.data = None;
237        })
238    }
239
240    fn send(&self, val: T) {
241        self.mutex.lock(|state| {
242            let mut s = state.borrow_mut();
243            s.data = Some(val);
244            s.current_id += 1;
245            s.wakers.wake();
246        })
247    }
248
249    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
250        self.mutex.lock(|state| {
251            let mut s = state.borrow_mut();
252            f(&mut s.data);
253            s.current_id += 1;
254            s.wakers.wake();
255        })
256    }
257
258    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
259        self.mutex.lock(|state| {
260            let mut s = state.borrow_mut();
261            if f(&mut s.data) {
262                s.current_id += 1;
263                s.wakers.wake();
264            }
265        })
266    }
267}
268
269impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
270    fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
271        self.mutex.lock(|state| {
272            let s = state.borrow();
273            if let Some(id) = id {
274                *id = s.current_id;
275            }
276            s.data.clone()
277        })
278    }
279
280    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
281        self.mutex.lock(|state| {
282            let s = state.borrow();
283            match s.data {
284                Some(ref data) if f(data) => {
285                    if let Some(id) = id {
286                        *id = s.current_id;
287                    }
288                    Some(data.clone())
289                }
290                _ => None,
291            }
292        })
293    }
294
295    fn contains_value(&self) -> bool {
296        self.mutex.lock(|state| state.borrow().data.is_some())
297    }
298}
299
300impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
301    /// Create a new `Watch` channel.
302    pub const fn new() -> Self {
303        Self {
304            mutex: Mutex::new(RefCell::new(WatchState {
305                data: None,
306                current_id: 0,
307                wakers: MultiWakerRegistration::new(),
308                receiver_count: 0,
309            })),
310        }
311    }
312
313    /// Create a new `Watch` channel with default data.
314    pub const fn new_with(data: T) -> Self {
315        Self {
316            mutex: Mutex::new(RefCell::new(WatchState {
317                data: Some(data),
318                current_id: 0,
319                wakers: MultiWakerRegistration::new(),
320                receiver_count: 0,
321            })),
322        }
323    }
324
325    /// Create a new [`Sender`] for the `Watch`.
326    pub fn sender(&self) -> Sender<'_, M, T, N> {
327        Sender(Snd::new(self))
328    }
329
330    /// Create a new [`DynSender`] for the `Watch`.
331    pub fn dyn_sender(&self) -> DynSender<'_, T> {
332        DynSender(Snd::new(self))
333    }
334
335    /// Try to create a new [`Receiver`] for the `Watch`. If the
336    /// maximum number of receivers has been reached, `None` is returned.
337    pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
338        self.mutex.lock(|state| {
339            let mut s = state.borrow_mut();
340            if s.receiver_count < N {
341                s.receiver_count += 1;
342                Some(Receiver(Rcv::new(self, 0)))
343            } else {
344                None
345            }
346        })
347    }
348
349    /// Try to create a new [`DynReceiver`] for the `Watch`. If the
350    /// maximum number of receivers has been reached, `None` is returned.
351    pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
352        self.mutex.lock(|state| {
353            let mut s = state.borrow_mut();
354            if s.receiver_count < N {
355                s.receiver_count += 1;
356                Some(DynReceiver(Rcv::new(self, 0)))
357            } else {
358                None
359            }
360        })
361    }
362
363    /// Try to create a new [`AnonReceiver`] for the `Watch`.
364    pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
365        AnonReceiver(AnonRcv::new(self, 0))
366    }
367
368    /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
369    pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
370        DynAnonReceiver(AnonRcv::new(self, 0))
371    }
372
373    /// Returns the message ID of the latest message sent to the `Watch`.
374    ///
375    /// This counter is monotonic, and is incremented every time a new message is sent.
376    pub fn get_msg_id(&self) -> u64 {
377        self.mutex.lock(|state| state.borrow().current_id)
378    }
379
380    /// Tries to get the value of the `Watch`.
381    pub fn try_get(&self) -> Option<T> {
382        WatchBehavior::try_get(self, None)
383    }
384
385    /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
386    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
387    where
388        F: Fn(&T) -> bool,
389    {
390        WatchBehavior::try_get_and(self, None, &mut f)
391    }
392}
393
394/// A receiver can `.await` a change in the `Watch` value.
395pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
396    watch: &'a W,
397    _phantom: PhantomData<T>,
398}
399
400impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
401    fn clone(&self) -> Self {
402        Self {
403            watch: self.watch,
404            _phantom: PhantomData,
405        }
406    }
407}
408
409impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
410    /// Creates a new `Receiver` with a reference to the `Watch`.
411    fn new(watch: &'a W) -> Self {
412        Self {
413            watch,
414            _phantom: PhantomData,
415        }
416    }
417
418    /// Sends a new value to the `Watch`.
419    pub fn send(&self, val: T) {
420        self.watch.send(val)
421    }
422
423    /// Clears the value of the `Watch`.
424    /// This will cause calls to [`Rcv::get`] to be pending.
425    pub fn clear(&self) {
426        self.watch.clear()
427    }
428
429    /// Tries to retrieve the value of the `Watch`.
430    pub fn try_get(&self) -> Option<T> {
431        self.watch.try_get(None)
432    }
433
434    /// Tries to peek the current value of the `Watch` if it matches the predicate
435    /// function `f`.
436    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
437    where
438        F: Fn(&T) -> bool,
439    {
440        self.watch.try_get_and(None, &mut f)
441    }
442
443    /// Returns true if the `Watch` contains a value.
444    pub fn contains_value(&self) -> bool {
445        self.watch.contains_value()
446    }
447
448    /// Modify the value of the `Watch` using a closure.
449    pub fn send_modify<F>(&self, mut f: F)
450    where
451        F: Fn(&mut Option<T>),
452    {
453        self.watch.send_modify(&mut f)
454    }
455
456    /// Modify the value of the `Watch` using a closure. The closure must return
457    /// `true` if the value was modified, which notifies all receivers.
458    pub fn send_if_modified<F>(&self, mut f: F)
459    where
460        F: Fn(&mut Option<T>) -> bool,
461    {
462        self.watch.send_if_modified(&mut f)
463    }
464}
465
466/// A sender of a `Watch` channel.
467///
468/// For a simpler type definition, consider [`DynSender`] at the expense of
469/// some runtime performance due to dynamic dispatch.
470pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
473    fn clone(&self) -> Self {
474        Self(self.0.clone())
475    }
476}
477
478impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
479    /// Converts the `Sender` into a [`DynSender`].
480    pub fn as_dyn(self) -> DynSender<'a, T> {
481        DynSender(Snd::new(self.watch))
482    }
483}
484
485impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
486    fn into(self) -> DynSender<'a, T> {
487        self.as_dyn()
488    }
489}
490
491impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
492    type Target = Snd<'a, T, Watch<M, T, N>>;
493
494    fn deref(&self) -> &Self::Target {
495        &self.0
496    }
497}
498
499impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
500    fn deref_mut(&mut self) -> &mut Self::Target {
501        &mut self.0
502    }
503}
504
505/// A sender which holds a **dynamic** reference to a `Watch` channel.
506///
507/// This is an alternative to [`Sender`] with a simpler type definition,
508pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509
510impl<'a, T: Clone> Clone for DynSender<'a, T> {
511    fn clone(&self) -> Self {
512        Self(self.0.clone())
513    }
514}
515
516impl<'a, T: Clone> Deref for DynSender<'a, T> {
517    type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
518
519    fn deref(&self) -> &Self::Target {
520        &self.0
521    }
522}
523
524impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
525    fn deref_mut(&mut self) -> &mut Self::Target {
526        &mut self.0
527    }
528}
529
530/// A receiver can `.await` a change in the `Watch` value.
531pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
532    watch: &'a W,
533    at_id: u64,
534    _phantom: PhantomData<T>,
535}
536
537impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
538    /// Creates a new `Receiver` with a reference to the `Watch`.
539    fn new(watch: &'a W, at_id: u64) -> Self {
540        Self {
541            watch,
542            at_id,
543            _phantom: PhantomData,
544        }
545    }
546
547    /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
548    ///
549    /// **Note**: Futures do nothing unless you `.await` or poll them.
550    pub fn get(&mut self) -> impl Future<Output = T> + '_ {
551        poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
552    }
553
554    /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
555    pub fn try_get(&mut self) -> Option<T> {
556        self.watch.try_get(Some(&mut self.at_id))
557    }
558
559    /// Returns the value of the `Watch` if it matches the predicate function `f`,
560    /// or waits for it to match, marking it as seen.
561    ///
562    /// **Note**: Futures do nothing unless you `.await` or poll them.
563    pub async fn get_and<F>(&mut self, mut f: F) -> T
564    where
565        F: Fn(&T) -> bool,
566    {
567        poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
568    }
569
570    /// Tries to get the current value of the `Watch` if it matches the predicate
571    /// function `f` without waiting, marking it as seen.
572    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
573    where
574        F: Fn(&T) -> bool,
575    {
576        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
577    }
578
579    /// Waits for the `Watch` to change and returns the new value, marking it as seen.
580    ///
581    /// **Note**: Futures do nothing unless you `.await` or poll them.
582    pub async fn changed(&mut self) -> T {
583        poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
584    }
585
586    /// Tries to get the new value of the watch without waiting, marking it as seen.
587    pub fn try_changed(&mut self) -> Option<T> {
588        self.watch.try_changed(&mut self.at_id)
589    }
590
591    /// Waits for the `Watch` to change to a value which satisfies the predicate
592    /// function `f` and returns the new value, marking it as seen.
593    ///
594    /// **Note**: Futures do nothing unless you `.await` or poll them.
595    pub async fn changed_and<F>(&mut self, mut f: F) -> T
596    where
597        F: Fn(&T) -> bool,
598    {
599        poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
600    }
601
602    /// Tries to get the new value of the watch which satisfies the predicate
603    /// function `f` and returns the new value without waiting, marking it as seen.
604    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
605    where
606        F: Fn(&T) -> bool,
607    {
608        self.watch.try_changed_and(&mut self.at_id, &mut f)
609    }
610
611    /// Checks if the `Watch` contains a value. If this returns true,
612    /// then awaiting [`Rcv::get`] will return immediately.
613    pub fn contains_value(&self) -> bool {
614        self.watch.contains_value()
615    }
616}
617
618impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
619    fn drop(&mut self) {
620        self.watch.drop_receiver();
621    }
622}
623
624/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
625pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
626    watch: &'a W,
627    at_id: u64,
628    _phantom: PhantomData<T>,
629}
630
631impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
632    /// Creates a new `Receiver` with a reference to the `Watch`.
633    fn new(watch: &'a W, at_id: u64) -> Self {
634        Self {
635            watch,
636            at_id,
637            _phantom: PhantomData,
638        }
639    }
640
641    /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
642    pub fn try_get(&mut self) -> Option<T> {
643        self.watch.try_get(Some(&mut self.at_id))
644    }
645
646    /// Tries to get the current value of the `Watch` if it matches the predicate
647    /// function `f` without waiting, marking it as seen.
648    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
649    where
650        F: Fn(&T) -> bool,
651    {
652        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
653    }
654
655    /// Tries to get the new value of the watch without waiting, marking it as seen.
656    pub fn try_changed(&mut self) -> Option<T> {
657        self.watch.try_changed(&mut self.at_id)
658    }
659
660    /// Tries to get the new value of the watch which satisfies the predicate
661    /// function `f` and returns the new value without waiting, marking it as seen.
662    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
663    where
664        F: Fn(&T) -> bool,
665    {
666        self.watch.try_changed_and(&mut self.at_id, &mut f)
667    }
668
669    /// Checks if the `Watch` contains a value. If this returns true,
670    /// then awaiting [`Rcv::get`] will return immediately.
671    pub fn contains_value(&self) -> bool {
672        self.watch.contains_value()
673    }
674}
675
676/// A receiver of a `Watch` channel.
677pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678
679impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
680    /// Converts the `Receiver` into a [`DynReceiver`].
681    pub fn as_dyn(self) -> DynReceiver<'a, T> {
682        let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
683        core::mem::forget(self); // Ensures the destructor is not called
684        rcv
685    }
686}
687
688impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
689    fn into(self) -> DynReceiver<'a, T> {
690        self.as_dyn()
691    }
692}
693
694impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
695    type Target = Rcv<'a, T, Watch<M, T, N>>;
696
697    fn deref(&self) -> &Self::Target {
698        &self.0
699    }
700}
701
702impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
703    fn deref_mut(&mut self) -> &mut Self::Target {
704        &mut self.0
705    }
706}
707
708/// A receiver which holds a **dynamic** reference to a `Watch` channel.
709///
710/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
711/// some runtime performance due to dynamic dispatch.
712pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713
714impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
715    type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
716
717    fn deref(&self) -> &Self::Target {
718        &self.0
719    }
720}
721
722impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
723    fn deref_mut(&mut self) -> &mut Self::Target {
724        &mut self.0
725    }
726}
727
728/// A receiver of a `Watch` channel that cannot `.await` values.
729pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
732    /// Converts the `Receiver` into a [`DynReceiver`].
733    pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
734        let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
735        core::mem::forget(self); // Ensures the destructor is not called
736        rcv
737    }
738}
739
740impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
741    fn into(self) -> DynAnonReceiver<'a, T> {
742        self.as_dyn()
743    }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
747    type Target = AnonRcv<'a, T, Watch<M, T, N>>;
748
749    fn deref(&self) -> &Self::Target {
750        &self.0
751    }
752}
753
754impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
755    fn deref_mut(&mut self) -> &mut Self::Target {
756        &mut self.0
757    }
758}
759
760/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
761///
762/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
763/// some runtime performance due to dynamic dispatch.
764pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765
766impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
767    type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
768
769    fn deref(&self) -> &Self::Target {
770        &self.0
771    }
772}
773
774impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
775    fn deref_mut(&mut self) -> &mut Self::Target {
776        &mut self.0
777    }
778}
779
780#[cfg(test)]
781mod tests {
782    use futures_executor::block_on;
783
784    use super::Watch;
785    use crate::blocking_mutex::raw::CriticalSectionRawMutex;
786
787    #[test]
788    fn multiple_sends() {
789        let f = async {
790            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
791
792            // Obtain receiver and sender
793            let mut rcv = WATCH.receiver().unwrap();
794            let snd = WATCH.sender();
795
796            // Not initialized
797            assert_eq!(rcv.try_changed(), None);
798
799            // Receive the new value
800            snd.send(10);
801            assert_eq!(rcv.changed().await, 10);
802
803            // Receive another value
804            snd.send(20);
805            assert_eq!(rcv.try_changed(), Some(20));
806
807            // No update
808            assert_eq!(rcv.try_changed(), None);
809        };
810        block_on(f);
811    }
812
813    #[test]
814    fn all_try_get() {
815        let f = async {
816            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
817
818            // Obtain receiver and sender
819            let mut rcv = WATCH.receiver().unwrap();
820            let snd = WATCH.sender();
821
822            // Not initialized
823            assert_eq!(WATCH.try_get(), None);
824            assert_eq!(rcv.try_get(), None);
825            assert_eq!(snd.try_get(), None);
826
827            // Receive the new value
828            snd.send(10);
829            assert_eq!(WATCH.try_get(), Some(10));
830            assert_eq!(rcv.try_get(), Some(10));
831            assert_eq!(snd.try_get(), Some(10));
832
833            assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
834            assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
835            assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
836
837            assert_eq!(WATCH.try_get_and(|x| x < &5), None);
838            assert_eq!(rcv.try_get_and(|x| x < &5), None);
839            assert_eq!(snd.try_get_and(|x| x < &5), None);
840        };
841        block_on(f);
842    }
843
844    #[test]
845    fn once_lock_like() {
846        let f = async {
847            static CONFIG0: u8 = 10;
848            static CONFIG1: u8 = 20;
849
850            static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
851
852            // Obtain receiver and sender
853            let mut rcv = WATCH.receiver().unwrap();
854            let snd = WATCH.sender();
855
856            // Not initialized
857            assert_eq!(rcv.try_changed(), None);
858
859            // Receive the new value
860            snd.send(&CONFIG0);
861            let rcv0 = rcv.changed().await;
862            assert_eq!(rcv0, &10);
863
864            // Receive another value
865            snd.send(&CONFIG1);
866            let rcv1 = rcv.try_changed();
867            assert_eq!(rcv1, Some(&20));
868
869            // No update
870            assert_eq!(rcv.try_changed(), None);
871
872            // Ensure similarity with original static
873            assert_eq!(rcv0, &CONFIG0);
874            assert_eq!(rcv1, Some(&CONFIG1));
875        };
876        block_on(f);
877    }
878
879    #[test]
880    fn sender_modify() {
881        let f = async {
882            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
883
884            // Obtain receiver and sender
885            let mut rcv = WATCH.receiver().unwrap();
886            let snd = WATCH.sender();
887
888            // Receive the new value
889            snd.send(10);
890            assert_eq!(rcv.try_changed(), Some(10));
891
892            // Modify the value inplace
893            snd.send_modify(|opt| {
894                if let Some(inner) = opt {
895                    *inner += 5;
896                }
897            });
898
899            // Get the modified value
900            assert_eq!(rcv.try_changed(), Some(15));
901            assert_eq!(rcv.try_changed(), None);
902        };
903        block_on(f);
904    }
905
906    #[test]
907    fn predicate_fn() {
908        let f = async {
909            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
910
911            // Obtain receiver and sender
912            let mut rcv = WATCH.receiver().unwrap();
913            let snd = WATCH.sender();
914
915            snd.send(15);
916            assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
917            assert_eq!(rcv.try_get_and(|x| x < &5), None);
918            assert!(rcv.try_changed().is_none());
919
920            snd.send(20);
921            assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
922            assert_eq!(rcv.try_changed_and(|x| x > &5), None);
923
924            snd.send(25);
925            assert_eq!(rcv.try_changed_and(|x| x < &5), None);
926            assert_eq!(rcv.try_changed(), Some(25));
927
928            snd.send(30);
929            assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
930            assert_eq!(rcv.get_and(|x| x > &5).await, 30);
931        };
932        block_on(f);
933    }
934
935    #[test]
936    fn receive_after_create() {
937        let f = async {
938            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
939
940            // Obtain sender and send value
941            let snd = WATCH.sender();
942            snd.send(10);
943
944            // Obtain receiver and receive value
945            let mut rcv = WATCH.receiver().unwrap();
946            assert_eq!(rcv.try_changed(), Some(10));
947        };
948        block_on(f);
949    }
950
951    #[test]
952    fn max_receivers_drop() {
953        let f = async {
954            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
955
956            // Try to create 3 receivers (only 2 can exist at once)
957            let rcv0 = WATCH.receiver();
958            let rcv1 = WATCH.receiver();
959            let rcv2 = WATCH.receiver();
960
961            // Ensure the first two are successful and the third is not
962            assert!(rcv0.is_some());
963            assert!(rcv1.is_some());
964            assert!(rcv2.is_none());
965
966            // Drop the first receiver
967            drop(rcv0);
968
969            // Create another receiver and ensure it is successful
970            let rcv3 = WATCH.receiver();
971            assert!(rcv3.is_some());
972        };
973        block_on(f);
974    }
975
976    #[test]
977    fn multiple_receivers() {
978        let f = async {
979            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
980
981            // Obtain receivers and sender
982            let mut rcv0 = WATCH.receiver().unwrap();
983            let mut rcv1 = WATCH.anon_receiver();
984            let snd = WATCH.sender();
985
986            // No update for both
987            assert_eq!(rcv0.try_changed(), None);
988            assert_eq!(rcv1.try_changed(), None);
989
990            // Send a new value
991            snd.send(0);
992
993            // Both receivers receive the new value
994            assert_eq!(rcv0.try_changed(), Some(0));
995            assert_eq!(rcv1.try_changed(), Some(0));
996        };
997        block_on(f);
998    }
999
1000    #[test]
1001    fn clone_senders() {
1002        let f = async {
1003            // Obtain different ways to send
1004            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1005            let snd0 = WATCH.sender();
1006            let snd1 = snd0.clone();
1007
1008            // Obtain Receiver
1009            let mut rcv = WATCH.receiver().unwrap().as_dyn();
1010
1011            // Send a value from first sender
1012            snd0.send(10);
1013            assert_eq!(rcv.try_changed(), Some(10));
1014
1015            // Send a value from second sender
1016            snd1.send(20);
1017            assert_eq!(rcv.try_changed(), Some(20));
1018        };
1019        block_on(f);
1020    }
1021
1022    #[test]
1023    fn use_dynamics() {
1024        let f = async {
1025            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1026
1027            // Obtain receiver and sender
1028            let mut anon_rcv = WATCH.dyn_anon_receiver();
1029            let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1030            let dyn_snd = WATCH.dyn_sender();
1031
1032            // Send a value
1033            dyn_snd.send(10);
1034
1035            // Ensure the dynamic receiver receives the value
1036            assert_eq!(anon_rcv.try_changed(), Some(10));
1037            assert_eq!(dyn_rcv.try_changed(), Some(10));
1038            assert_eq!(dyn_rcv.try_changed(), None);
1039        };
1040        block_on(f);
1041    }
1042
1043    #[test]
1044    fn convert_to_dyn() {
1045        let f = async {
1046            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1047
1048            // Obtain receiver and sender
1049            let anon_rcv = WATCH.anon_receiver();
1050            let rcv = WATCH.receiver().unwrap();
1051            let snd = WATCH.sender();
1052
1053            // Convert to dynamic
1054            let mut dyn_anon_rcv = anon_rcv.as_dyn();
1055            let mut dyn_rcv = rcv.as_dyn();
1056            let dyn_snd = snd.as_dyn();
1057
1058            // Send a value
1059            dyn_snd.send(10);
1060
1061            // Ensure the dynamic receiver receives the value
1062            assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1063            assert_eq!(dyn_rcv.try_changed(), Some(10));
1064            assert_eq!(dyn_rcv.try_changed(), None);
1065        };
1066        block_on(f);
1067    }
1068
1069    #[test]
1070    fn dynamic_receiver_count() {
1071        let f = async {
1072            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1073
1074            // Obtain receiver and sender
1075            let rcv0 = WATCH.receiver();
1076            let rcv1 = WATCH.receiver();
1077            let rcv2 = WATCH.receiver();
1078
1079            // Ensure the first two are successful and the third is not
1080            assert!(rcv0.is_some());
1081            assert!(rcv1.is_some());
1082            assert!(rcv2.is_none());
1083
1084            // Convert to dynamic
1085            let dyn_rcv0 = rcv0.unwrap().as_dyn();
1086
1087            // Drop the (now dynamic) receiver
1088            drop(dyn_rcv0);
1089
1090            // Create another receiver and ensure it is successful
1091            let rcv3 = WATCH.receiver();
1092            let rcv4 = WATCH.receiver();
1093            assert!(rcv3.is_some());
1094            assert!(rcv4.is_none());
1095        };
1096        block_on(f);
1097    }
1098
1099    #[test]
1100    fn contains_value() {
1101        let f = async {
1102            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1103
1104            // Obtain receiver and sender
1105            let rcv = WATCH.receiver().unwrap();
1106            let snd = WATCH.sender();
1107
1108            // check if the watch contains a value
1109            assert_eq!(rcv.contains_value(), false);
1110            assert_eq!(snd.contains_value(), false);
1111
1112            // Send a value
1113            snd.send(10);
1114
1115            // check if the watch contains a value
1116            assert_eq!(rcv.contains_value(), true);
1117            assert_eq!(snd.contains_value(), true);
1118        };
1119        block_on(f);
1120    }
1121}