1use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
69 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
70}
71
72struct WatchState<T: Clone, const N: usize> {
73 data: Option<T>,
74 current_id: u64,
75 wakers: MultiWakerRegistration<N>,
76 receiver_count: usize,
77}
78
79trait SealedWatchBehavior<T> {
80 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
82
83 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
86
87 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
89
90 fn try_changed(&self, id: &mut u64) -> Option<T>;
92
93 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
96
97 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
100
101 fn drop_receiver(&self);
105
106 fn clear(&self);
108
109 fn send(&self, val: T);
111
112 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
115
116 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
119}
120
121#[allow(private_bounds)]
123pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
124 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
126
127 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
130
131 fn contains_value(&self) -> bool;
133}
134
135impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
136 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
137 self.mutex.lock(|state| {
138 let mut s = state.borrow_mut();
139 match &s.data {
140 Some(data) => {
141 *id = s.current_id;
142 Poll::Ready(data.clone())
143 }
144 None => {
145 s.wakers.register(cx.waker());
146 Poll::Pending
147 }
148 }
149 })
150 }
151
152 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
153 self.mutex.lock(|state| {
154 let mut s = state.borrow_mut();
155 match s.data {
156 Some(ref data) if f(data) => {
157 *id = s.current_id;
158 Poll::Ready(data.clone())
159 }
160 _ => {
161 s.wakers.register(cx.waker());
162 Poll::Pending
163 }
164 }
165 })
166 }
167
168 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
169 self.mutex.lock(|state| {
170 let mut s = state.borrow_mut();
171 match (&s.data, s.current_id > *id) {
172 (Some(data), true) => {
173 *id = s.current_id;
174 Poll::Ready(data.clone())
175 }
176 _ => {
177 s.wakers.register(cx.waker());
178 Poll::Pending
179 }
180 }
181 })
182 }
183
184 fn try_changed(&self, id: &mut u64) -> Option<T> {
185 self.mutex.lock(|state| {
186 let s = state.borrow();
187 match s.current_id > *id {
188 true => {
189 *id = s.current_id;
190 s.data.clone()
191 }
192 false => None,
193 }
194 })
195 }
196
197 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
198 self.mutex.lock(|state| {
199 let mut s = state.borrow_mut();
200 match (&s.data, s.current_id > *id) {
201 (Some(data), true) if f(data) => {
202 *id = s.current_id;
203 Poll::Ready(data.clone())
204 }
205 _ => {
206 s.wakers.register(cx.waker());
207 Poll::Pending
208 }
209 }
210 })
211 }
212
213 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
214 self.mutex.lock(|state| {
215 let s = state.borrow();
216 match (&s.data, s.current_id > *id) {
217 (Some(data), true) if f(data) => {
218 *id = s.current_id;
219 s.data.clone()
220 }
221 _ => None,
222 }
223 })
224 }
225
226 fn drop_receiver(&self) {
227 self.mutex.lock(|state| {
228 let mut s = state.borrow_mut();
229 s.receiver_count -= 1;
230 })
231 }
232
233 fn clear(&self) {
234 self.mutex.lock(|state| {
235 let mut s = state.borrow_mut();
236 s.data = None;
237 })
238 }
239
240 fn send(&self, val: T) {
241 self.mutex.lock(|state| {
242 let mut s = state.borrow_mut();
243 s.data = Some(val);
244 s.current_id += 1;
245 s.wakers.wake();
246 })
247 }
248
249 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
250 self.mutex.lock(|state| {
251 let mut s = state.borrow_mut();
252 f(&mut s.data);
253 s.current_id += 1;
254 s.wakers.wake();
255 })
256 }
257
258 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
259 self.mutex.lock(|state| {
260 let mut s = state.borrow_mut();
261 if f(&mut s.data) {
262 s.current_id += 1;
263 s.wakers.wake();
264 }
265 })
266 }
267}
268
269impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
270 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
271 self.mutex.lock(|state| {
272 let s = state.borrow();
273 if let Some(id) = id {
274 *id = s.current_id;
275 }
276 s.data.clone()
277 })
278 }
279
280 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
281 self.mutex.lock(|state| {
282 let s = state.borrow();
283 match s.data {
284 Some(ref data) if f(data) => {
285 if let Some(id) = id {
286 *id = s.current_id;
287 }
288 Some(data.clone())
289 }
290 _ => None,
291 }
292 })
293 }
294
295 fn contains_value(&self) -> bool {
296 self.mutex.lock(|state| state.borrow().data.is_some())
297 }
298}
299
300impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
301 pub const fn new() -> Self {
303 Self {
304 mutex: Mutex::new(RefCell::new(WatchState {
305 data: None,
306 current_id: 0,
307 wakers: MultiWakerRegistration::new(),
308 receiver_count: 0,
309 })),
310 }
311 }
312
313 pub const fn new_with(data: T) -> Self {
315 Self {
316 mutex: Mutex::new(RefCell::new(WatchState {
317 data: Some(data),
318 current_id: 0,
319 wakers: MultiWakerRegistration::new(),
320 receiver_count: 0,
321 })),
322 }
323 }
324
325 pub fn sender(&self) -> Sender<'_, M, T, N> {
327 Sender(Snd::new(self))
328 }
329
330 pub fn dyn_sender(&self) -> DynSender<'_, T> {
332 DynSender(Snd::new(self))
333 }
334
335 pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
338 self.mutex.lock(|state| {
339 let mut s = state.borrow_mut();
340 if s.receiver_count < N {
341 s.receiver_count += 1;
342 Some(Receiver(Rcv::new(self, 0)))
343 } else {
344 None
345 }
346 })
347 }
348
349 pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
352 self.mutex.lock(|state| {
353 let mut s = state.borrow_mut();
354 if s.receiver_count < N {
355 s.receiver_count += 1;
356 Some(DynReceiver(Rcv::new(self, 0)))
357 } else {
358 None
359 }
360 })
361 }
362
363 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
365 AnonReceiver(AnonRcv::new(self, 0))
366 }
367
368 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
370 DynAnonReceiver(AnonRcv::new(self, 0))
371 }
372
373 pub fn get_msg_id(&self) -> u64 {
377 self.mutex.lock(|state| state.borrow().current_id)
378 }
379
380 pub fn try_get(&self) -> Option<T> {
382 WatchBehavior::try_get(self, None)
383 }
384
385 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
387 where
388 F: Fn(&T) -> bool,
389 {
390 WatchBehavior::try_get_and(self, None, &mut f)
391 }
392}
393
394pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
396 watch: &'a W,
397 _phantom: PhantomData<T>,
398}
399
400impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
401 fn clone(&self) -> Self {
402 Self {
403 watch: self.watch,
404 _phantom: PhantomData,
405 }
406 }
407}
408
409impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
410 fn new(watch: &'a W) -> Self {
412 Self {
413 watch,
414 _phantom: PhantomData,
415 }
416 }
417
418 pub fn send(&self, val: T) {
420 self.watch.send(val)
421 }
422
423 pub fn clear(&self) {
426 self.watch.clear()
427 }
428
429 pub fn try_get(&self) -> Option<T> {
431 self.watch.try_get(None)
432 }
433
434 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
437 where
438 F: Fn(&T) -> bool,
439 {
440 self.watch.try_get_and(None, &mut f)
441 }
442
443 pub fn contains_value(&self) -> bool {
445 self.watch.contains_value()
446 }
447
448 pub fn send_modify<F>(&self, mut f: F)
450 where
451 F: Fn(&mut Option<T>),
452 {
453 self.watch.send_modify(&mut f)
454 }
455
456 pub fn send_if_modified<F>(&self, mut f: F)
459 where
460 F: Fn(&mut Option<T>) -> bool,
461 {
462 self.watch.send_if_modified(&mut f)
463 }
464}
465
466pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
471
472impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
473 fn clone(&self) -> Self {
474 Self(self.0.clone())
475 }
476}
477
478impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
479 pub fn as_dyn(self) -> DynSender<'a, T> {
481 DynSender(Snd::new(self.watch))
482 }
483}
484
485impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
486 fn into(self) -> DynSender<'a, T> {
487 self.as_dyn()
488 }
489}
490
491impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
492 type Target = Snd<'a, T, Watch<M, T, N>>;
493
494 fn deref(&self) -> &Self::Target {
495 &self.0
496 }
497}
498
499impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
500 fn deref_mut(&mut self) -> &mut Self::Target {
501 &mut self.0
502 }
503}
504
505pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
509
510impl<'a, T: Clone> Clone for DynSender<'a, T> {
511 fn clone(&self) -> Self {
512 Self(self.0.clone())
513 }
514}
515
516impl<'a, T: Clone> Deref for DynSender<'a, T> {
517 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
518
519 fn deref(&self) -> &Self::Target {
520 &self.0
521 }
522}
523
524impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
525 fn deref_mut(&mut self) -> &mut Self::Target {
526 &mut self.0
527 }
528}
529
530pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
532 watch: &'a W,
533 at_id: u64,
534 _phantom: PhantomData<T>,
535}
536
537impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
538 fn new(watch: &'a W, at_id: u64) -> Self {
540 Self {
541 watch,
542 at_id,
543 _phantom: PhantomData,
544 }
545 }
546
547 pub fn get(&mut self) -> impl Future<Output = T> + '_ {
551 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
552 }
553
554 pub fn try_get(&mut self) -> Option<T> {
556 self.watch.try_get(Some(&mut self.at_id))
557 }
558
559 pub async fn get_and<F>(&mut self, mut f: F) -> T
564 where
565 F: Fn(&T) -> bool,
566 {
567 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
568 }
569
570 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
573 where
574 F: Fn(&T) -> bool,
575 {
576 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
577 }
578
579 pub async fn changed(&mut self) -> T {
583 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
584 }
585
586 pub fn try_changed(&mut self) -> Option<T> {
588 self.watch.try_changed(&mut self.at_id)
589 }
590
591 pub async fn changed_and<F>(&mut self, mut f: F) -> T
596 where
597 F: Fn(&T) -> bool,
598 {
599 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
600 }
601
602 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
605 where
606 F: Fn(&T) -> bool,
607 {
608 self.watch.try_changed_and(&mut self.at_id, &mut f)
609 }
610
611 pub fn contains_value(&self) -> bool {
614 self.watch.contains_value()
615 }
616}
617
618impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
619 fn drop(&mut self) {
620 self.watch.drop_receiver();
621 }
622}
623
624pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
626 watch: &'a W,
627 at_id: u64,
628 _phantom: PhantomData<T>,
629}
630
631impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
632 fn new(watch: &'a W, at_id: u64) -> Self {
634 Self {
635 watch,
636 at_id,
637 _phantom: PhantomData,
638 }
639 }
640
641 pub fn try_get(&mut self) -> Option<T> {
643 self.watch.try_get(Some(&mut self.at_id))
644 }
645
646 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
649 where
650 F: Fn(&T) -> bool,
651 {
652 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
653 }
654
655 pub fn try_changed(&mut self) -> Option<T> {
657 self.watch.try_changed(&mut self.at_id)
658 }
659
660 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
663 where
664 F: Fn(&T) -> bool,
665 {
666 self.watch.try_changed_and(&mut self.at_id, &mut f)
667 }
668
669 pub fn contains_value(&self) -> bool {
672 self.watch.contains_value()
673 }
674}
675
676pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
678
679impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
680 pub fn as_dyn(self) -> DynReceiver<'a, T> {
682 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
683 core::mem::forget(self); rcv
685 }
686}
687
688impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
689 fn into(self) -> DynReceiver<'a, T> {
690 self.as_dyn()
691 }
692}
693
694impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
695 type Target = Rcv<'a, T, Watch<M, T, N>>;
696
697 fn deref(&self) -> &Self::Target {
698 &self.0
699 }
700}
701
702impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
703 fn deref_mut(&mut self) -> &mut Self::Target {
704 &mut self.0
705 }
706}
707
708pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
713
714impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
715 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
716
717 fn deref(&self) -> &Self::Target {
718 &self.0
719 }
720}
721
722impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
723 fn deref_mut(&mut self) -> &mut Self::Target {
724 &mut self.0
725 }
726}
727
728pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
730
731impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
732 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
734 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
735 core::mem::forget(self); rcv
737 }
738}
739
740impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
741 fn into(self) -> DynAnonReceiver<'a, T> {
742 self.as_dyn()
743 }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
747 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
748
749 fn deref(&self) -> &Self::Target {
750 &self.0
751 }
752}
753
754impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
755 fn deref_mut(&mut self) -> &mut Self::Target {
756 &mut self.0
757 }
758}
759
760pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
765
766impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
767 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
768
769 fn deref(&self) -> &Self::Target {
770 &self.0
771 }
772}
773
774impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
775 fn deref_mut(&mut self) -> &mut Self::Target {
776 &mut self.0
777 }
778}
779
780#[cfg(test)]
781mod tests {
782 use futures_executor::block_on;
783
784 use super::Watch;
785 use crate::blocking_mutex::raw::CriticalSectionRawMutex;
786
787 #[test]
788 fn multiple_sends() {
789 let f = async {
790 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
791
792 let mut rcv = WATCH.receiver().unwrap();
794 let snd = WATCH.sender();
795
796 assert_eq!(rcv.try_changed(), None);
798
799 snd.send(10);
801 assert_eq!(rcv.changed().await, 10);
802
803 snd.send(20);
805 assert_eq!(rcv.try_changed(), Some(20));
806
807 assert_eq!(rcv.try_changed(), None);
809 };
810 block_on(f);
811 }
812
813 #[test]
814 fn all_try_get() {
815 let f = async {
816 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
817
818 let mut rcv = WATCH.receiver().unwrap();
820 let snd = WATCH.sender();
821
822 assert_eq!(WATCH.try_get(), None);
824 assert_eq!(rcv.try_get(), None);
825 assert_eq!(snd.try_get(), None);
826
827 snd.send(10);
829 assert_eq!(WATCH.try_get(), Some(10));
830 assert_eq!(rcv.try_get(), Some(10));
831 assert_eq!(snd.try_get(), Some(10));
832
833 assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
834 assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
835 assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
836
837 assert_eq!(WATCH.try_get_and(|x| x < &5), None);
838 assert_eq!(rcv.try_get_and(|x| x < &5), None);
839 assert_eq!(snd.try_get_and(|x| x < &5), None);
840 };
841 block_on(f);
842 }
843
844 #[test]
845 fn once_lock_like() {
846 let f = async {
847 static CONFIG0: u8 = 10;
848 static CONFIG1: u8 = 20;
849
850 static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
851
852 let mut rcv = WATCH.receiver().unwrap();
854 let snd = WATCH.sender();
855
856 assert_eq!(rcv.try_changed(), None);
858
859 snd.send(&CONFIG0);
861 let rcv0 = rcv.changed().await;
862 assert_eq!(rcv0, &10);
863
864 snd.send(&CONFIG1);
866 let rcv1 = rcv.try_changed();
867 assert_eq!(rcv1, Some(&20));
868
869 assert_eq!(rcv.try_changed(), None);
871
872 assert_eq!(rcv0, &CONFIG0);
874 assert_eq!(rcv1, Some(&CONFIG1));
875 };
876 block_on(f);
877 }
878
879 #[test]
880 fn sender_modify() {
881 let f = async {
882 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
883
884 let mut rcv = WATCH.receiver().unwrap();
886 let snd = WATCH.sender();
887
888 snd.send(10);
890 assert_eq!(rcv.try_changed(), Some(10));
891
892 snd.send_modify(|opt| {
894 if let Some(inner) = opt {
895 *inner += 5;
896 }
897 });
898
899 assert_eq!(rcv.try_changed(), Some(15));
901 assert_eq!(rcv.try_changed(), None);
902 };
903 block_on(f);
904 }
905
906 #[test]
907 fn predicate_fn() {
908 let f = async {
909 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
910
911 let mut rcv = WATCH.receiver().unwrap();
913 let snd = WATCH.sender();
914
915 snd.send(15);
916 assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
917 assert_eq!(rcv.try_get_and(|x| x < &5), None);
918 assert!(rcv.try_changed().is_none());
919
920 snd.send(20);
921 assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
922 assert_eq!(rcv.try_changed_and(|x| x > &5), None);
923
924 snd.send(25);
925 assert_eq!(rcv.try_changed_and(|x| x < &5), None);
926 assert_eq!(rcv.try_changed(), Some(25));
927
928 snd.send(30);
929 assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
930 assert_eq!(rcv.get_and(|x| x > &5).await, 30);
931 };
932 block_on(f);
933 }
934
935 #[test]
936 fn receive_after_create() {
937 let f = async {
938 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
939
940 let snd = WATCH.sender();
942 snd.send(10);
943
944 let mut rcv = WATCH.receiver().unwrap();
946 assert_eq!(rcv.try_changed(), Some(10));
947 };
948 block_on(f);
949 }
950
951 #[test]
952 fn max_receivers_drop() {
953 let f = async {
954 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
955
956 let rcv0 = WATCH.receiver();
958 let rcv1 = WATCH.receiver();
959 let rcv2 = WATCH.receiver();
960
961 assert!(rcv0.is_some());
963 assert!(rcv1.is_some());
964 assert!(rcv2.is_none());
965
966 drop(rcv0);
968
969 let rcv3 = WATCH.receiver();
971 assert!(rcv3.is_some());
972 };
973 block_on(f);
974 }
975
976 #[test]
977 fn multiple_receivers() {
978 let f = async {
979 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
980
981 let mut rcv0 = WATCH.receiver().unwrap();
983 let mut rcv1 = WATCH.anon_receiver();
984 let snd = WATCH.sender();
985
986 assert_eq!(rcv0.try_changed(), None);
988 assert_eq!(rcv1.try_changed(), None);
989
990 snd.send(0);
992
993 assert_eq!(rcv0.try_changed(), Some(0));
995 assert_eq!(rcv1.try_changed(), Some(0));
996 };
997 block_on(f);
998 }
999
1000 #[test]
1001 fn clone_senders() {
1002 let f = async {
1003 static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1005 let snd0 = WATCH.sender();
1006 let snd1 = snd0.clone();
1007
1008 let mut rcv = WATCH.receiver().unwrap().as_dyn();
1010
1011 snd0.send(10);
1013 assert_eq!(rcv.try_changed(), Some(10));
1014
1015 snd1.send(20);
1017 assert_eq!(rcv.try_changed(), Some(20));
1018 };
1019 block_on(f);
1020 }
1021
1022 #[test]
1023 fn use_dynamics() {
1024 let f = async {
1025 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1026
1027 let mut anon_rcv = WATCH.dyn_anon_receiver();
1029 let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1030 let dyn_snd = WATCH.dyn_sender();
1031
1032 dyn_snd.send(10);
1034
1035 assert_eq!(anon_rcv.try_changed(), Some(10));
1037 assert_eq!(dyn_rcv.try_changed(), Some(10));
1038 assert_eq!(dyn_rcv.try_changed(), None);
1039 };
1040 block_on(f);
1041 }
1042
1043 #[test]
1044 fn convert_to_dyn() {
1045 let f = async {
1046 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1047
1048 let anon_rcv = WATCH.anon_receiver();
1050 let rcv = WATCH.receiver().unwrap();
1051 let snd = WATCH.sender();
1052
1053 let mut dyn_anon_rcv = anon_rcv.as_dyn();
1055 let mut dyn_rcv = rcv.as_dyn();
1056 let dyn_snd = snd.as_dyn();
1057
1058 dyn_snd.send(10);
1060
1061 assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1063 assert_eq!(dyn_rcv.try_changed(), Some(10));
1064 assert_eq!(dyn_rcv.try_changed(), None);
1065 };
1066 block_on(f);
1067 }
1068
1069 #[test]
1070 fn dynamic_receiver_count() {
1071 let f = async {
1072 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1073
1074 let rcv0 = WATCH.receiver();
1076 let rcv1 = WATCH.receiver();
1077 let rcv2 = WATCH.receiver();
1078
1079 assert!(rcv0.is_some());
1081 assert!(rcv1.is_some());
1082 assert!(rcv2.is_none());
1083
1084 let dyn_rcv0 = rcv0.unwrap().as_dyn();
1086
1087 drop(dyn_rcv0);
1089
1090 let rcv3 = WATCH.receiver();
1092 let rcv4 = WATCH.receiver();
1093 assert!(rcv3.is_some());
1094 assert!(rcv4.is_none());
1095 };
1096 block_on(f);
1097 }
1098
1099 #[test]
1100 fn contains_value() {
1101 let f = async {
1102 static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1103
1104 let rcv = WATCH.receiver().unwrap();
1106 let snd = WATCH.sender();
1107
1108 assert_eq!(rcv.contains_value(), false);
1110 assert_eq!(snd.contains_value(), false);
1111
1112 snd.send(10);
1114
1115 assert_eq!(rcv.contains_value(), true);
1117 assert_eq!(snd.contains_value(), true);
1118 };
1119 block_on(f);
1120 }
1121}