embassy_sync/
pipe.rs

1//! Async byte stream pipe.
2
3use core::cell::{RefCell, UnsafeCell};
4use core::convert::Infallible;
5use core::future::Future;
6use core::ops::Range;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex;
12use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration;
14
15/// Write-only access to a [`Pipe`].
16pub struct Writer<'p, M, const N: usize>
17where
18    M: RawMutex,
19{
20    pipe: &'p Pipe<M, N>,
21}
22
23impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
24where
25    M: RawMutex,
26{
27    fn clone(&self) -> Self {
28        *self
29    }
30}
31
32impl<'p, M, const N: usize> Copy for Writer<'p, M, N> where M: RawMutex {}
33
34impl<'p, M, const N: usize> Writer<'p, M, N>
35where
36    M: RawMutex,
37{
38    /// Write some bytes to the pipe.
39    ///
40    /// See [`Pipe::write()`]
41    pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
42        self.pipe.write(buf)
43    }
44
45    /// Attempt to immediately write some bytes to the pipe.
46    ///
47    /// See [`Pipe::try_write()`]
48    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
49        self.pipe.try_write(buf)
50    }
51}
52
53/// Future returned by [`Pipe::write`] and  [`Writer::write`].
54#[must_use = "futures do nothing unless you `.await` or poll them"]
55pub struct WriteFuture<'p, M, const N: usize>
56where
57    M: RawMutex,
58{
59    pipe: &'p Pipe<M, N>,
60    buf: &'p [u8],
61}
62
63impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
64where
65    M: RawMutex,
66{
67    type Output = usize;
68
69    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70        match self.pipe.try_write_with_context(Some(cx), self.buf) {
71            Ok(n) => Poll::Ready(n),
72            Err(TryWriteError::Full) => Poll::Pending,
73        }
74    }
75}
76
77impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
78
79/// Read-only access to a [`Pipe`].
80pub struct Reader<'p, M, const N: usize>
81where
82    M: RawMutex,
83{
84    pipe: &'p Pipe<M, N>,
85}
86
87impl<'p, M, const N: usize> Reader<'p, M, N>
88where
89    M: RawMutex,
90{
91    /// Read some bytes from the pipe.
92    ///
93    /// See [`Pipe::read()`]
94    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
95        self.pipe.read(buf)
96    }
97
98    /// Attempt to immediately read some bytes from the pipe.
99    ///
100    /// See [`Pipe::try_read()`]
101    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
102        self.pipe.try_read(buf)
103    }
104
105    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
106    ///
107    /// If no bytes are currently available to read, this function waits until at least one byte is available.
108    ///
109    /// If the reader is at end-of-file (EOF), an empty slice is returned.
110    pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
111        FillBufFuture { pipe: Some(self.pipe) }
112    }
113
114    /// Try returning contents of the internal buffer.
115    ///
116    /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
117    ///
118    /// If the reader is at end-of-file (EOF), an empty slice is returned.
119    pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
120        unsafe { self.pipe.try_fill_buf_with_context(None) }
121    }
122
123    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
124    pub fn consume(&mut self, amt: usize) {
125        self.pipe.consume(amt)
126    }
127}
128
129/// Future returned by [`Pipe::read`] and  [`Reader::read`].
130#[must_use = "futures do nothing unless you `.await` or poll them"]
131pub struct ReadFuture<'p, M, const N: usize>
132where
133    M: RawMutex,
134{
135    pipe: &'p Pipe<M, N>,
136    buf: &'p mut [u8],
137}
138
139impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
140where
141    M: RawMutex,
142{
143    type Output = usize;
144
145    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        match self.pipe.try_read_with_context(Some(cx), self.buf) {
147            Ok(n) => Poll::Ready(n),
148            Err(TryReadError::Empty) => Poll::Pending,
149        }
150    }
151}
152
153impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
154
155/// Future returned by [`Pipe::fill_buf`] and  [`Reader::fill_buf`].
156#[must_use = "futures do nothing unless you `.await` or poll them"]
157pub struct FillBufFuture<'p, M, const N: usize>
158where
159    M: RawMutex,
160{
161    pipe: Option<&'p Pipe<M, N>>,
162}
163
164impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
165where
166    M: RawMutex,
167{
168    type Output = &'p [u8];
169
170    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171        let pipe = self.pipe.take().unwrap();
172        match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
173            Ok(buf) => Poll::Ready(buf),
174            Err(TryReadError::Empty) => {
175                self.pipe = Some(pipe);
176                Poll::Pending
177            }
178        }
179    }
180}
181
182impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
183
/// Error returned by [`try_read`](Pipe::try_read).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReadError {
    /// The pipe currently holds no data, so nothing could be read
    /// without blocking.
    Empty,
}
192
/// Error returned by [`try_write`](Pipe::try_write).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryWriteError {
    /// The pipe currently has no free space, so nothing could be written
    /// without blocking.
    Full,
}
201
202struct PipeState<const N: usize> {
203    buffer: RingBuffer<N>,
204    read_waker: WakerRegistration,
205    write_waker: WakerRegistration,
206}
207
/// Raw byte storage with interior mutability.
///
/// Slices are handed out through a shared reference; callers are responsible
/// for making sure the handed-out ranges are not aliased incorrectly.
#[repr(transparent)]
struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);

