embassy_sync/
channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
//! It can be used concurrently by multiple producers (senders) and multiple
//! consumers (receivers), i.e. it is an "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
//! This module provides a bounded channel that has a limit on the number of
//! messages that it can store. Once this limit is reached, `try_send` returns
//! an error, while `send` waits until a receiver has made room.
19//!
20
21use core::cell::RefCell;
22use core::future::Future;
23use core::pin::Pin;
24use core::task::{Context, Poll};
25
26use heapless::Deque;
27
28use crate::blocking_mutex::raw::RawMutex;
29use crate::blocking_mutex::Mutex;
30use crate::waitqueue::WakerRegistration;
31
32/// Send-only access to a [`Channel`].
33pub struct Sender<'ch, M, T, const N: usize>
34where
35    M: RawMutex,
36{
37    channel: &'ch Channel<M, T, N>,
38}
39
40impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
41where
42    M: RawMutex,
43{
44    fn clone(&self) -> Self {
45        *self
46    }
47}
48
49impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
50
51impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
52where
53    M: RawMutex,
54{
55    /// Sends a value.
56    ///
57    /// See [`Channel::send()`]
58    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
59        self.channel.send(message)
60    }
61
62    /// Attempt to immediately send a message.
63    ///
64    /// See [`Channel::send()`]
65    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
66        self.channel.try_send(message)
67    }
68
69    /// Allows a poll_fn to poll until the channel is ready to send
70    ///
71    /// See [`Channel::poll_ready_to_send()`]
72    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
73        self.channel.poll_ready_to_send(cx)
74    }
75
76    /// Returns the maximum number of elements the channel can hold.
77    ///
78    /// See [`Channel::capacity()`]
79    pub const fn capacity(&self) -> usize {
80        self.channel.capacity()
81    }
82
83    /// Returns the free capacity of the channel.
84    ///
85    /// See [`Channel::free_capacity()`]
86    pub fn free_capacity(&self) -> usize {
87        self.channel.free_capacity()
88    }
89
90    /// Clears all elements in the channel.
91    ///
92    /// See [`Channel::clear()`]
93    pub fn clear(&self) {
94        self.channel.clear();
95    }
96
97    /// Returns the number of elements currently in the channel.
98    ///
99    /// See [`Channel::len()`]
100    pub fn len(&self) -> usize {
101        self.channel.len()
102    }
103
104    /// Returns whether the channel is empty.
105    ///
106    /// See [`Channel::is_empty()`]
107    pub fn is_empty(&self) -> bool {
108        self.channel.is_empty()
109    }
110
111    /// Returns whether the channel is full.
112    ///
113    /// See [`Channel::is_full()`]
114    pub fn is_full(&self) -> bool {
115        self.channel.is_full()
116    }
117}
118
119/// Send-only access to a [`Channel`] without knowing channel size.
120pub struct DynamicSender<'ch, T> {
121    pub(crate) channel: &'ch dyn DynamicChannel<T>,
122}
123
124impl<'ch, T> Clone for DynamicSender<'ch, T> {
125    fn clone(&self) -> Self {
126        *self
127    }
128}
129
130impl<'ch, T> Copy for DynamicSender<'ch, T> {}
131
132impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
133where
134    M: RawMutex,
135{
136    fn from(s: Sender<'ch, M, T, N>) -> Self {
137        Self { channel: s.channel }
138    }
139}
140
141impl<'ch, T> DynamicSender<'ch, T> {
142    /// Sends a value.
143    ///
144    /// See [`Channel::send()`]
145    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
146        DynamicSendFuture {
147            channel: self.channel,
148            message: Some(message),
149        }
150    }
151
152    /// Attempt to immediately send a message.
153    ///
154    /// See [`Channel::send()`]
155    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
156        self.channel.try_send_with_context(message, None)
157    }
158
159    /// Allows a poll_fn to poll until the channel is ready to send
160    ///
161    /// See [`Channel::poll_ready_to_send()`]
162    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
163        self.channel.poll_ready_to_send(cx)
164    }
165}
166
167/// Receive-only access to a [`Channel`].
168pub struct Receiver<'ch, M, T, const N: usize>
169where
170    M: RawMutex,
171{
172    channel: &'ch Channel<M, T, N>,
173}
174
175impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
176where
177    M: RawMutex,
178{
179    fn clone(&self) -> Self {
180        *self
181    }
182}
183
184impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
185
186impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
187where
188    M: RawMutex,
189{
190    /// Receive the next value.
191    ///
192    /// See [`Channel::receive()`].
193    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
194        self.channel.receive()
195    }
196
197    /// Is a value ready to be received in the channel
198    ///
199    /// See [`Channel::ready_to_receive()`].
200    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
201        self.channel.ready_to_receive()
202    }
203
204    /// Attempt to immediately receive the next value.
205    ///
206    /// See [`Channel::try_receive()`]
207    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
208        self.channel.try_receive()
209    }
210
211    /// Allows a poll_fn to poll until the channel is ready to receive
212    ///
213    /// See [`Channel::poll_ready_to_receive()`]
214    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
215        self.channel.poll_ready_to_receive(cx)
216    }
217
218    /// Poll the channel for the next item
219    ///
220    /// See [`Channel::poll_receive()`]
221    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
222        self.channel.poll_receive(cx)
223    }
224
225    /// Returns the maximum number of elements the channel can hold.
226    ///
227    /// See [`Channel::capacity()`]
228    pub const fn capacity(&self) -> usize {
229        self.channel.capacity()
230    }
231
232    /// Returns the free capacity of the channel.
233    ///
234    /// See [`Channel::free_capacity()`]
235    pub fn free_capacity(&self) -> usize {
236        self.channel.free_capacity()
237    }
238
239    /// Clears all elements in the channel.
240    ///
241    /// See [`Channel::clear()`]
242    pub fn clear(&self) {
243        self.channel.clear();
244    }
245
246    /// Returns the number of elements currently in the channel.
247    ///
248    /// See [`Channel::len()`]
249    pub fn len(&self) -> usize {
250        self.channel.len()
251    }
252
253    /// Returns whether the channel is empty.
254    ///
255    /// See [`Channel::is_empty()`]
256    pub fn is_empty(&self) -> bool {
257        self.channel.is_empty()
258    }
259
260    /// Returns whether the channel is full.
261    ///
262    /// See [`Channel::is_full()`]
263    pub fn is_full(&self) -> bool {
264        self.channel.is_full()
265    }
266}
267
268/// Receive-only access to a [`Channel`] without knowing channel size.
269pub struct DynamicReceiver<'ch, T> {
270    pub(crate) channel: &'ch dyn DynamicChannel<T>,
271}
272
273impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
274    fn clone(&self) -> Self {
275        *self
276    }
277}
278
279impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
280
281impl<'ch, T> DynamicReceiver<'ch, T> {
282    /// Receive the next value.
283    ///
284    /// See [`Channel::receive()`].
285    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
286        DynamicReceiveFuture { channel: self.channel }
287    }
288
289    /// Attempt to immediately receive the next value.
290    ///
291    /// See [`Channel::try_receive()`]
292    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
293        self.channel.try_receive_with_context(None)
294    }
295
296    /// Allows a poll_fn to poll until the channel is ready to receive
297    ///
298    /// See [`Channel::poll_ready_to_receive()`]
299    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
300        self.channel.poll_ready_to_receive(cx)
301    }
302
303    /// Poll the channel for the next item
304    ///
305    /// See [`Channel::poll_receive()`]
306    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
307        self.channel.poll_receive(cx)
308    }
309}
310
311impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
312where
313    M: RawMutex,
314{
315    fn from(s: Receiver<'ch, M, T, N>) -> Self {
316        Self { channel: s.channel }
317    }
318}
319
320/// Future returned by [`Channel::receive`] and  [`Receiver::receive`].
321#[must_use = "futures do nothing unless you `.await` or poll them"]
322pub struct ReceiveFuture<'ch, M, T, const N: usize>
323where
324    M: RawMutex,
325{
326    channel: &'ch Channel<M, T, N>,
327}
328
329impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
330where
331    M: RawMutex,
332{
333    type Output = T;
334
335    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
336        self.channel.poll_receive(cx)
337    }
338}
339
340/// Future returned by [`Channel::ready_to_receive`] and  [`Receiver::ready_to_receive`].
341#[must_use = "futures do nothing unless you `.await` or poll them"]
342pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
343where
344    M: RawMutex,
345{
346    channel: &'ch Channel<M, T, N>,
347}
348
349impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
350where
351    M: RawMutex,
352{
353    type Output = ();
354
355    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
356        self.channel.poll_ready_to_receive(cx)
357    }
358}
359
360/// Future returned by [`DynamicReceiver::receive`].
361#[must_use = "futures do nothing unless you `.await` or poll them"]
362pub struct DynamicReceiveFuture<'ch, T> {
363    channel: &'ch dyn DynamicChannel<T>,
364}
365
366impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
367    type Output = T;
368
369    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
370        match self.channel.try_receive_with_context(Some(cx)) {
371            Ok(v) => Poll::Ready(v),
372            Err(TryReceiveError::Empty) => Poll::Pending,
373        }
374    }
375}
376
377impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
378    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
379        Self { channel: value.channel }
380    }
381}
382
383/// Future returned by [`Channel::send`] and  [`Sender::send`].
384#[must_use = "futures do nothing unless you `.await` or poll them"]
385pub struct SendFuture<'ch, M, T, const N: usize>
386where
387    M: RawMutex,
388{
389    channel: &'ch Channel<M, T, N>,
390    message: Option<T>,
391}
392
393impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
394where
395    M: RawMutex,
396{
397    type Output = ();
398
399    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
400        match self.message.take() {
401            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
402                Ok(..) => Poll::Ready(()),
403                Err(TrySendError::Full(m)) => {
404                    self.message = Some(m);
405                    Poll::Pending
406                }
407            },
408            None => panic!("Message cannot be None"),
409        }
410    }
411}
412
413impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
414
415/// Future returned by [`DynamicSender::send`].
416#[must_use = "futures do nothing unless you `.await` or poll them"]
417pub struct DynamicSendFuture<'ch, T> {
418    channel: &'ch dyn DynamicChannel<T>,
419    message: Option<T>,
420}
421
422impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
423    type Output = ();
424
425    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
426        match self.message.take() {
427            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
428                Ok(..) => Poll::Ready(()),
429                Err(TrySendError::Full(m)) => {
430                    self.message = Some(m);
431                    Poll::Pending
432                }
433            },
434            None => panic!("Message cannot be None"),
435        }
436    }
437}
438
439impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
440
441impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
442    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
443        Self {
444            channel: value.channel,
445            message: value.message,
446        }
447    }
448}
449
450pub(crate) trait DynamicChannel<T> {
451    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
452
453    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
454
455    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
456    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
457
458    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
459}
460
/// Error returned by [`try_receive`](Channel::try_receive).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReceiveError {
    /// Nothing could be received because the channel holds no messages.
    Empty,
}
468
/// Error returned by [`try_send`](Channel::try_send).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// The channel is currently full, so sending would have required
    /// blocking. The rejected message is handed back as the payload.
    Full(T),
}
477
478struct ChannelState<T, const N: usize> {
479    queue: Deque<T, N>,
480    receiver_waker: WakerRegistration,
481    senders_waker: WakerRegistration,
482}
483
484impl<T, const N: usize> ChannelState<T, N> {
485    const fn new() -> Self {
486        ChannelState {
487            queue: Deque::new(),
488            receiver_waker: WakerRegistration::new(),
489            senders_waker: WakerRegistration::new(),
490        }
491    }
492
493    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
494        self.try_receive_with_context(None)
495    }
496
497    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
498        if self.queue.is_full() {
499            self.senders_waker.wake();
500        }
501
502        if let Some(message) = self.queue.pop_front() {
503            Ok(message)
504        } else {
505            if let Some(cx) = cx {
506                self.receiver_waker.register(cx.waker());
507            }
508            Err(TryReceiveError::Empty)
509        }
510    }
511
512    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
513        if self.queue.is_full() {
514            self.senders_waker.wake();
515        }
516
517        if let Some(message) = self.queue.pop_front() {
518            Poll::Ready(message)
519        } else {
520            self.receiver_waker.register(cx.waker());
521            Poll::Pending
522        }
523    }
524
525    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
526        self.receiver_waker.register(cx.waker());
527
528        if !self.queue.is_empty() {
529            Poll::Ready(())
530        } else {
531            Poll::Pending
532        }
533    }
534
535    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
536        self.try_send_with_context(message, None)
537    }
538
539    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
540        match self.queue.push_back(message) {
541            Ok(()) => {
542                self.receiver_waker.wake();
543                Ok(())
544            }
545            Err(message) => {
546                if let Some(cx) = cx {
547                    self.senders_waker.register(cx.waker());
548                }
549                Err(TrySendError::Full(message))
550            }
551        }
552    }
553
554    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
555        self.senders_waker.register(cx.waker());
556
557        if !self.queue.is_full() {
558            Poll::Ready(())
559        } else {
560            Poll::Pending
561        }
562    }
563
564    fn clear(&mut self) {
565        self.queue.clear();
566    }
567
568    fn len(&self) -> usize {
569        self.queue.len()
570    }
571
572    fn is_empty(&self) -> bool {
573        self.queue.is_empty()
574    }
575
576    fn is_full(&self) -> bool {
577        self.queue.is_full()
578    }
579}
580
581/// A bounded channel for communicating between asynchronous tasks
582/// with backpressure.
583///
584/// The channel will buffer up to the provided number of messages.  Once the
585/// buffer is full, attempts to `send` new messages will wait until a message is
586/// received from the channel.
587///
588/// All data sent will become available in the same order as it was sent.
589pub struct Channel<M, T, const N: usize>
590where
591    M: RawMutex,
592{
593    inner: Mutex<M, RefCell<ChannelState<T, N>>>,
594}
595
596impl<M, T, const N: usize> Channel<M, T, N>
597where
598    M: RawMutex,
599{
600    /// Establish a new bounded channel. For example, to create one with a NoopMutex:
601    ///
602    /// ```
603    /// use embassy_sync::channel::Channel;
604    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
605    ///
606    /// // Declare a bounded channel of 3 u32s.
607    /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
608    /// ```
609    pub const fn new() -> Self {
610        Self {
611            inner: Mutex::new(RefCell::new(ChannelState::new())),
612        }
613    }
614
615    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
616        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
617    }
618
619    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
620        self.lock(|c| c.try_receive_with_context(cx))
621    }
622
623    /// Poll the channel for the next message
624    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
625        self.lock(|c| c.poll_receive(cx))
626    }
627
628    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
629        self.lock(|c| c.try_send_with_context(m, cx))
630    }
631
632    /// Allows a poll_fn to poll until the channel is ready to receive
633    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
634        self.lock(|c| c.poll_ready_to_receive(cx))
635    }
636
637    /// Allows a poll_fn to poll until the channel is ready to send
638    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
639        self.lock(|c| c.poll_ready_to_send(cx))
640    }
641
642    /// Get a sender for this channel.
643    pub fn sender(&self) -> Sender<'_, M, T, N> {
644        Sender { channel: self }
645    }
646
647    /// Get a receiver for this channel.
648    pub fn receiver(&self) -> Receiver<'_, M, T, N> {
649        Receiver { channel: self }
650    }
651
652    /// Get a sender for this channel using dynamic dispatch.
653    pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
654        DynamicSender { channel: self }
655    }
656
657    /// Get a receiver for this channel using dynamic dispatch.
658    pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
659        DynamicReceiver { channel: self }
660    }
661
662    /// Send a value, waiting until there is capacity.
663    ///
664    /// Sending completes when the value has been pushed to the channel's queue.
665    /// This doesn't mean the value has been received yet.
666    pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
667        SendFuture {
668            channel: self,
669            message: Some(message),
670        }
671    }
672
673    /// Attempt to immediately send a message.
674    ///
675    /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
676    /// buffer is full, instead of waiting.
677    ///
678    /// # Errors
679    ///
680    /// If the channel capacity has been reached, i.e., the channel has `n`
681    /// buffered values where `n` is the argument passed to [`Channel`], then an
682    /// error is returned.
683    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
684        self.lock(|c| c.try_send(message))
685    }
686
687    /// Receive the next value.
688    ///
689    /// If there are no messages in the channel's buffer, this method will
690    /// wait until a message is sent.
691    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
692        ReceiveFuture { channel: self }
693    }
694
695    /// Is a value ready to be received in the channel
696    ///
697    /// If there are no messages in the channel's buffer, this method will
698    /// wait until there is at least one
699    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
700        ReceiveReadyFuture { channel: self }
701    }
702
703    /// Attempt to immediately receive a message.
704    ///
705    /// This method will either receive a message from the channel immediately or return an error
706    /// if the channel is empty.
707    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
708        self.lock(|c| c.try_receive())
709    }
710
711    /// Returns the maximum number of elements the channel can hold.
712    pub const fn capacity(&self) -> usize {
713        N
714    }
715
716    /// Returns the free capacity of the channel.
717    ///
718    /// This is equivalent to `capacity() - len()`
719    pub fn free_capacity(&self) -> usize {
720        N - self.len()
721    }
722
723    /// Clears all elements in the channel.
724    pub fn clear(&self) {
725        self.lock(|c| c.clear());
726    }
727
728    /// Returns the number of elements currently in the channel.
729    pub fn len(&self) -> usize {
730        self.lock(|c| c.len())
731    }
732
733    /// Returns whether the channel is empty.
734    pub fn is_empty(&self) -> bool {
735        self.lock(|c| c.is_empty())
736    }
737
738    /// Returns whether the channel is full.
739    pub fn is_full(&self) -> bool {
740        self.lock(|c| c.is_full())
741    }
742}
743
744/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
745/// tradeoff cost of dynamic dispatch.
746impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
747where
748    M: RawMutex,
749{
750    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
751        Channel::try_send_with_context(self, m, cx)
752    }
753
754    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
755        Channel::try_receive_with_context(self, cx)
756    }
757
758    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
759        Channel::poll_ready_to_send(self, cx)
760    }
761
762    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
763        Channel::poll_ready_to_receive(self, cx)
764    }
765
766    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
767        Channel::poll_receive(self, cx)
768    }
769}
770
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Free slots left in the queue of `c`.
    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
        c.queue.capacity() - c.queue.len()
    }

    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(capacity(&c), 2);
    }

    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, 3>::new();
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        let _ = c.try_send(1);
        // The rejected message must come back inside the error.
        assert_eq!(c.try_send(2), Err(TrySendError::Full(2)));
        assert_eq!(capacity(&c), 0);
    }

    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, 3>::new();
        assert_eq!(c.try_receive(), Err(TryReceiveError::Empty));
        assert_eq!(capacity(&c), 3);
    }

    #[test]
    fn simple_send_and_receive() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    #[test]
    fn cloning() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        let _ = r1.clone();
        let _ = s1.clone();
    }

    #[test]
    fn dynamic_dispatch_into() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[test]
    fn dynamic_dispatch_constructor() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s = c.dyn_sender();
        let r = c.dyn_receiver();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Wish I could think of a means of determining that the async send is waiting instead.
        // However, I've used the debugger to observe that the send does indeed wait.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep draining in the background so both pending sends can complete.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}
908        send_task_2.unwrap().await;
909    }
910}