1use core::cell::{RefCell, UnsafeCell};
4use core::convert::Infallible;
5use core::future::Future;
6use core::ops::Range;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex;
12use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration;
14
15pub struct Writer<'p, M, const N: usize>
17where
18 M: RawMutex,
19{
20 pipe: &'p Pipe<M, N>,
21}
22
23impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
24where
25 M: RawMutex,
26{
27 fn clone(&self) -> Self {
28 *self
29 }
30}
31
32impl<'p, M, const N: usize> Copy for Writer<'p, M, N> where M: RawMutex {}
33
34impl<'p, M, const N: usize> Writer<'p, M, N>
35where
36 M: RawMutex,
37{
38 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
42 self.pipe.write(buf)
43 }
44
45 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
49 self.pipe.try_write(buf)
50 }
51}
52
53#[must_use = "futures do nothing unless you `.await` or poll them"]
55pub struct WriteFuture<'p, M, const N: usize>
56where
57 M: RawMutex,
58{
59 pipe: &'p Pipe<M, N>,
60 buf: &'p [u8],
61}
62
63impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
64where
65 M: RawMutex,
66{
67 type Output = usize;
68
69 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70 match self.pipe.try_write_with_context(Some(cx), self.buf) {
71 Ok(n) => Poll::Ready(n),
72 Err(TryWriteError::Full) => Poll::Pending,
73 }
74 }
75}
76
77impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
78
79pub struct Reader<'p, M, const N: usize>
81where
82 M: RawMutex,
83{
84 pipe: &'p Pipe<M, N>,
85}
86
87impl<'p, M, const N: usize> Reader<'p, M, N>
88where
89 M: RawMutex,
90{
91 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
95 self.pipe.read(buf)
96 }
97
98 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
102 self.pipe.try_read(buf)
103 }
104
105 pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
111 FillBufFuture { pipe: Some(self.pipe) }
112 }
113
114 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
120 unsafe { self.pipe.try_fill_buf_with_context(None) }
121 }
122
123 pub fn consume(&mut self, amt: usize) {
125 self.pipe.consume(amt)
126 }
127}
128
129#[must_use = "futures do nothing unless you `.await` or poll them"]
131pub struct ReadFuture<'p, M, const N: usize>
132where
133 M: RawMutex,
134{
135 pipe: &'p Pipe<M, N>,
136 buf: &'p mut [u8],
137}
138
139impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
140where
141 M: RawMutex,
142{
143 type Output = usize;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 match self.pipe.try_read_with_context(Some(cx), self.buf) {
147 Ok(n) => Poll::Ready(n),
148 Err(TryReadError::Empty) => Poll::Pending,
149 }
150 }
151}
152
153impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
154
155#[must_use = "futures do nothing unless you `.await` or poll them"]
157pub struct FillBufFuture<'p, M, const N: usize>
158where
159 M: RawMutex,
160{
161 pipe: Option<&'p Pipe<M, N>>,
162}
163
164impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
165where
166 M: RawMutex,
167{
168 type Output = &'p [u8];
169
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let pipe = self.pipe.take().unwrap();
172 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
173 Ok(buf) => Poll::Ready(buf),
174 Err(TryReadError::Empty) => {
175 self.pipe = Some(pipe);
176 Poll::Pending
177 }
178 }
179 }
180}
181
182impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
183
184#[derive(PartialEq, Eq, Clone, Copy, Debug)]
186#[cfg_attr(feature = "defmt", derive(defmt::Format))]
187pub enum TryReadError {
188 Empty,
191}
192
193#[derive(PartialEq, Eq, Clone, Copy, Debug)]
195#[cfg_attr(feature = "defmt", derive(defmt::Format))]
196pub enum TryWriteError {
197 Full,
200}
201
202struct PipeState<const N: usize> {
203 buffer: RingBuffer<N>,
204 read_waker: WakerRegistration,
205 write_waker: WakerRegistration,
206}
207
208#[repr(transparent)]
209struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
210
211impl<const N: usize> Buffer<N> {
212 unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
213 let p = self.0.get() as *const u8;
214 core::slice::from_raw_parts(p.add(r.start), r.end - r.start)
215 }
216
217 unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
218 let p = self.0.get() as *mut u8;
219 core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start)
220 }
221}
222
223unsafe impl<const N: usize> Send for Buffer<N> {}
224unsafe impl<const N: usize> Sync for Buffer<N> {}
225
226pub struct Pipe<M, const N: usize>
234where
235 M: RawMutex,
236{
237 buf: Buffer<N>,
238 inner: Mutex<M, RefCell<PipeState<N>>>,
239}
240
241impl<M, const N: usize> Pipe<M, N>
242where
243 M: RawMutex,
244{
245 pub const fn new() -> Self {
255 Self {
256 buf: Buffer(UnsafeCell::new([0; N])),
257 inner: Mutex::new(RefCell::new(PipeState {
258 buffer: RingBuffer::new(),
259 read_waker: WakerRegistration::new(),
260 write_waker: WakerRegistration::new(),
261 })),
262 }
263 }
264
265 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
266 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
267 }
268
269 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
270 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
271 let s = &mut *rc.borrow_mut();
272
273 if s.buffer.is_full() {
274 s.write_waker.wake();
275 }
276
277 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
278 if available.is_empty() {
279 if let Some(cx) = cx {
280 s.read_waker.register(cx.waker());
281 }
282 return Err(TryReadError::Empty);
283 }
284
285 let n = available.len().min(buf.len());
286 buf[..n].copy_from_slice(&available[..n]);
287 s.buffer.pop(n);
288 Ok(n)
289 })
290 }
291
292 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
295 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
296 let s = &mut *rc.borrow_mut();
297
298 if s.buffer.is_full() {
299 s.write_waker.wake();
300 }
301
302 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
303 if available.is_empty() {
304 if let Some(cx) = cx {
305 s.read_waker.register(cx.waker());
306 }
307 return Err(TryReadError::Empty);
308 }
309
310 Ok(available)
311 })
312 }
313
314 fn consume(&self, amt: usize) {
315 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
316 let s = &mut *rc.borrow_mut();
317 let available = s.buffer.pop_buf();
318 assert!(amt <= available.len());
319 s.buffer.pop(amt);
320 })
321 }
322
323 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
324 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
325 let s = &mut *rc.borrow_mut();
326
327 if s.buffer.is_empty() {
328 s.read_waker.wake();
329 }
330
331 let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
332 if available.is_empty() {
333 if let Some(cx) = cx {
334 s.write_waker.register(cx.waker());
335 }
336 return Err(TryWriteError::Full);
337 }
338
339 let n = available.len().min(buf.len());
340 available[..n].copy_from_slice(&buf[..n]);
341 s.buffer.push(n);
342 Ok(n)
343 })
344 }
345
346 pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
354 (Reader { pipe: self }, Writer { pipe: self })
355 }
356
357 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
372 WriteFuture { pipe: self, buf }
373 }
374
375 pub async fn write_all(&self, mut buf: &[u8]) {
379 while !buf.is_empty() {
380 let n = self.write(buf).await;
381 buf = &buf[n..];
382 }
383 }
384
385 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
391 self.try_write_with_context(None, buf)
392 }
393
394 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
409 ReadFuture { pipe: self, buf }
410 }
411
412 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
418 self.try_read_with_context(None, buf)
419 }
420
421 pub fn clear(&self) {
423 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
424 let s = &mut *rc.borrow_mut();
425
426 s.buffer.clear();
427 s.write_waker.wake();
428 })
429 }
430
431 pub fn is_full(&self) -> bool {
433 self.len() == N
434 }
435
436 pub fn is_empty(&self) -> bool {
438 self.len() == 0
439 }
440
441 pub fn capacity(&self) -> usize {
445 N
446 }
447
448 pub fn len(&self) -> usize {
450 self.lock(|c| c.buffer.len())
451 }
452
453 pub fn free_capacity(&self) -> usize {
457 N - self.len()
458 }
459}
460
461impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Pipe<M, N> {
462 type Error = Infallible;
463}
464
465impl<M: RawMutex, const N: usize> embedded_io_async::Read for Pipe<M, N> {
466 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
467 Ok(Pipe::read(self, buf).await)
468 }
469}
470
471impl<M: RawMutex, const N: usize> embedded_io_async::Write for Pipe<M, N> {
472 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
473 Ok(Pipe::write(self, buf).await)
474 }
475
476 async fn flush(&mut self) -> Result<(), Self::Error> {
477 Ok(())
478 }
479}
480
481impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for &Pipe<M, N> {
482 type Error = Infallible;
483}
484
485impl<M: RawMutex, const N: usize> embedded_io_async::Read for &Pipe<M, N> {
486 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
487 Ok(Pipe::read(self, buf).await)
488 }
489}
490
491impl<M: RawMutex, const N: usize> embedded_io_async::Write for &Pipe<M, N> {
492 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
493 Ok(Pipe::write(self, buf).await)
494 }
495
496 async fn flush(&mut self) -> Result<(), Self::Error> {
497 Ok(())
498 }
499}
500
501impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Reader<'_, M, N> {
502 type Error = Infallible;
503}
504
505impl<M: RawMutex, const N: usize> embedded_io_async::Read for Reader<'_, M, N> {
506 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
507 Ok(Reader::read(self, buf).await)
508 }
509}
510
511impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
512 async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
513 Ok(Reader::fill_buf(self).await)
514 }
515
516 fn consume(&mut self, amt: usize) {
517 Reader::consume(self, amt)
518 }
519}
520
521impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
522 type Error = Infallible;
523}
524
525impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> {
526 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
527 Ok(Writer::write(self, buf).await)
528 }
529
530 async fn flush(&mut self) -> Result<(), Self::Error> {
531 Ok(())
532 }
533}
534
535pub(crate) trait DynamicPipe {
540 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
541 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
542
543 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
544 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
545
546 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
547 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
548
549 fn consume(&self, amt: usize);
550 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
551}
552
553impl<M, const N: usize> DynamicPipe for Pipe<M, N>
554where
555 M: RawMutex,
556{
557 fn consume(&self, amt: usize) {
558 Pipe::consume(self, amt)
559 }
560
561 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
562 Pipe::try_fill_buf_with_context(self, cx)
563 }
564
565 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
566 Pipe::write(self, buf).into()
567 }
568
569 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
570 Pipe::read(self, buf).into()
571 }
572
573 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
574 Pipe::try_read(self, buf)
575 }
576
577 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
578 Pipe::try_write(self, buf)
579 }
580
581 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
582 Pipe::try_write_with_context(self, cx, buf)
583 }
584
585 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
586 Pipe::try_read_with_context(self, cx, buf)
587 }
588}
589
590pub struct DynamicWriter<'p> {
592 pipe: &'p dyn DynamicPipe,
593}
594
595impl<'p> Clone for DynamicWriter<'p> {
596 fn clone(&self) -> Self {
597 *self
598 }
599}
600
601impl<'p> Copy for DynamicWriter<'p> {}
602
603impl<'p> DynamicWriter<'p> {
604 pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
608 self.pipe.write(buf)
609 }
610
611 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
615 self.pipe.try_write(buf)
616 }
617}
618
619impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
620where
621 M: RawMutex,
622{
623 fn from(value: Writer<'p, M, N>) -> Self {
624 Self { pipe: value.pipe }
625 }
626}
627
628#[must_use = "futures do nothing unless you `.await` or poll them"]
630pub struct DynamicWriteFuture<'p> {
631 pipe: &'p dyn DynamicPipe,
632 buf: &'p [u8],
633}
634
635impl<'p> Future for DynamicWriteFuture<'p> {
636 type Output = usize;
637
638 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
639 match self.pipe.try_write_with_context(Some(cx), self.buf) {
640 Ok(n) => Poll::Ready(n),
641 Err(TryWriteError::Full) => Poll::Pending,
642 }
643 }
644}
645
646impl<'p> Unpin for DynamicWriteFuture<'p> {}
647
648impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
649where
650 M: RawMutex,
651{
652 fn from(value: WriteFuture<'p, M, N>) -> Self {
653 Self {
654 pipe: value.pipe,
655 buf: value.buf,
656 }
657 }
658}
659
660pub struct DynamicReader<'p> {
662 pipe: &'p dyn DynamicPipe,
663}
664
665impl<'p> DynamicReader<'p> {
666 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
670 self.pipe.read(buf)
671 }
672
673 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
677 self.pipe.try_read(buf)
678 }
679
680 pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
686 DynamicFillBufFuture { pipe: Some(self.pipe) }
687 }
688
689 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
695 unsafe { self.pipe.try_fill_buf_with_context(None) }
696 }
697
698 pub fn consume(&mut self, amt: usize) {
700 self.pipe.consume(amt)
701 }
702}
703
704impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
705where
706 M: RawMutex,
707{
708 fn from(value: Reader<'p, M, N>) -> Self {
709 Self { pipe: value.pipe }
710 }
711}
712
713#[must_use = "futures do nothing unless you `.await` or poll them"]
715pub struct DynamicReadFuture<'p> {
716 pipe: &'p dyn DynamicPipe,
717 buf: &'p mut [u8],
718}
719
720impl<'p> Future for DynamicReadFuture<'p> {
721 type Output = usize;
722
723 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
724 match self.pipe.try_read_with_context(Some(cx), self.buf) {
725 Ok(n) => Poll::Ready(n),
726 Err(TryReadError::Empty) => Poll::Pending,
727 }
728 }
729}
730
731impl<'p> Unpin for DynamicReadFuture<'p> {}
732
733impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
734where
735 M: RawMutex,
736{
737 fn from(value: ReadFuture<'p, M, N>) -> Self {
738 Self {
739 pipe: value.pipe,
740 buf: value.buf,
741 }
742 }
743}
744
745#[must_use = "futures do nothing unless you `.await` or poll them"]
747pub struct DynamicFillBufFuture<'p> {
748 pipe: Option<&'p dyn DynamicPipe>,
749}
750
751impl<'p> Future for DynamicFillBufFuture<'p> {
752 type Output = &'p [u8];
753
754 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
755 let pipe = self.pipe.take().unwrap();
756 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
757 Ok(buf) => Poll::Ready(buf),
758 Err(TryReadError::Empty) => {
759 self.pipe = Some(pipe);
760 Poll::Pending
761 }
762 }
763 }
764}
765
766impl<'p> Unpin for DynamicFillBufFuture<'p> {}
767
768impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
769where
770 M: RawMutex,
771{
772 fn from(value: FillBufFuture<'p, M, N>) -> Self {
773 Self {
774 pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
775 }
776 }
777}
778
779#[cfg(test)]
780mod tests {
781 use futures_executor::ThreadPool;
782 use futures_util::task::SpawnExt;
783 use static_cell::StaticCell;
784
785 use super::*;
786 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
787
788 #[test]
789 fn writing_once() {
790 let c = Pipe::<NoopRawMutex, 3>::new();
791 assert!(c.try_write(&[1]).is_ok());
792 assert_eq!(c.free_capacity(), 2);
793 }
794
795 #[test]
796 fn writing_when_full() {
797 let c = Pipe::<NoopRawMutex, 3>::new();
798 assert_eq!(c.try_write(&[42]), Ok(1));
799 assert_eq!(c.try_write(&[43]), Ok(1));
800 assert_eq!(c.try_write(&[44]), Ok(1));
801 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
802 assert_eq!(c.free_capacity(), 0);
803 }
804
805 #[test]
806 fn receiving_once_with_one_send() {
807 let c = Pipe::<NoopRawMutex, 3>::new();
808 assert!(c.try_write(&[42]).is_ok());
809 let mut buf = [0; 16];
810 assert_eq!(c.try_read(&mut buf), Ok(1));
811 assert_eq!(buf[0], 42);
812 assert_eq!(c.free_capacity(), 3);
813 }
814
815 #[test]
816 fn receiving_when_empty() {
817 let c = Pipe::<NoopRawMutex, 3>::new();
818 let mut buf = [0; 16];
819 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
820 assert_eq!(c.free_capacity(), 3);
821 }
822
823 #[test]
824 fn simple_send_and_receive() {
825 let c = Pipe::<NoopRawMutex, 3>::new();
826 assert!(c.try_write(&[42]).is_ok());
827 let mut buf = [0; 16];
828 assert_eq!(c.try_read(&mut buf), Ok(1));
829 assert_eq!(buf[0], 42);
830 }
831
832 #[test]
833 fn read_buf() {
834 let mut c = Pipe::<NoopRawMutex, 3>::new();
835 let (mut r, w) = c.split();
836 assert!(w.try_write(&[42, 43]).is_ok());
837 let buf = r.try_fill_buf().unwrap();
838 assert_eq!(buf, &[42, 43]);
839 let buf = r.try_fill_buf().unwrap();
840 assert_eq!(buf, &[42, 43]);
841 r.consume(1);
842 let buf = r.try_fill_buf().unwrap();
843 assert_eq!(buf, &[43]);
844 r.consume(1);
845 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
846 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
847 assert_eq!(w.try_write(&[45, 46]), Ok(2));
848 let buf = r.try_fill_buf().unwrap();
849 assert_eq!(buf, &[44]); r.consume(1);
851 let buf = r.try_fill_buf().unwrap();
852 assert_eq!(buf, &[45, 46]);
853 assert!(w.try_write(&[47]).is_ok());
854 let buf = r.try_fill_buf().unwrap();
855 assert_eq!(buf, &[45, 46, 47]);
856 r.consume(3);
857 }
858
859 #[test]
860 fn writer_is_cloneable() {
861 let mut c = Pipe::<NoopRawMutex, 3>::new();
862 let (_r, w) = c.split();
863 let _ = w.clone();
864 }
865
866 #[test]
867 fn dynamic_dispatch_pipe() {
868 let mut c = Pipe::<NoopRawMutex, 3>::new();
869 let (r, w) = c.split();
870 let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
871
872 assert!(w.try_write(&[42, 43]).is_ok());
873 let buf = r.try_fill_buf().unwrap();
874 assert_eq!(buf, &[42, 43]);
875 let buf = r.try_fill_buf().unwrap();
876 assert_eq!(buf, &[42, 43]);
877 r.consume(1);
878 let buf = r.try_fill_buf().unwrap();
879 assert_eq!(buf, &[43]);
880 r.consume(1);
881 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
882 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
883 assert_eq!(w.try_write(&[45, 46]), Ok(2));
884 let buf = r.try_fill_buf().unwrap();
885 assert_eq!(buf, &[44]); r.consume(1);
887 let buf = r.try_fill_buf().unwrap();
888 assert_eq!(buf, &[45, 46]);
889 assert!(w.try_write(&[47]).is_ok());
890 let buf = r.try_fill_buf().unwrap();
891 assert_eq!(buf, &[45, 46, 47]);
892 r.consume(3);
893 }
894
895 #[futures_test::test]
896 async fn receiver_receives_given_try_write_async() {
897 let executor = ThreadPool::new().unwrap();
898
899 static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
900 let c = &*CHANNEL.init(Pipe::new());
901 let c2 = c;
902 let f = async move {
903 assert_eq!(c2.try_write(&[42]), Ok(1));
904 };
905 executor.spawn(f).unwrap();
906 let mut buf = [0; 16];
907 assert_eq!(c.read(&mut buf).await, 1);
908 assert_eq!(buf[0], 42);
909 }
910
911 #[futures_test::test]
912 async fn sender_send_completes_if_capacity() {
913 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
914 c.write(&[42]).await;
915 let mut buf = [0; 16];
916 assert_eq!(c.read(&mut buf).await, 1);
917 assert_eq!(buf[0], 42);
918 }
919}