impl<const N: usize> Buffer<N> {
    /// Returns the bytes in `r` as a shared slice with a caller-chosen lifetime.
    ///
    /// # Safety
    ///
    /// `r` must lie within `0..N`, the chosen lifetime must not outlive the
    /// buffer, and nothing may mutate the range while the slice is alive.
    unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
        let Range { start, end } = r;
        let base = self.0.get().cast::<u8>().cast_const();
        core::slice::from_raw_parts(base.add(start), end - start)
    }

    /// Returns the bytes in `r` as a mutable slice with a caller-chosen lifetime.
    ///
    /// # Safety
    ///
    /// `r` must lie within `0..N`, the chosen lifetime must not outlive the
    /// buffer, and no other reference to the range may exist while the slice is alive.
    unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
        let Range { start, end } = r;
        let base = self.0.get().cast::<u8>();
        core::slice::from_raw_parts_mut(base.add(start), end - start)
    }
}

unsafe impl<const N: usize> Send for Buffer<N> {}
unsafe impl<const N: usize> Sync for Buffer<N> {}
225
226/// A bounded byte-oriented pipe for communicating between asynchronous tasks
227/// with backpressure.
228///
229/// The pipe will buffer up to the provided number of bytes. Once the
230/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
231///
232/// All data written will become available in the same order as it was written.
233pub struct Pipe<M, const N: usize>
234where
235    M: RawMutex,
236{
237    buf: Buffer<N>,
238    inner: Mutex<M, RefCell<PipeState<N>>>,
239}
240
241impl<M, const N: usize> Pipe<M, N>
242where
243    M: RawMutex,
244{
245    /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
246    ///
247    /// ```
248    /// use embassy_sync::pipe::Pipe;
249    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
250    ///
251    /// // Declare a bounded pipe, with a buffer of 256 bytes.
252    /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
253    /// ```
254    pub const fn new() -> Self {
255        Self {
256            buf: Buffer(UnsafeCell::new([0; N])),
257            inner: Mutex::new(RefCell::new(PipeState {
258                buffer: RingBuffer::new(),
259                read_waker: WakerRegistration::new(),
260                write_waker: WakerRegistration::new(),
261            })),
262        }
263    }
264
265    fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
266        self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
267    }
268
269    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
270        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
271            let s = &mut *rc.borrow_mut();
272
273            if s.buffer.is_full() {
274                s.write_waker.wake();
275            }
276
277            let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
278            if available.is_empty() {
279                if let Some(cx) = cx {
280                    s.read_waker.register(cx.waker());
281                }
282                return Err(TryReadError::Empty);
283            }
284
285            let n = available.len().min(buf.len());
286            buf[..n].copy_from_slice(&available[..n]);
287            s.buffer.pop(n);
288            Ok(n)
289        })
290    }
291
292    // safety: While the returned slice is alive,
293    // no `read` or `consume` methods in the pipe must be called.
294    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
295        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
296            let s = &mut *rc.borrow_mut();
297
298            if s.buffer.is_full() {
299                s.write_waker.wake();
300            }
301
302            let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
303            if available.is_empty() {
304                if let Some(cx) = cx {
305                    s.read_waker.register(cx.waker());
306                }
307                return Err(TryReadError::Empty);
308            }
309
310            Ok(available)
311        })
312    }
313
314    fn consume(&self, amt: usize) {
315        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
316            let s = &mut *rc.borrow_mut();
317            let available = s.buffer.pop_buf();
318            assert!(amt <= available.len());
319            s.buffer.pop(amt);
320        })
321    }
322
323    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
324        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
325            let s = &mut *rc.borrow_mut();
326
327            if s.buffer.is_empty() {
328                s.read_waker.wake();
329            }
330
331            let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
332            if available.is_empty() {
333                if let Some(cx) = cx {
334                    s.write_waker.register(cx.waker());
335                }
336                return Err(TryWriteError::Full);
337            }
338
339            let n = available.len().min(buf.len());
340            available[..n].copy_from_slice(&buf[..n]);
341            s.buffer.push(n);
342            Ok(n)
343        })
344    }
345
346    /// Split this pipe into a BufRead-capable reader and a writer.
347    ///
348    /// The reader and writer borrow the current pipe mutably, so it is not
349    /// possible to use it directly while they exist. This is needed because
350    /// implementing `BufRead` requires there is a single reader.
351    ///
352    /// The writer is cloneable, the reader is not.
353    pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
354        (Reader { pipe: self }, Writer { pipe: self })
355    }
356
357    /// Write some bytes to the pipe.
358    ///
359    /// This method writes a nonzero amount of bytes from `buf` into the pipe, and
360    /// returns the amount of bytes written.
361    ///
362    /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full,
363    /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that
364    /// returns an error instead of waiting.
365    ///
366    /// It is not guaranteed that all bytes in the buffer are written, even if there's enough
367    /// free space in the pipe buffer for all. In other words, it is possible for `write` to return
368    /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave
369    /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like
370    /// `write_all` from the `embedded-io` crate.
371    pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
372        WriteFuture { pipe: self, buf }
373    }
374
375    /// Write all bytes to the pipe.
376    ///
377    /// This method writes all bytes from `buf` into the pipe
378    pub async fn write_all(&self, mut buf: &[u8]) {
379        while !buf.is_empty() {
380            let n = self.write(buf).await;
381            buf = &buf[n..];
382        }
383    }
384
385    /// Attempt to immediately write some bytes to the pipe.
386    ///
387    /// This method will either write a nonzero amount of bytes to the pipe immediately,
388    /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant
389    /// that waits instead of returning an error.
390    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
391        self.try_write_with_context(None, buf)
392    }
393
394    /// Read some bytes from the pipe.
395    ///
396    /// This method reads a nonzero amount of bytes from the pipe into `buf` and
397    /// returns the amount of bytes read.
398    ///
399    /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty,
400    /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that
401    /// returns an error instead of waiting.
402    ///
403    /// It is not guaranteed that all bytes in the buffer are read, even if there's enough
404    /// space in `buf` for all. In other words, it is possible for `read` to return
405    /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes
406    /// in the pipe buffer. You should always `read` in a loop, or use helpers like
407    /// `read_exact` from the `embedded-io` crate.
408    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
409        ReadFuture { pipe: self, buf }
410    }
411
412    /// Attempt to immediately read some bytes from the pipe.
413    ///
414    /// This method will either read a nonzero amount of bytes from the pipe immediately,
415    /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
416    /// that waits instead of returning an error.
417    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
418        self.try_read_with_context(None, buf)
419    }
420
421    /// Clear the data in the pipe's buffer.
422    pub fn clear(&self) {
423        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
424            let s = &mut *rc.borrow_mut();
425
426            s.buffer.clear();
427            s.write_waker.wake();
428        })
429    }
430
431    /// Return whether the pipe is full (no free space in the buffer)
432    pub fn is_full(&self) -> bool {
433        self.len() == N
434    }
435
436    /// Return whether the pipe is empty (no data buffered)
437    pub fn is_empty(&self) -> bool {
438        self.len() == 0
439    }
440
441    /// Total byte capacity.
442    ///
443    /// This is the same as the `N` generic param.
444    pub fn capacity(&self) -> usize {
445        N
446    }
447
448    /// Used byte capacity.
449    pub fn len(&self) -> usize {
450        self.lock(|c| c.buffer.len())
451    }
452
453    /// Free byte capacity.
454    ///
455    /// This is equivalent to `capacity() - len()`
456    pub fn free_capacity(&self) -> usize {
457        N - self.len()
458    }
459}
460
461impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Pipe<M, N> {
462    type Error = Infallible;
463}
464
465impl<M: RawMutex, const N: usize> embedded_io_async::Read for Pipe<M, N> {
466    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
467        Ok(Pipe::read(self, buf).await)
468    }
469}
470
471impl<M: RawMutex, const N: usize> embedded_io_async::Write for Pipe<M, N> {
472    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
473        Ok(Pipe::write(self, buf).await)
474    }
475
476    async fn flush(&mut self) -> Result<(), Self::Error> {
477        Ok(())
478    }
479}
480
481impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for &Pipe<M, N> {
482    type Error = Infallible;
483}
484
485impl<M: RawMutex, const N: usize> embedded_io_async::Read for &Pipe<M, N> {
486    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
487        Ok(Pipe::read(self, buf).await)
488    }
489}
490
491impl<M: RawMutex, const N: usize> embedded_io_async::Write for &Pipe<M, N> {
492    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
493        Ok(Pipe::write(self, buf).await)
494    }
495
496    async fn flush(&mut self) -> Result<(), Self::Error> {
497        Ok(())
498    }
499}
500
501impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Reader<'_, M, N> {
502    type Error = Infallible;
503}
504
505impl<M: RawMutex, const N: usize> embedded_io_async::Read for Reader<'_, M, N> {
506    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
507        Ok(Reader::read(self, buf).await)
508    }
509}
510
511impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
512    async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
513        Ok(Reader::fill_buf(self).await)
514    }
515
516    fn consume(&mut self, amt: usize) {
517        Reader::consume(self, amt)
518    }
519}
520
521impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
522    type Error = Infallible;
523}
524
525impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> {
526    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
527        Ok(Writer::write(self, buf).await)
528    }
529
530    async fn flush(&mut self) -> Result<(), Self::Error> {
531        Ok(())
532    }
533}
534
535//
536// Type-erased variants
537//
538
539pub(crate) trait DynamicPipe {
540    fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
541    fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
542
543    fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
544    fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
545
546    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
547    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
548
549    fn consume(&self, amt: usize);
550    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
551}
552
553impl<M, const N: usize> DynamicPipe for Pipe<M, N>
554where
555    M: RawMutex,
556{
557    fn consume(&self, amt: usize) {
558        Pipe::consume(self, amt)
559    }
560
561    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
562        Pipe::try_fill_buf_with_context(self, cx)
563    }
564
565    fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
566        Pipe::write(self, buf).into()
567    }
568
569    fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
570        Pipe::read(self, buf).into()
571    }
572
573    fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
574        Pipe::try_read(self, buf)
575    }
576
577    fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
578        Pipe::try_write(self, buf)
579    }
580
581    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
582        Pipe::try_write_with_context(self, cx, buf)
583    }
584
585    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
586        Pipe::try_read_with_context(self, cx, buf)
587    }
588}
589
590/// Write-only access to a [`DynamicPipe`].
591pub struct DynamicWriter<'p> {
592    pipe: &'p dyn DynamicPipe,
593}
594
595impl<'p> Clone for DynamicWriter<'p> {
596    fn clone(&self) -> Self {
597        *self
598    }
599}
600
601impl<'p> Copy for DynamicWriter<'p> {}
602
603impl<'p> DynamicWriter<'p> {
604    /// Write some bytes to the pipe.
605    ///
606    /// See [`Pipe::write()`]
607    pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
608        self.pipe.write(buf)
609    }
610
611    /// Attempt to immediately write some bytes to the pipe.
612    ///
613    /// See [`Pipe::try_write()`]
614    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
615        self.pipe.try_write(buf)
616    }
617}
618
619impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
620where
621    M: RawMutex,
622{
623    fn from(value: Writer<'p, M, N>) -> Self {
624        Self { pipe: value.pipe }
625    }
626}
627
628/// Future returned by [`DynamicWriter::write`].
629#[must_use = "futures do nothing unless you `.await` or poll them"]
630pub struct DynamicWriteFuture<'p> {
631    pipe: &'p dyn DynamicPipe,
632    buf: &'p [u8],
633}
634
635impl<'p> Future for DynamicWriteFuture<'p> {
636    type Output = usize;
637
638    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
639        match self.pipe.try_write_with_context(Some(cx), self.buf) {
640            Ok(n) => Poll::Ready(n),
641            Err(TryWriteError::Full) => Poll::Pending,
642        }
643    }
644}
645
646impl<'p> Unpin for DynamicWriteFuture<'p> {}
647
648impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
649where
650    M: RawMutex,
651{
652    fn from(value: WriteFuture<'p, M, N>) -> Self {
653        Self {
654            pipe: value.pipe,
655            buf: value.buf,
656        }
657    }
658}
659
660/// Read-only access to a [`DynamicPipe`].
661pub struct DynamicReader<'p> {
662    pipe: &'p dyn DynamicPipe,
663}
664
665impl<'p> DynamicReader<'p> {
666    /// Read some bytes from the pipe.
667    ///
668    /// See [`Pipe::read()`]
669    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
670        self.pipe.read(buf)
671    }
672
673    /// Attempt to immediately read some bytes from the pipe.
674    ///
675    /// See [`Pipe::try_read()`]
676    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
677        self.pipe.try_read(buf)
678    }
679
680    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
681    ///
682    /// If no bytes are currently available to read, this function waits until at least one byte is available.
683    ///
684    /// If the reader is at end-of-file (EOF), an empty slice is returned.
685    pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
686        DynamicFillBufFuture { pipe: Some(self.pipe) }
687    }
688
689    /// Try returning contents of the internal buffer.
690    ///
691    /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
692    ///
693    /// If the reader is at end-of-file (EOF), an empty slice is returned.
694    pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
695        unsafe { self.pipe.try_fill_buf_with_context(None) }
696    }
697
698    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
699    pub fn consume(&mut self, amt: usize) {
700        self.pipe.consume(amt)
701    }
702}
703
704impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
705where
706    M: RawMutex,
707{
708    fn from(value: Reader<'p, M, N>) -> Self {
709        Self { pipe: value.pipe }
710    }
711}
712
713/// Future returned by [`Pipe::read`] and  [`Reader::read`].
714#[must_use = "futures do nothing unless you `.await` or poll them"]
715pub struct DynamicReadFuture<'p> {
716    pipe: &'p dyn DynamicPipe,
717    buf: &'p mut [u8],
718}
719
720impl<'p> Future for DynamicReadFuture<'p> {
721    type Output = usize;
722
723    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
724        match self.pipe.try_read_with_context(Some(cx), self.buf) {
725            Ok(n) => Poll::Ready(n),
726            Err(TryReadError::Empty) => Poll::Pending,
727        }
728    }
729}
730
731impl<'p> Unpin for DynamicReadFuture<'p> {}
732
733impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
734where
735    M: RawMutex,
736{
737    fn from(value: ReadFuture<'p, M, N>) -> Self {
738        Self {
739            pipe: value.pipe,
740            buf: value.buf,
741        }
742    }
743}
744
745/// Future returned by [`DynamicPipe::fill_buf`] and  [`DynamicReader::fill_buf`].
746#[must_use = "futures do nothing unless you `.await` or poll them"]
747pub struct DynamicFillBufFuture<'p> {
748    pipe: Option<&'p dyn DynamicPipe>,
749}
750
751impl<'p> Future for DynamicFillBufFuture<'p> {
752    type Output = &'p [u8];
753
754    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755        let pipe = self.pipe.take().unwrap();
756        match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
757            Ok(buf) => Poll::Ready(buf),
758            Err(TryReadError::Empty) => {
759                self.pipe = Some(pipe);
760                Poll::Pending
761            }
762        }
763    }
764}
765
766impl<'p> Unpin for DynamicFillBufFuture<'p> {}
767
768impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
769where
770    M: RawMutex,
771{
772    fn from(value: FillBufFuture<'p, M, N>) -> Self {
773        Self {
774            pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
775        }
776    }
777}
778
#[cfg(test)]
mod tests {
    use futures_executor::ThreadPool;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    // A single one-byte write leaves two of three bytes free.
    #[test]
    fn writing_once() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[1]).is_ok());
        assert_eq!(pipe.free_capacity(), 2);
    }

    // Once all three bytes are used, further writes report `Full`.
    #[test]
    fn writing_when_full() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert_eq!(pipe.try_write(&[42]), Ok(1));
        assert_eq!(pipe.try_write(&[43]), Ok(1));
        assert_eq!(pipe.try_write(&[44]), Ok(1));
        assert_eq!(pipe.try_write(&[45]), Err(TryWriteError::Full));
        assert_eq!(pipe.free_capacity(), 0);
    }

    // Reading the only buffered byte frees the whole pipe again.
    #[test]
    fn receiving_once_with_one_send() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[42]).is_ok());
        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Ok(1));
        assert_eq!(out[0], 42);
        assert_eq!(pipe.free_capacity(), 3);
    }

    // Reading from a fresh pipe reports `Empty` and changes nothing.
    #[test]
    fn receiving_when_empty() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Err(TryReadError::Empty));
        assert_eq!(pipe.free_capacity(), 3);
    }

    // A byte written is the byte read back.
    #[test]
    fn simple_send_and_receive() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[42]).is_ok());
        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Ok(1));
        assert_eq!(out[0], 42);
    }

    // Exercises `try_fill_buf` / `consume` through the statically typed halves.
    #[test]
    fn read_buf() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (mut reader, writer) = pipe.split();

        assert!(writer.try_write(&[42, 43]).is_ok());
        // Peeking twice yields the same bytes: nothing has been consumed yet.
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[43]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf(), Err(TryReadError::Empty));

        assert_eq!(writer.try_write(&[44, 45, 46]), Ok(1));
        assert_eq!(writer.try_write(&[45, 46]), Ok(2));
        // only one byte due to wraparound.
        assert_eq!(reader.try_fill_buf().unwrap(), &[44]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46]);
        assert!(writer.try_write(&[47]).is_ok());
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46, 47]);
        reader.consume(3);
    }

    #[test]
    fn writer_is_cloneable() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (_reader, writer) = pipe.split();
        let _ = writer.clone();
    }

    // Same scenario as `read_buf`, but through the type-erased halves.
    #[test]
    fn dynamic_dispatch_pipe() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (reader, writer) = pipe.split();
        let mut reader: DynamicReader<'_> = reader.into();
        let writer: DynamicWriter<'_> = writer.into();

        assert!(writer.try_write(&[42, 43]).is_ok());
        // Peeking twice yields the same bytes: nothing has been consumed yet.
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[43]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf(), Err(TryReadError::Empty));

        assert_eq!(writer.try_write(&[44, 45, 46]), Ok(1));
        assert_eq!(writer.try_write(&[45, 46]), Ok(2));
        // only one byte due to wraparound.
        assert_eq!(reader.try_fill_buf().unwrap(), &[44]);
        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46]);
        assert!(writer.try_write(&[47]).is_ok());
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46, 47]);
        reader.consume(3);
    }

    // A write performed on another executor task is observed by an awaiting reader.
    #[futures_test::test]
    async fn receiver_receives_given_try_write_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
        let pipe: &'static Pipe<CriticalSectionRawMutex, 3> = CHANNEL.init(Pipe::new());

        executor
            .spawn(async move {
                assert_eq!(pipe.try_write(&[42]), Ok(1));
            })
            .unwrap();

        let mut out = [0; 16];
        assert_eq!(pipe.read(&mut out).await, 1);
        assert_eq!(out[0], 42);
    }

    // With free capacity available, `write` completes without needing a reader.
    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let pipe = Pipe::<CriticalSectionRawMutex, 1>::new();
        pipe.write(&[42]).await;
        let mut out = [0; 16];
        assert_eq!(pipe.read(&mut out).await, 1);
        assert_eq!(out[0], 42);
    }
}
917        assert_eq!(buf[0], 42);
918    }
919}