embassy_sync/
priority_channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! Similar to a [`Channel`](crate::channel::Channel), however [`PriorityChannel`] sifts higher priority items to the front of the queue.
4//! Priority is determined by the `Ord` trait. Priority behavior is determined by the [`Kind`](heapless::binary_heap::Kind) parameter of the channel.
5
6use core::cell::RefCell;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11pub use heapless::binary_heap::{Kind, Max, Min};
12use heapless::BinaryHeap;
13
14use crate::blocking_mutex::raw::RawMutex;
15use crate::blocking_mutex::Mutex;
16use crate::channel::{DynamicChannel, DynamicReceiver, DynamicSender, TryReceiveError, TrySendError};
17use crate::waitqueue::WakerRegistration;
18
19/// Send-only access to a [`PriorityChannel`].
20pub struct Sender<'ch, M, T, K, const N: usize>
21where
22    T: Ord,
23    K: Kind,
24    M: RawMutex,
25{
26    channel: &'ch PriorityChannel<M, T, K, N>,
27}
28
29impl<'ch, M, T, K, const N: usize> Clone for Sender<'ch, M, T, K, N>
30where
31    T: Ord,
32    K: Kind,
33    M: RawMutex,
34{
35    fn clone(&self) -> Self {
36        *self
37    }
38}
39
40impl<'ch, M, T, K, const N: usize> Copy for Sender<'ch, M, T, K, N>
41where
42    T: Ord,
43    K: Kind,
44    M: RawMutex,
45{
46}
47
48impl<'ch, M, T, K, const N: usize> Sender<'ch, M, T, K, N>
49where
50    T: Ord,
51    K: Kind,
52    M: RawMutex,
53{
54    /// Sends a value.
55    ///
56    /// See [`PriorityChannel::send()`]
57    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, K, N> {
58        self.channel.send(message)
59    }
60
61    /// Attempt to immediately send a message.
62    ///
63    /// See [`PriorityChannel::send()`]
64    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
65        self.channel.try_send(message)
66    }
67
68    /// Allows a poll_fn to poll until the channel is ready to send
69    ///
70    /// See [`PriorityChannel::poll_ready_to_send()`]
71    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
72        self.channel.poll_ready_to_send(cx)
73    }
74
75    /// Returns the maximum number of elements the channel can hold.
76    ///
77    /// See [`PriorityChannel::capacity()`]
78    pub const fn capacity(&self) -> usize {
79        self.channel.capacity()
80    }
81
82    /// Returns the free capacity of the channel.
83    ///
84    /// See [`PriorityChannel::free_capacity()`]
85    pub fn free_capacity(&self) -> usize {
86        self.channel.free_capacity()
87    }
88
89    /// Clears all elements in the channel.
90    ///
91    /// See [`PriorityChannel::clear()`]
92    pub fn clear(&self) {
93        self.channel.clear();
94    }
95
96    /// Returns the number of elements currently in the channel.
97    ///
98    /// See [`PriorityChannel::len()`]
99    pub fn len(&self) -> usize {
100        self.channel.len()
101    }
102
103    /// Returns whether the channel is empty.
104    ///
105    /// See [`PriorityChannel::is_empty()`]
106    pub fn is_empty(&self) -> bool {
107        self.channel.is_empty()
108    }
109
110    /// Returns whether the channel is full.
111    ///
112    /// See [`PriorityChannel::is_full()`]
113    pub fn is_full(&self) -> bool {
114        self.channel.is_full()
115    }
116}
117
118impl<'ch, M, T, K, const N: usize> From<Sender<'ch, M, T, K, N>> for DynamicSender<'ch, T>
119where
120    T: Ord,
121    K: Kind,
122    M: RawMutex,
123{
124    fn from(s: Sender<'ch, M, T, K, N>) -> Self {
125        Self { channel: s.channel }
126    }
127}
128
129/// Receive-only access to a [`PriorityChannel`].
130pub struct Receiver<'ch, M, T, K, const N: usize>
131where
132    T: Ord,
133    K: Kind,
134    M: RawMutex,
135{
136    channel: &'ch PriorityChannel<M, T, K, N>,
137}
138
139impl<'ch, M, T, K, const N: usize> Clone for Receiver<'ch, M, T, K, N>
140where
141    T: Ord,
142    K: Kind,
143    M: RawMutex,
144{
145    fn clone(&self) -> Self {
146        *self
147    }
148}
149
150impl<'ch, M, T, K, const N: usize> Copy for Receiver<'ch, M, T, K, N>
151where
152    T: Ord,
153    K: Kind,
154    M: RawMutex,
155{
156}
157
158impl<'ch, M, T, K, const N: usize> Receiver<'ch, M, T, K, N>
159where
160    T: Ord,
161    K: Kind,
162    M: RawMutex,
163{
164    /// Receive the next value.
165    ///
166    /// See [`PriorityChannel::receive()`].
167    pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
168        self.channel.receive()
169    }
170
171    /// Attempt to immediately receive the next value.
172    ///
173    /// See [`PriorityChannel::try_receive()`]
174    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
175        self.channel.try_receive()
176    }
177
178    /// Allows a poll_fn to poll until the channel is ready to receive
179    ///
180    /// See [`PriorityChannel::poll_ready_to_receive()`]
181    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
182        self.channel.poll_ready_to_receive(cx)
183    }
184
185    /// Poll the channel for the next item
186    ///
187    /// See [`PriorityChannel::poll_receive()`]
188    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
189        self.channel.poll_receive(cx)
190    }
191
192    /// Returns the maximum number of elements the channel can hold.
193    ///
194    /// See [`PriorityChannel::capacity()`]
195    pub const fn capacity(&self) -> usize {
196        self.channel.capacity()
197    }
198
199    /// Returns the free capacity of the channel.
200    ///
201    /// See [`PriorityChannel::free_capacity()`]
202    pub fn free_capacity(&self) -> usize {
203        self.channel.free_capacity()
204    }
205
206    /// Clears all elements in the channel.
207    ///
208    /// See [`PriorityChannel::clear()`]
209    pub fn clear(&self) {
210        self.channel.clear();
211    }
212
213    /// Returns the number of elements currently in the channel.
214    ///
215    /// See [`PriorityChannel::len()`]
216    pub fn len(&self) -> usize {
217        self.channel.len()
218    }
219
220    /// Returns whether the channel is empty.
221    ///
222    /// See [`PriorityChannel::is_empty()`]
223    pub fn is_empty(&self) -> bool {
224        self.channel.is_empty()
225    }
226
227    /// Returns whether the channel is full.
228    ///
229    /// See [`PriorityChannel::is_full()`]
230    pub fn is_full(&self) -> bool {
231        self.channel.is_full()
232    }
233}
234
235impl<'ch, M, T, K, const N: usize> From<Receiver<'ch, M, T, K, N>> for DynamicReceiver<'ch, T>
236where
237    T: Ord,
238    K: Kind,
239    M: RawMutex,
240{
241    fn from(s: Receiver<'ch, M, T, K, N>) -> Self {
242        Self { channel: s.channel }
243    }
244}
245
246/// Future returned by [`PriorityChannel::receive`] and  [`Receiver::receive`].
247#[must_use = "futures do nothing unless you `.await` or poll them"]
248pub struct ReceiveFuture<'ch, M, T, K, const N: usize>
249where
250    T: Ord,
251    K: Kind,
252    M: RawMutex,
253{
254    channel: &'ch PriorityChannel<M, T, K, N>,
255}
256
257impl<'ch, M, T, K, const N: usize> Future for ReceiveFuture<'ch, M, T, K, N>
258where
259    T: Ord,
260    K: Kind,
261    M: RawMutex,
262{
263    type Output = T;
264
265    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
266        self.channel.poll_receive(cx)
267    }
268}
269
270/// Future returned by [`PriorityChannel::send`] and  [`Sender::send`].
271#[must_use = "futures do nothing unless you `.await` or poll them"]
272pub struct SendFuture<'ch, M, T, K, const N: usize>
273where
274    T: Ord,
275    K: Kind,
276    M: RawMutex,
277{
278    channel: &'ch PriorityChannel<M, T, K, N>,
279    message: Option<T>,
280}
281
282impl<'ch, M, T, K, const N: usize> Future for SendFuture<'ch, M, T, K, N>
283where
284    T: Ord,
285    K: Kind,
286    M: RawMutex,
287{
288    type Output = ();
289
290    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
291        match self.message.take() {
292            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
293                Ok(..) => Poll::Ready(()),
294                Err(TrySendError::Full(m)) => {
295                    self.message = Some(m);
296                    Poll::Pending
297                }
298            },
299            None => panic!("Message cannot be None"),
300        }
301    }
302}
303
304impl<'ch, M, T, K, const N: usize> Unpin for SendFuture<'ch, M, T, K, N>
305where
306    T: Ord,
307    K: Kind,
308    M: RawMutex,
309{
310}
311
312struct ChannelState<T, K, const N: usize> {
313    queue: BinaryHeap<T, K, N>,
314    receiver_waker: WakerRegistration,
315    senders_waker: WakerRegistration,
316}
317
318impl<T, K, const N: usize> ChannelState<T, K, N>
319where
320    T: Ord,
321    K: Kind,
322{
323    const fn new() -> Self {
324        ChannelState {
325            queue: BinaryHeap::new(),
326            receiver_waker: WakerRegistration::new(),
327            senders_waker: WakerRegistration::new(),
328        }
329    }
330
331    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
332        self.try_receive_with_context(None)
333    }
334
335    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
336        if self.queue.len() == self.queue.capacity() {
337            self.senders_waker.wake();
338        }
339
340        if let Some(message) = self.queue.pop() {
341            Ok(message)
342        } else {
343            if let Some(cx) = cx {
344                self.receiver_waker.register(cx.waker());
345            }
346            Err(TryReceiveError::Empty)
347        }
348    }
349
350    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
351        if self.queue.len() == self.queue.capacity() {
352            self.senders_waker.wake();
353        }
354
355        if let Some(message) = self.queue.pop() {
356            Poll::Ready(message)
357        } else {
358            self.receiver_waker.register(cx.waker());
359            Poll::Pending
360        }
361    }
362
363    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
364        self.receiver_waker.register(cx.waker());
365
366        if !self.queue.is_empty() {
367            Poll::Ready(())
368        } else {
369            Poll::Pending
370        }
371    }
372
373    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
374        self.try_send_with_context(message, None)
375    }
376
377    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
378        match self.queue.push(message) {
379            Ok(()) => {
380                self.receiver_waker.wake();
381                Ok(())
382            }
383            Err(message) => {
384                if let Some(cx) = cx {
385                    self.senders_waker.register(cx.waker());
386                }
387                Err(TrySendError::Full(message))
388            }
389        }
390    }
391
392    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
393        self.senders_waker.register(cx.waker());
394
395        if !self.queue.len() == self.queue.capacity() {
396            Poll::Ready(())
397        } else {
398            Poll::Pending
399        }
400    }
401
402    fn clear(&mut self) {
403        self.queue.clear();
404    }
405
406    fn len(&self) -> usize {
407        self.queue.len()
408    }
409
410    fn is_empty(&self) -> bool {
411        self.queue.is_empty()
412    }
413
414    fn is_full(&self) -> bool {
415        self.queue.len() == self.queue.capacity()
416    }
417}
418
419/// A bounded channel for communicating between asynchronous tasks
420/// with backpressure.
421///
422/// The channel will buffer up to the provided number of messages.  Once the
423/// buffer is full, attempts to `send` new messages will wait until a message is
424/// received from the channel.
425///
426/// Sent data may be reordered based on their priority within the channel.
427/// For example, in a [`Max`](heapless::binary_heap::Max) [`PriorityChannel`]
428/// containing `u32`'s, data sent in the following order `[1, 2, 3]` will be received as `[3, 2, 1]`.
429pub struct PriorityChannel<M, T, K, const N: usize>
430where
431    T: Ord,
432    K: Kind,
433    M: RawMutex,
434{
435    inner: Mutex<M, RefCell<ChannelState<T, K, N>>>,
436}
437
438impl<M, T, K, const N: usize> PriorityChannel<M, T, K, N>
439where
440    T: Ord,
441    K: Kind,
442    M: RawMutex,
443{
444    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
445    ///
446    /// ```
447    /// use embassy_sync::priority_channel::{PriorityChannel, Max};
448    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
449    ///
450    /// // Declare a bounded channel of 3 u32s.
451    /// let mut channel = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
452    /// ```
453    pub const fn new() -> Self {
454        Self {
455            inner: Mutex::new(RefCell::new(ChannelState::new())),
456        }
457    }
458
459    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, K, N>) -> R) -> R {
460        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
461    }
462
463    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
464        self.lock(|c| c.try_receive_with_context(cx))
465    }
466
467    /// Poll the channel for the next message
468    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
469        self.lock(|c| c.poll_receive(cx))
470    }
471
472    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
473        self.lock(|c| c.try_send_with_context(m, cx))
474    }
475
476    /// Allows a poll_fn to poll until the channel is ready to receive
477    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
478        self.lock(|c| c.poll_ready_to_receive(cx))
479    }
480
481    /// Allows a poll_fn to poll until the channel is ready to send
482    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
483        self.lock(|c| c.poll_ready_to_send(cx))
484    }
485
486    /// Get a sender for this channel.
487    pub fn sender(&self) -> Sender<'_, M, T, K, N> {
488        Sender { channel: self }
489    }
490
491    /// Get a receiver for this channel.
492    pub fn receiver(&self) -> Receiver<'_, M, T, K, N> {
493        Receiver { channel: self }
494    }
495
496    /// Send a value, waiting until there is capacity.
497    ///
498    /// Sending completes when the value has been pushed to the channel's queue.
499    /// This doesn't mean the value has been received yet.
500    pub fn send(&self, message: T) -> SendFuture<'_, M, T, K, N> {
501        SendFuture {
502            channel: self,
503            message: Some(message),
504        }
505    }
506
507    /// Attempt to immediately send a message.
508    ///
509    /// This method differs from [`send`](PriorityChannel::send) by returning immediately if the channel's
510    /// buffer is full, instead of waiting.
511    ///
512    /// # Errors
513    ///
514    /// If the channel capacity has been reached, i.e., the channel has `n`
515    /// buffered values where `n` is the argument passed to [`PriorityChannel`], then an
516    /// error is returned.
517    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
518        self.lock(|c| c.try_send(message))
519    }
520
521    /// Receive the next value.
522    ///
523    /// If there are no messages in the channel's buffer, this method will
524    /// wait until a message is sent.
525    pub fn receive(&self) -> ReceiveFuture<'_, M, T, K, N> {
526        ReceiveFuture { channel: self }
527    }
528
529    /// Attempt to immediately receive a message.
530    ///
531    /// This method will either receive a message from the channel immediately or return an error
532    /// if the channel is empty.
533    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
534        self.lock(|c| c.try_receive())
535    }
536
537    /// Returns the maximum number of elements the channel can hold.
538    pub const fn capacity(&self) -> usize {
539        N
540    }
541
542    /// Returns the free capacity of the channel.
543    ///
544    /// This is equivalent to `capacity() - len()`
545    pub fn free_capacity(&self) -> usize {
546        N - self.len()
547    }
548
549    /// Clears all elements in the channel.
550    pub fn clear(&self) {
551        self.lock(|c| c.clear());
552    }
553
554    /// Returns the number of elements currently in the channel.
555    pub fn len(&self) -> usize {
556        self.lock(|c| c.len())
557    }
558
559    /// Returns whether the channel is empty.
560    pub fn is_empty(&self) -> bool {
561        self.lock(|c| c.is_empty())
562    }
563
564    /// Returns whether the channel is full.
565    pub fn is_full(&self) -> bool {
566        self.lock(|c| c.is_full())
567    }
568}
569
570/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
571/// tradeoff cost of dynamic dispatch.
572impl<M, T, K, const N: usize> DynamicChannel<T> for PriorityChannel<M, T, K, N>
573where
574    T: Ord,
575    K: Kind,
576    M: RawMutex,
577{
578    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
579        PriorityChannel::try_send_with_context(self, m, cx)
580    }
581
582    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
583        PriorityChannel::try_receive_with_context(self, cx)
584    }
585
586    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
587        PriorityChannel::poll_ready_to_send(self, cx)
588    }
589
590    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
591        PriorityChannel::poll_ready_to_receive(self, cx)
592    }
593
594    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
595        PriorityChannel::poll_receive(self, cx)
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use core::time::Duration;
602
603    use futures_executor::ThreadPool;
604    use futures_timer::Delay;
605    use futures_util::task::SpawnExt;
606    use heapless::binary_heap::{Kind, Max};
607    use static_cell::StaticCell;
608
609    use super::*;
610    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
611
612    fn capacity<T, K, const N: usize>(c: &ChannelState<T, K, N>) -> usize
613    where
614        T: Ord,
615        K: Kind,
616    {
617        c.queue.capacity() - c.queue.len()
618    }
619
620    #[test]
621    fn sending_once() {
622        let mut c = ChannelState::<u32, Max, 3>::new();
623        assert!(c.try_send(1).is_ok());
624        assert_eq!(capacity(&c), 2);
625    }
626
627    #[test]
628    fn sending_when_full() {
629        let mut c = ChannelState::<u32, Max, 3>::new();
630        let _ = c.try_send(1);
631        let _ = c.try_send(1);
632        let _ = c.try_send(1);
633        match c.try_send(2) {
634            Err(TrySendError::Full(2)) => assert!(true),
635            _ => assert!(false),
636        }
637        assert_eq!(capacity(&c), 0);
638    }
639
640    #[test]
641    fn send_priority() {
642        // Prio channel with kind `Max` sifts larger numbers to the front of the queue
643        let mut c = ChannelState::<u32, Max, 3>::new();
644        assert!(c.try_send(1).is_ok());
645        assert!(c.try_send(2).is_ok());
646        assert!(c.try_send(3).is_ok());
647        assert_eq!(c.try_receive().unwrap(), 3);
648        assert_eq!(c.try_receive().unwrap(), 2);
649        assert_eq!(c.try_receive().unwrap(), 1);
650    }
651
652    #[test]
653    fn receiving_once_with_one_send() {
654        let mut c = ChannelState::<u32, Max, 3>::new();
655        assert!(c.try_send(1).is_ok());
656        assert_eq!(c.try_receive().unwrap(), 1);
657        assert_eq!(capacity(&c), 3);
658    }
659
660    #[test]
661    fn receiving_when_empty() {
662        let mut c = ChannelState::<u32, Max, 3>::new();
663        match c.try_receive() {
664            Err(TryReceiveError::Empty) => assert!(true),
665            _ => assert!(false),
666        }
667        assert_eq!(capacity(&c), 3);
668    }
669
670    #[test]
671    fn simple_send_and_receive() {
672        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
673        assert!(c.try_send(1).is_ok());
674        assert_eq!(c.try_receive().unwrap(), 1);
675    }
676
677    #[test]
678    fn cloning() {
679        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
680        let r1 = c.receiver();
681        let s1 = c.sender();
682
683        let _ = r1.clone();
684        let _ = s1.clone();
685    }
686
687    #[test]
688    fn dynamic_dispatch() {
689        let c = PriorityChannel::<NoopRawMutex, u32, Max, 3>::new();
690        let s: DynamicSender<'_, u32> = c.sender().into();
691        let r: DynamicReceiver<'_, u32> = c.receiver().into();
692
693        assert!(s.try_send(1).is_ok());
694        assert_eq!(r.try_receive().unwrap(), 1);
695    }
696
697    #[futures_test::test]
698    async fn receiver_receives_given_try_send_async() {
699        let executor = ThreadPool::new().unwrap();
700
701        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 3>> = StaticCell::new();
702        let c = &*CHANNEL.init(PriorityChannel::new());
703        let c2 = c;
704        assert!(executor
705            .spawn(async move {
706                assert!(c2.try_send(1).is_ok());
707            })
708            .is_ok());
709        assert_eq!(c.receive().await, 1);
710    }
711
712    #[futures_test::test]
713    async fn sender_send_completes_if_capacity() {
714        let c = PriorityChannel::<CriticalSectionRawMutex, u32, Max, 1>::new();
715        c.send(1).await;
716        assert_eq!(c.receive().await, 1);
717    }
718
719    #[futures_test::test]
720    async fn senders_sends_wait_until_capacity() {
721        let executor = ThreadPool::new().unwrap();
722
723        static CHANNEL: StaticCell<PriorityChannel<CriticalSectionRawMutex, u32, Max, 1>> = StaticCell::new();
724        let c = &*CHANNEL.init(PriorityChannel::new());
725        assert!(c.try_send(1).is_ok());
726
727        let c2 = c;
728        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
729        let c2 = c;
730        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
731        // Wish I could think of a means of determining that the async send is waiting instead.
732        // However, I've used the debugger to observe that the send does indeed wait.
733        Delay::new(Duration::from_millis(500)).await;
734        assert_eq!(c.receive().await, 1);
735        assert!(executor
736            .spawn(async move {
737                loop {
738                    c.receive().await;
739                }
740            })
741            .is_ok());
742        send_task_1.unwrap().await;
743        send_task_2.unwrap().await;
744    }
745}