embassy_sync/
zerocopy_channel.rs1use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26pub struct Channel<'a, M: RawMutex, T> {
38 buf: *mut T,
39 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44 pub fn new(buf: &'a mut [T]) -> Self {
49 let len = buf.len();
50 assert!(len != 0);
51
52 Self {
53 buf: buf.as_mut_ptr(),
54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State {
56 capacity: len,
57 front: 0,
58 back: 0,
59 full: false,
60 send_waker: WakerRegistration::new(),
61 receive_waker: WakerRegistration::new(),
62 })),
63 }
64 }
65
66 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self })
72 }
73
74 pub fn clear(&mut self) {
76 self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
80
81 pub fn len(&self) -> usize {
83 self.state.lock(|s| s.borrow().len())
84 }
85
86 pub fn is_empty(&self) -> bool {
88 self.state.lock(|s| s.borrow().is_empty())
89 }
90
91 pub fn is_full(&self) -> bool {
93 self.state.lock(|s| s.borrow().is_full())
94 }
95}
96
97pub struct Sender<'a, M: RawMutex, T> {
99 channel: &'a Channel<'a, M, T>,
100}
101
102impl<'a, M: RawMutex, T> Sender<'a, M, T> {
103 pub fn borrow(&mut self) -> Sender<'_, M, T> {
105 Sender { channel: self.channel }
106 }
107
108 pub fn try_send(&mut self) -> Option<&mut T> {
110 self.channel.state.lock(|s| {
111 let s = &mut *s.borrow_mut();
112 match s.push_index() {
113 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
114 None => None,
115 }
116 })
117 }
118
119 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
121 self.channel.state.lock(|s| {
122 let s = &mut *s.borrow_mut();
123 match s.push_index() {
124 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
125 None => {
126 s.receive_waker.register(cx.waker());
127 Poll::Pending
128 }
129 }
130 })
131 }
132
133 pub fn send(&mut self) -> impl Future<Output = &mut T> {
135 poll_fn(|cx| {
136 self.channel.state.lock(|s| {
137 let s = &mut *s.borrow_mut();
138 match s.push_index() {
139 Some(i) => {
140 let r = unsafe { &mut *self.channel.buf.add(i) };
141 Poll::Ready(r)
142 }
143 None => {
144 s.receive_waker.register(cx.waker());
145 Poll::Pending
146 }
147 }
148 })
149 })
150 }
151
152 pub fn send_done(&mut self) {
154 self.channel.state.lock(|s| s.borrow_mut().push_done())
155 }
156
157 pub fn clear(&mut self) {
159 self.channel.state.lock(|s| {
160 s.borrow_mut().clear();
161 });
162 }
163
164 pub fn len(&self) -> usize {
166 self.channel.state.lock(|s| s.borrow().len())
167 }
168
169 pub fn is_empty(&self) -> bool {
171 self.channel.state.lock(|s| s.borrow().is_empty())
172 }
173
174 pub fn is_full(&self) -> bool {
176 self.channel.state.lock(|s| s.borrow().is_full())
177 }
178}
179
180pub struct Receiver<'a, M: RawMutex, T> {
182 channel: &'a Channel<'a, M, T>,
183}
184
185impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
186 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
188 Receiver { channel: self.channel }
189 }
190
191 pub fn try_receive(&mut self) -> Option<&mut T> {
193 self.channel.state.lock(|s| {
194 let s = &mut *s.borrow_mut();
195 match s.pop_index() {
196 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
197 None => None,
198 }
199 })
200 }
201
202 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
204 self.channel.state.lock(|s| {
205 let s = &mut *s.borrow_mut();
206 match s.pop_index() {
207 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
208 None => {
209 s.send_waker.register(cx.waker());
210 Poll::Pending
211 }
212 }
213 })
214 }
215
216 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
218 poll_fn(|cx| {
219 self.channel.state.lock(|s| {
220 let s = &mut *s.borrow_mut();
221 match s.pop_index() {
222 Some(i) => {
223 let r = unsafe { &mut *self.channel.buf.add(i) };
224 Poll::Ready(r)
225 }
226 None => {
227 s.send_waker.register(cx.waker());
228 Poll::Pending
229 }
230 }
231 })
232 })
233 }
234
235 pub fn receive_done(&mut self) {
237 self.channel.state.lock(|s| s.borrow_mut().pop_done())
238 }
239
240 pub fn clear(&mut self) {
242 self.channel.state.lock(|s| {
243 s.borrow_mut().clear();
244 });
245 }
246
247 pub fn len(&self) -> usize {
249 self.channel.state.lock(|s| s.borrow().len())
250 }
251
252 pub fn is_empty(&self) -> bool {
254 self.channel.state.lock(|s| s.borrow().is_empty())
255 }
256
257 pub fn is_full(&self) -> bool {
259 self.channel.state.lock(|s| s.borrow().is_full())
260 }
261}
262
263struct State {
264 capacity: usize,
266
267 front: usize,
269 back: usize,
271
272 full: bool,
275
276 send_waker: WakerRegistration,
277 receive_waker: WakerRegistration,
278}
279
280impl State {
281 fn increment(&self, i: usize) -> usize {
282 if i + 1 == self.capacity {
283 0
284 } else {
285 i + 1
286 }
287 }
288
289 fn clear(&mut self) {
290 self.front = 0;
291 self.back = 0;
292 self.full = false;
293 }
294
295 fn len(&self) -> usize {
296 if !self.full {
297 if self.back >= self.front {
298 self.back - self.front
299 } else {
300 self.capacity + self.back - self.front
301 }
302 } else {
303 self.capacity
304 }
305 }
306
307 fn is_full(&self) -> bool {
308 self.full
309 }
310
311 fn is_empty(&self) -> bool {
312 self.front == self.back && !self.full
313 }
314
315 fn push_index(&mut self) -> Option<usize> {
316 match self.is_full() {
317 true => None,
318 false => Some(self.back),
319 }
320 }
321
322 fn push_done(&mut self) {
323 assert!(!self.is_full());
324 self.back = self.increment(self.back);
325 if self.back == self.front {
326 self.full = true;
327 }
328 self.send_waker.wake();
329 }
330
331 fn pop_index(&mut self) -> Option<usize> {
332 match self.is_empty() {
333 true => None,
334 false => Some(self.front),
335 }
336 }
337
338 fn pop_done(&mut self) {
339 assert!(!self.is_empty());
340 self.front = self.increment(self.front);
341 self.full = false;
342 self.receive_waker.wake();
343 }
344}