1use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32pub struct Sender<'ch, M, T, const N: usize>
34where
35 M: RawMutex,
36{
37 channel: &'ch Channel<M, T, N>,
38}
39
40impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
41where
42 M: RawMutex,
43{
44 fn clone(&self) -> Self {
45 *self
46 }
47}
48
49impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
50
51impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
52where
53 M: RawMutex,
54{
55 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
59 self.channel.send(message)
60 }
61
62 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
66 self.channel.try_send(message)
67 }
68
69 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73 self.channel.poll_ready_to_send(cx)
74 }
75
76 pub const fn capacity(&self) -> usize {
80 self.channel.capacity()
81 }
82
83 pub fn free_capacity(&self) -> usize {
87 self.channel.free_capacity()
88 }
89
90 pub fn clear(&self) {
94 self.channel.clear();
95 }
96
97 pub fn len(&self) -> usize {
101 self.channel.len()
102 }
103
104 pub fn is_empty(&self) -> bool {
108 self.channel.is_empty()
109 }
110
111 pub fn is_full(&self) -> bool {
115 self.channel.is_full()
116 }
117}
118
119pub struct DynamicSender<'ch, T> {
121 pub(crate) channel: &'ch dyn DynamicChannel<T>,
122}
123
124impl<'ch, T> Clone for DynamicSender<'ch, T> {
125 fn clone(&self) -> Self {
126 *self
127 }
128}
129
130impl<'ch, T> Copy for DynamicSender<'ch, T> {}
131
132impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
133where
134 M: RawMutex,
135{
136 fn from(s: Sender<'ch, M, T, N>) -> Self {
137 Self { channel: s.channel }
138 }
139}
140
141impl<'ch, T> DynamicSender<'ch, T> {
142 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
146 DynamicSendFuture {
147 channel: self.channel,
148 message: Some(message),
149 }
150 }
151
152 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
156 self.channel.try_send_with_context(message, None)
157 }
158
159 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
163 self.channel.poll_ready_to_send(cx)
164 }
165}
166
167pub struct Receiver<'ch, M, T, const N: usize>
169where
170 M: RawMutex,
171{
172 channel: &'ch Channel<M, T, N>,
173}
174
175impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
176where
177 M: RawMutex,
178{
179 fn clone(&self) -> Self {
180 *self
181 }
182}
183
184impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
185
186impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
187where
188 M: RawMutex,
189{
190 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
194 self.channel.receive()
195 }
196
197 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
201 self.channel.ready_to_receive()
202 }
203
204 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
208 self.channel.try_receive()
209 }
210
211 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
215 self.channel.poll_ready_to_receive(cx)
216 }
217
218 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
222 self.channel.poll_receive(cx)
223 }
224
225 pub const fn capacity(&self) -> usize {
229 self.channel.capacity()
230 }
231
232 pub fn free_capacity(&self) -> usize {
236 self.channel.free_capacity()
237 }
238
239 pub fn clear(&self) {
243 self.channel.clear();
244 }
245
246 pub fn len(&self) -> usize {
250 self.channel.len()
251 }
252
253 pub fn is_empty(&self) -> bool {
257 self.channel.is_empty()
258 }
259
260 pub fn is_full(&self) -> bool {
264 self.channel.is_full()
265 }
266}
267
268pub struct DynamicReceiver<'ch, T> {
270 pub(crate) channel: &'ch dyn DynamicChannel<T>,
271}
272
273impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
274 fn clone(&self) -> Self {
275 *self
276 }
277}
278
279impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
280
281impl<'ch, T> DynamicReceiver<'ch, T> {
282 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
286 DynamicReceiveFuture { channel: self.channel }
287 }
288
289 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
293 self.channel.try_receive_with_context(None)
294 }
295
296 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
300 self.channel.poll_ready_to_receive(cx)
301 }
302
303 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
307 self.channel.poll_receive(cx)
308 }
309}
310
311impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
312where
313 M: RawMutex,
314{
315 fn from(s: Receiver<'ch, M, T, N>) -> Self {
316 Self { channel: s.channel }
317 }
318}
319
320#[must_use = "futures do nothing unless you `.await` or poll them"]
322pub struct ReceiveFuture<'ch, M, T, const N: usize>
323where
324 M: RawMutex,
325{
326 channel: &'ch Channel<M, T, N>,
327}
328
329impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
330where
331 M: RawMutex,
332{
333 type Output = T;
334
335 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
336 self.channel.poll_receive(cx)
337 }
338}
339
340#[must_use = "futures do nothing unless you `.await` or poll them"]
342pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
343where
344 M: RawMutex,
345{
346 channel: &'ch Channel<M, T, N>,
347}
348
349impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
350where
351 M: RawMutex,
352{
353 type Output = ();
354
355 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
356 self.channel.poll_ready_to_receive(cx)
357 }
358}
359
360#[must_use = "futures do nothing unless you `.await` or poll them"]
362pub struct DynamicReceiveFuture<'ch, T> {
363 channel: &'ch dyn DynamicChannel<T>,
364}
365
366impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
367 type Output = T;
368
369 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
370 match self.channel.try_receive_with_context(Some(cx)) {
371 Ok(v) => Poll::Ready(v),
372 Err(TryReceiveError::Empty) => Poll::Pending,
373 }
374 }
375}
376
377impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
378 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
379 Self { channel: value.channel }
380 }
381}
382
383#[must_use = "futures do nothing unless you `.await` or poll them"]
385pub struct SendFuture<'ch, M, T, const N: usize>
386where
387 M: RawMutex,
388{
389 channel: &'ch Channel<M, T, N>,
390 message: Option<T>,
391}
392
393impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
394where
395 M: RawMutex,
396{
397 type Output = ();
398
399 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
400 match self.message.take() {
401 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
402 Ok(..) => Poll::Ready(()),
403 Err(TrySendError::Full(m)) => {
404 self.message = Some(m);
405 Poll::Pending
406 }
407 },
408 None => panic!("Message cannot be None"),
409 }
410 }
411}
412
413impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
414
415#[must_use = "futures do nothing unless you `.await` or poll them"]
417pub struct DynamicSendFuture<'ch, T> {
418 channel: &'ch dyn DynamicChannel<T>,
419 message: Option<T>,
420}
421
422impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
423 type Output = ();
424
425 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
426 match self.message.take() {
427 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
428 Ok(..) => Poll::Ready(()),
429 Err(TrySendError::Full(m)) => {
430 self.message = Some(m);
431 Poll::Pending
432 }
433 },
434 None => panic!("Message cannot be None"),
435 }
436 }
437}
438
439impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
440
441impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
442 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
443 Self {
444 channel: value.channel,
445 message: value.message,
446 }
447 }
448}
449
450pub(crate) trait DynamicChannel<T> {
451 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
452
453 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
454
455 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
456 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
457
458 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
459}
460
/// Error returned by the non-blocking receive operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReceiveError {
    /// The channel had no message queued.
    Empty,
}
468
/// Error returned by the non-blocking send operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// The channel was full; the rejected message is handed back to the caller.
    Full(T),
}
477
478struct ChannelState<T, const N: usize> {
479 queue: Deque<T, N>,
480 receiver_waker: WakerRegistration,
481 senders_waker: WakerRegistration,
482}
483
484impl<T, const N: usize> ChannelState<T, N> {
485 const fn new() -> Self {
486 ChannelState {
487 queue: Deque::new(),
488 receiver_waker: WakerRegistration::new(),
489 senders_waker: WakerRegistration::new(),
490 }
491 }
492
493 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
494 self.try_receive_with_context(None)
495 }
496
497 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
498 if self.queue.is_full() {
499 self.senders_waker.wake();
500 }
501
502 if let Some(message) = self.queue.pop_front() {
503 Ok(message)
504 } else {
505 if let Some(cx) = cx {
506 self.receiver_waker.register(cx.waker());
507 }
508 Err(TryReceiveError::Empty)
509 }
510 }
511
512 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
513 if self.queue.is_full() {
514 self.senders_waker.wake();
515 }
516
517 if let Some(message) = self.queue.pop_front() {
518 Poll::Ready(message)
519 } else {
520 self.receiver_waker.register(cx.waker());
521 Poll::Pending
522 }
523 }
524
525 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
526 self.receiver_waker.register(cx.waker());
527
528 if !self.queue.is_empty() {
529 Poll::Ready(())
530 } else {
531 Poll::Pending
532 }
533 }
534
535 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
536 self.try_send_with_context(message, None)
537 }
538
539 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
540 match self.queue.push_back(message) {
541 Ok(()) => {
542 self.receiver_waker.wake();
543 Ok(())
544 }
545 Err(message) => {
546 if let Some(cx) = cx {
547 self.senders_waker.register(cx.waker());
548 }
549 Err(TrySendError::Full(message))
550 }
551 }
552 }
553
554 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
555 self.senders_waker.register(cx.waker());
556
557 if !self.queue.is_full() {
558 Poll::Ready(())
559 } else {
560 Poll::Pending
561 }
562 }
563
564 fn clear(&mut self) {
565 self.queue.clear();
566 }
567
568 fn len(&self) -> usize {
569 self.queue.len()
570 }
571
572 fn is_empty(&self) -> bool {
573 self.queue.is_empty()
574 }
575
576 fn is_full(&self) -> bool {
577 self.queue.is_full()
578 }
579}
580
581pub struct Channel<M, T, const N: usize>
590where
591 M: RawMutex,
592{
593 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
594}
595
596impl<M, T, const N: usize> Channel<M, T, N>
597where
598 M: RawMutex,
599{
600 pub const fn new() -> Self {
610 Self {
611 inner: Mutex::new(RefCell::new(ChannelState::new())),
612 }
613 }
614
615 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
616 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
617 }
618
619 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
620 self.lock(|c| c.try_receive_with_context(cx))
621 }
622
623 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
625 self.lock(|c| c.poll_receive(cx))
626 }
627
628 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
629 self.lock(|c| c.try_send_with_context(m, cx))
630 }
631
632 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
634 self.lock(|c| c.poll_ready_to_receive(cx))
635 }
636
637 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
639 self.lock(|c| c.poll_ready_to_send(cx))
640 }
641
642 pub fn sender(&self) -> Sender<'_, M, T, N> {
644 Sender { channel: self }
645 }
646
647 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
649 Receiver { channel: self }
650 }
651
652 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
654 DynamicSender { channel: self }
655 }
656
657 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
659 DynamicReceiver { channel: self }
660 }
661
662 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
667 SendFuture {
668 channel: self,
669 message: Some(message),
670 }
671 }
672
673 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
684 self.lock(|c| c.try_send(message))
685 }
686
687 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
692 ReceiveFuture { channel: self }
693 }
694
695 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
700 ReceiveReadyFuture { channel: self }
701 }
702
703 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
708 self.lock(|c| c.try_receive())
709 }
710
711 pub const fn capacity(&self) -> usize {
713 N
714 }
715
716 pub fn free_capacity(&self) -> usize {
720 N - self.len()
721 }
722
723 pub fn clear(&self) {
725 self.lock(|c| c.clear());
726 }
727
728 pub fn len(&self) -> usize {
730 self.lock(|c| c.len())
731 }
732
733 pub fn is_empty(&self) -> bool {
735 self.lock(|c| c.is_empty())
736 }
737
738 pub fn is_full(&self) -> bool {
740 self.lock(|c| c.is_full())
741 }
742}
743
744impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
747where
748 M: RawMutex,
749{
750 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
751 Channel::try_send_with_context(self, m, cx)
752 }
753
754 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
755 Channel::try_receive_with_context(self, cx)
756 }
757
758 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
759 Channel::poll_ready_to_send(self, cx)
760 }
761
762 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
763 Channel::poll_ready_to_receive(self, cx)
764 }
765
766 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
767 Channel::poll_receive(self, cx)
768 }
769}
770
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in `c`'s queue.
    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(capacity(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        // The rejected message must be handed back unchanged.
        assert_eq!(c.try_send(2), Err(TrySendError::Full(2)));
        assert_eq!(capacity(&c), 0);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, 3>::new();
        assert_eq!(c.try_receive(), Err(TryReceiveError::Empty));
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        // Exercises the hand-written `Clone` impls of both handles.
        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch_into() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[test]
    fn dynamic_dispatch_constructor() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s = c.dyn_sender();
        let r = c.dyn_receiver();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        // Fill the single slot so both spawned sends find the channel full.
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Presumably gives the spawned senders time to be polled against the
        // full channel before the first receive happens.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep draining in the background so both sends can complete.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}