1use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
19pub struct Sender<'ch, M, T, K, const N: usize>
21where
22 T: Ord,
23 K: Kind,
24 M: RawMutex,
25{
26 channel: &'ch PriorityChannel<M, T, K, N>,
27}
28
29impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
30where
31 T: Ord,
32 K: Kind,
33 M: RawMutex,
34{
35 fn clone(&self) -> Self {
36 *self
37 }
38}
39
40impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
41where
42 T: Ord,
43 K: Kind,
44 M: RawMutex,
45{
46}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50 T: Ord,
51 K: Kind,
52 M: RawMutex,
53{
54 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58 self.channel.send(message)
59 }
60
61 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65 self.channel.try_send(message)
66 }
67
68 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72 self.channel.poll_ready_to_send(cx)
73 }
74
75 pub const fn capacity(&self) -> usize {
79 self.channel.capacity()
80 }
81
82 pub fn free_capacity(&self) -> usize {
86 self.channel.free_capacity()
87 }
88
89 pub fn clear(&self) {
93 self.channel.clear();
94 }
95
96 pub fn len(&self) -> usize {
100 self.channel.len()
101 }
102
103 pub fn is_empty(&self) -> bool {
107 self.channel.is_empty()
108 }
109
110 pub fn is_full(&self) -> bool {
114 self.channel.is_full()
115 }
116}
117
118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
119where
120 T: Ord,
121 K: Kind,
122 M: RawMutex,
123{
124 fn from(s: Sender<'ch, M, T, K, N>) -> Self {
125 Self { channel: s.channel }
126 }
127}
128
129pub struct Receiver<'ch, M, T, K, const N: usize>
131where
132 T: Ord,
133 K: Kind,
134 M: RawMutex,
135{
136 channel: &'ch PriorityChannel<M, T, K, N>,
137}
138
139impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
140where
141 T: Ord,
142 K: Kind,
143 M: RawMutex,
144{
145 fn clone(&self) -> Self {
146 *self
147 }
148}
149
150impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
151where
152 T: Ord,
153 K: Kind,
154 M: RawMutex,
155{
156}
157
158impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
159where
160 T: Ord,
161 K: Kind,
162 M: RawMutex,
163{
164 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
168 self.channel.receive()
169 }
170
171 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
175 self.channel.try_receive()
176 }
177
178 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
182 self.channel.poll_ready_to_receive(cx)
183 }
184
185 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
189 self.channel.poll_receive(cx)
190 }
191
192 pub const fn capacity(&self) -> usize {
196 self.channel.capacity()
197 }
198
199 pub fn free_capacity(&self) -> usize {
203 self.channel.free_capacity()
204 }
205
206 pub fn clear(&self) {
210 self.channel.clear();
211 }
212
213 pub fn len(&self) -> usize {
217 self.channel.len()
218 }
219
220 pub fn is_empty(&self) -> bool {
224 self.channel.is_empty()
225 }
226
227 pub fn is_full(&self) -> bool {
231 self.channel.is_full()
232 }
233}
234
235impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
236where
237 T: Ord,
238 K: Kind,
239 M: RawMutex,
240{
241 fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
242 Self { channel: s.channel }
243 }
244}
245
246#[must_use = "futures do nothing unless you `.await` or poll them"]
248pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
249where
250 T: Ord,
251 K: Kind,
252 M: RawMutex,
253{
254 channel: &'ch PriorityChannel<M, T, K, N>,
255}
256
257impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
258where
259 T: Ord,
260 K: Kind,
261 M: RawMutex,
262{
263 type Output = T;
264
265 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
266 self.channel.poll_receive(cx)
267 }
268}
269
270#[must_use = "futures do nothing unless you `.await` or poll them"]
272pub struct SendFuture<'ch, M, T, K, const N: usize>
273where
274 T: Ord,
275 K: Kind,
276 M: RawMutex,
277{
278 channel: &'ch PriorityChannel<M, T, K, N>,
279 message: Option<T>,
280}
281
282impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
283where
284 T: Ord,
285 K: Kind,
286 M: RawMutex,
287{
288 type Output = ();
289
290 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291 match self.message.take() {
292 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
293 Ok(..) => Poll::Ready(()),
294 Err(TrySendError::Full(m)) => {
295 self.message = Some(m);
296 Poll::Pending
297 }
298 },
299 None => panic!("Message cannot be None"),
300 }
301 }
302}
303
304impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
305where
306 T: Ord,
307 K: Kind,
308 M: RawMutex,
309{
310}
311
312struct ChannelState<T, K, const N: usize> {
313 queue: BinaryHeap<T, K, N>,
314 receiver_waker: WakerRegistration,
315 senders_waker: WakerRegistration,
316}
317
318impl<T, K, const N: usize> ChannelState<T, K, N>
319where
320 T: Ord,
321 K: Kind,
322{
323 const fn new() -> Self {
324 ChannelState {
325 queue: BinaryHeap::new(),
326 receiver_waker: WakerRegistration::new(),
327 senders_waker: WakerRegistration::new(),
328 }
329 }
330
331 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
332 self.try_receive_with_context(None)
333 }
334
335 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
336 if self.queue.len() == self.queue.capacity() {
337 self.senders_waker.wake();
338 }
339
340 if let Some(message) = self.queue.pop() {
341 Ok(message)
342 } else {
343 if let Some(cx) = cx {
344 self.receiver_waker.register(cx.waker());
345 }
346 Err(TryReceiveError::Empty)
347 }
348 }
349
350 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
351 if self.queue.len() == self.queue.capacity() {
352 self.senders_waker.wake();
353 }
354
355 if let Some(message) = self.queue.pop() {
356 Poll::Ready(message)
357 } else {
358 self.receiver_waker.register(cx.waker());
359 Poll::Pending
360 }
361 }
362
363 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
364 self.receiver_waker.register(cx.waker());
365
366 if !self.queue.is_empty() {
367 Poll::Ready(())
368 } else {
369 Poll::Pending
370 }
371 }
372
373 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
374 self.try_send_with_context(message, None)
375 }
376
377 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
378 match self.queue.push(message) {
379 Ok(()) => {
380 self.receiver_waker.wake();
381 Ok(())
382 }
383 Err(message) => {
384 if let Some(cx) = cx {
385 self.senders_waker.register(cx.waker());
386 }
387 Err(TrySendError::Full(message))
388 }
389 }
390 }
391
392 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
393 self.senders_waker.register(cx.waker());
394
395 if !self.queue.len() == self.queue.capacity() {
396 Poll::Ready(())
397 } else {
398 Poll::Pending
399 }
400 }
401
402 fn clear(&mut self) {
403 self.queue.clear();
404 }
405
406 fn len(&self) -> usize {
407 self.queue.len()
408 }
409
410 fn is_empty(&self) -> bool {
411 self.queue.is_empty()
412 }
413
414 fn is_full(&self) -> bool {
415 self.queue.len() == self.queue.capacity()
416 }
417}
418
419pub struct PriorityChannel<M, T, K, const N: usize>
430where
431 T: Ord,
432 K: Kind,
433 M: RawMutex,
434{
435 inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
436}
437
438impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
439where
440 T: Ord,
441 K: Kind,
442 M: RawMutex,
443{
444 pub const fn new() -> Self {
454 Self {
455 inner: Mutex::new(RefCell::new(ChannelState::new())),
456 }
457 }
458
459 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
460 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
461 }
462
463 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
464 self.lock(|c| c.try_receive_with_context(cx))
465 }
466
467 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
469 self.lock(|c| c.poll_receive(cx))
470 }
471
472 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
473 self.lock(|c| c.try_send_with_context(m, cx))
474 }
475
476 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
478 self.lock(|c| c.poll_ready_to_receive(cx))
479 }
480
481 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
483 self.lock(|c| c.poll_ready_to_send(cx))
484 }
485
486 pub fn sender(&self) -> Sender<'_, M, T, K, N> {
488 Sender { channel: self }
489 }
490
491 pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
493 Receiver { channel: self }
494 }
495
496 pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
501 SendFuture {
502 channel: self,
503 message: Some(message),
504 }
505 }
506
507 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
518 self.lock(|c| c.try_send(message))
519 }
520
521 pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
526 ReceiveFuture { channel: self }
527 }
528
529 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
534 self.lock(|c| c.try_receive())
535 }
536
537 pub const fn capacity(&self) -> usize {
539 N
540 }
541
542 pub fn free_capacity(&self) -> usize {
546 N - self.len()
547 }
548
549 pub fn clear(&self) {
551 self.lock(|c| c.clear());
552 }
553
554 pub fn len(&self) -> usize {
556 self.lock(|c| c.len())
557 }
558
559 pub fn is_empty(&self) -> bool {
561 self.lock(|c| c.is_empty())
562 }
563
564 pub fn is_full(&self) -> bool {
566 self.lock(|c| c.is_full())
567 }
568}
569
570impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
573where
574 T: Ord,
575 K: Kind,
576 M: RawMutex,
577{
578 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
579 PriorityChannel::try_send_with_context(self, m, cx)
580 }
581
582 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
583 PriorityChannel::try_receive_with_context(self, cx)
584 }
585
586 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
587 PriorityChannel::poll_ready_to_send(self, cx)
588 }
589
590 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
591 PriorityChannel::poll_ready_to_receive(self, cx)
592 }
593
594 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
595 PriorityChannel::poll_receive(self, cx)
596 }
597}
598
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use heapless::binary_heap::{Kind, Max};
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in the queue of `c`.
    fn capacity<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
    where
        T: Ord,
        K: Kind,
    {
        let queue = &c.queue;
        queue.capacity() - queue.len()
    }

    #[test]
    fn sending_once() {
        let mut state = ChannelState::<u32, Max, 3>::new();
        assert!(state.try_send(1).is_ok());
        assert_eq!(capacity(&state), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut state = ChannelState::<u32, Max, 3>::new();
        // Fill all three slots; the results are irrelevant here.
        for _ in 0..3 {
            let _ = state.try_send(1);
        }
        // The rejected message must come back unchanged.
        assert!(matches!(state.try_send(2), Err(TrySendError::Full(2))));
        assert_eq!(capacity(&state), 0);
    }

    #[test]
    fn send_priority() {
        let mut state = ChannelState::<u32, Max, 3>::new();
        for value in [1, 2, 3] {
            assert!(state.try_send(value).is_ok());
        }
        // A `Max` heap yields the largest message first.
        for expected in [3, 2, 1] {
            assert_eq!(state.try_receive().unwrap(), expected);
        }
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut state = ChannelState::<u32, Max, 3>::new();
        assert!(state.try_send(1).is_ok());
        assert_eq!(state.try_receive().unwrap(), 1);
        assert_eq!(capacity(&state), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut state = ChannelState::<u32, Max, 3>::new();
        assert!(matches!(state.try_receive(), Err(TryReceiveError::Empty)));
        assert_eq!(capacity(&state), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        assert!(channel.try_send(1).is_ok());
        assert_eq!(channel.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let receiver = channel.receiver();
        let sender = channel.sender();

        let _ = receiver.clone();
        let _ = sender.clone();
    }

    #[test]
    fn dynamic_dispatch() {
        let channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
        let sender: DynamicSender<'_, u32> = channel.sender().into();
        let receiver: DynamicReceiver<'_, u32> = channel.receiver().into();

        assert!(sender.try_send(1).is_ok());
        assert_eq!(receiver.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
        let channel = &*CHANNEL.init(PriorityChannel::new());

        // The shared reference is `Copy`, so the task gets its own copy of it.
        let spawned = executor.spawn(async move {
            assert!(channel.try_send(1).is_ok());
        });
        assert!(spawned.is_ok());
        assert_eq!(channel.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let channel = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
        channel.send(1).await;
        assert_eq!(channel.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
        let channel = &*CHANNEL.init(PriorityChannel::new());
        // Occupy the single slot so both senders below have to wait.
        assert!(channel.try_send(1).is_ok());

        let send_task_1 = executor.spawn_with_handle(async move { channel.send(2).await });
        let send_task_2 = executor.spawn_with_handle(async move { channel.send(3).await });
        // Give both send tasks time to run into the full channel.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(channel.receive().await, 1);

        // Keep draining in the background so both pending sends can complete.
        let drain = executor.spawn(async move {
            loop {
                channel.receive().await;
            }
        });
        assert!(drain.is_ok());

        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}