embassy_sync/
zerocopy_channel.rs

1//! A zero-copy queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by a producer (sender) and a
4//! consumer (receiver), i.e. it is an  "SPSC channel".
5//!
6//! This queue takes a Mutex type so that various
7//! targets can be attained. For example, a ThreadModeMutex can be used
8//! for single-core Cortex-M targets where messages are only passed
9//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
10//! can also be used for single-core targets where messages are to be
11//! passed from exception mode e.g. out of an interrupt handler.
12//!
13//! This module provides a bounded channel that has a limit on the number of
14//! messages that it can store, and if this limit is reached, trying to send
15//! another message will result in an error being returned.
16
17use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26/// A bounded zero-copy channel for communicating between asynchronous tasks
27/// with backpressure.
28///
29/// The channel will buffer up to the provided number of messages.  Once the
30/// buffer is full, attempts to `send` new messages will wait until a message is
31/// received from the channel.
32///
33/// All data sent will become available in the same order as it was sent.
34///
35/// The channel requires a buffer of recyclable elements.  Writing to the channel is done through
36/// an `&mut T`.
37pub struct Channel<'a, M: RawMutex, T> {
38    buf: *mut T,
39    phantom: PhantomData<&'a mut T>,
40    state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44    /// Initialize a new [`Channel`].
45    ///
46    /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
47    /// channel's capacity.
48    pub fn new(buf: &'a mut [T]) -> Self {
49        let len = buf.len();
50        assert!(len != 0);
51
52        Self {
53            buf: buf.as_mut_ptr(),
54            phantom: PhantomData,
55            state: Mutex::new(RefCell::new(State {
56                capacity: len,
57                front: 0,
58                back: 0,
59                full: false,
60                send_waker: WakerRegistration::new(),
61                receive_waker: WakerRegistration::new(),
62            })),
63        }
64    }
65
66    /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
67    ///
68    /// Further Senders and Receivers can be created through [`Sender::borrow`] and
69    /// [`Receiver::borrow`] respectively.
70    pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71        (Sender { channel: self }, Receiver { channel: self })
72    }
73
74    /// Clears all elements in the channel.
75    pub fn clear(&mut self) {
76        self.state.lock(|s| {
77            s.borrow_mut().clear();
78        });
79    }
80
81    /// Returns the number of elements currently in the channel.
82    pub fn len(&self) -> usize {
83        self.state.lock(|s| s.borrow().len())
84    }
85
86    /// Returns whether the channel is empty.
87    pub fn is_empty(&self) -> bool {
88        self.state.lock(|s| s.borrow().is_empty())
89    }
90
91    /// Returns whether the channel is full.
92    pub fn is_full(&self) -> bool {
93        self.state.lock(|s| s.borrow().is_full())
94    }
95}
96
97/// Send-only access to a [`Channel`].
98pub struct Sender<'a, M: RawMutex, T> {
99    channel: &'a Channel<'a, M, T>,
100}
101
102impl<'a, M: RawMutex, T> Sender<'a, M, T> {
103    /// Creates one further [`Sender`] over the same channel.
104    pub fn borrow(&mut self) -> Sender<'_, M, T> {
105        Sender { channel: self.channel }
106    }
107
108    /// Attempts to send a value over the channel.
109    pub fn try_send(&mut self) -> Option<&mut T> {
110        self.channel.state.lock(|s| {
111            let s = &mut *s.borrow_mut();
112            match s.push_index() {
113                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
114                None => None,
115            }
116        })
117    }
118
119    /// Attempts to send a value over the channel.
120    pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
121        self.channel.state.lock(|s| {
122            let s = &mut *s.borrow_mut();
123            match s.push_index() {
124                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
125                None => {
126                    s.receive_waker.register(cx.waker());
127                    Poll::Pending
128                }
129            }
130        })
131    }
132
133    /// Asynchronously send a value over the channel.
134    pub fn send(&mut self) -> impl Future<Output = &mut T> {
135        poll_fn(|cx| {
136            self.channel.state.lock(|s| {
137                let s = &mut *s.borrow_mut();
138                match s.push_index() {
139                    Some(i) => {
140                        let r = unsafe { &mut *self.channel.buf.add(i) };
141                        Poll::Ready(r)
142                    }
143                    None => {
144                        s.receive_waker.register(cx.waker());
145                        Poll::Pending
146                    }
147                }
148            })
149        })
150    }
151
152    /// Notify the channel that the sending of the value has been finalized.
153    pub fn send_done(&mut self) {
154        self.channel.state.lock(|s| s.borrow_mut().push_done())
155    }
156
157    /// Clears all elements in the channel.
158    pub fn clear(&mut self) {
159        self.channel.state.lock(|s| {
160            s.borrow_mut().clear();
161        });
162    }
163
164    /// Returns the number of elements currently in the channel.
165    pub fn len(&self) -> usize {
166        self.channel.state.lock(|s| s.borrow().len())
167    }
168
169    /// Returns whether the channel is empty.
170    pub fn is_empty(&self) -> bool {
171        self.channel.state.lock(|s| s.borrow().is_empty())
172    }
173
174    /// Returns whether the channel is full.
175    pub fn is_full(&self) -> bool {
176        self.channel.state.lock(|s| s.borrow().is_full())
177    }
178}
179
180/// Receive-only access to a [`Channel`].
181pub struct Receiver<'a, M: RawMutex, T> {
182    channel: &'a Channel<'a, M, T>,
183}
184
185impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
186    /// Creates one further [`Sender`] over the same channel.
187    pub fn borrow(&mut self) -> Receiver<'_, M, T> {
188        Receiver { channel: self.channel }
189    }
190
191    /// Attempts to receive a value over the channel.
192    pub fn try_receive(&mut self) -> Option<&mut T> {
193        self.channel.state.lock(|s| {
194            let s = &mut *s.borrow_mut();
195            match s.pop_index() {
196                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
197                None => None,
198            }
199        })
200    }
201
202    /// Attempts to asynchronously receive a value over the channel.
203    pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
204        self.channel.state.lock(|s| {
205            let s = &mut *s.borrow_mut();
206            match s.pop_index() {
207                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
208                None => {
209                    s.send_waker.register(cx.waker());
210                    Poll::Pending
211                }
212            }
213        })
214    }
215
216    /// Asynchronously receive a value over the channel.
217    pub fn receive(&mut self) -> impl Future<Output = &mut T> {
218        poll_fn(|cx| {
219            self.channel.state.lock(|s| {
220                let s = &mut *s.borrow_mut();
221                match s.pop_index() {
222                    Some(i) => {
223                        let r = unsafe { &mut *self.channel.buf.add(i) };
224                        Poll::Ready(r)
225                    }
226                    None => {
227                        s.send_waker.register(cx.waker());
228                        Poll::Pending
229                    }
230                }
231            })
232        })
233    }
234
235    /// Notify the channel that the receiving of the value has been finalized.
236    pub fn receive_done(&mut self) {
237        self.channel.state.lock(|s| s.borrow_mut().pop_done())
238    }
239
240    /// Clears all elements in the channel.
241    pub fn clear(&mut self) {
242        self.channel.state.lock(|s| {
243            s.borrow_mut().clear();
244        });
245    }
246
247    /// Returns the number of elements currently in the channel.
248    pub fn len(&self) -> usize {
249        self.channel.state.lock(|s| s.borrow().len())
250    }
251
252    /// Returns whether the channel is empty.
253    pub fn is_empty(&self) -> bool {
254        self.channel.state.lock(|s| s.borrow().is_empty())
255    }
256
257    /// Returns whether the channel is full.
258    pub fn is_full(&self) -> bool {
259        self.channel.state.lock(|s| s.borrow().is_full())
260    }
261}
262
263struct State {
264    /// Maximum number of elements the channel can hold.
265    capacity: usize,
266
267    /// Front index. Always 0..=(N-1)
268    front: usize,
269    /// Back index. Always 0..=(N-1).
270    back: usize,
271
272    /// Used to distinguish "empty" and "full" cases when `front == back`.
273    /// May only be `true` if `front == back`, always `false` otherwise.
274    full: bool,
275
276    send_waker: WakerRegistration,
277    receive_waker: WakerRegistration,
278}
279
280impl State {
281    fn increment(&self, i: usize) -> usize {
282        if i + 1 == self.capacity {
283            0
284        } else {
285            i + 1
286        }
287    }
288
289    fn clear(&mut self) {
290        self.front = 0;
291        self.back = 0;
292        self.full = false;
293    }
294
295    fn len(&self) -> usize {
296        if !self.full {
297            if self.back >= self.front {
298                self.back - self.front
299            } else {
300                self.capacity + self.back - self.front
301            }
302        } else {
303            self.capacity
304        }
305    }
306
307    fn is_full(&self) -> bool {
308        self.full
309    }
310
311    fn is_empty(&self) -> bool {
312        self.front == self.back && !self.full
313    }
314
315    fn push_index(&mut self) -> Option<usize> {
316        match self.is_full() {
317            true => None,
318            false => Some(self.back),
319        }
320    }
321
322    fn push_done(&mut self) {
323        assert!(!self.is_full());
324        self.back = self.increment(self.back);
325        if self.back == self.front {
326            self.full = true;
327        }
328        self.send_waker.wake();
329    }
330
331    fn pop_index(&mut self) -> Option<usize> {
332        match self.is_empty() {
333            true => None,
334            false => Some(self.front),
335        }
336    }
337
338    fn pop_done(&mut self) {
339        assert!(!self.is_empty());
340        self.front = self.increment(self.front);
341        self.full = false;
342        self.receive_waker.wake();
343    }
344}