//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.

/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
#[cfg(feature = "io-pipe")]
pub mod pipe {
    use std::io;

    use bytes::{Buf, BufMut, BytesMut};

    /// The write-end of the pipe, receiving items to become available in the [`Reader`].
    ///
    /// It's commonly used with the [`std::io::Write`] trait it implements: each call to
    /// [`write()`][std::io::Write::write] sends the given bytes through `channel` as one chunk.
    pub struct Writer {
        /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
        pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
        // Staging buffer: written bytes are appended here and then split off into the chunk that is sent.
        buf: BytesMut,
    }

    /// The read-end of the pipe, implementing the [`std::io::Read`] and [`std::io::BufRead`] traits.
    pub struct Reader {
        // Yields either chunks of bytes or errors, whichever was sent into the channel.
        channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
        // The most recently received chunk; holds the bytes that were not handed out to the caller yet.
        buf: BytesMut,
    }

    impl io::BufRead for Reader {
        /// Return the unread bytes of the current chunk, blocking to receive the next chunk if
        /// the current one is exhausted.
        ///
        /// An empty slice is returned only once the channel is disconnected and everything
        /// received so far was consumed, which is how `BufRead` signals end-of-file.
        ///
        /// # Errors
        ///
        /// An error received from the channel is returned as is.
        fn fill_buf(&mut self) -> io::Result<&[u8]> {
            // Empty chunks (as produced by empty writes) have to be skipped: handing out an empty
            // slice would signal end-of-file to the caller even though more data may follow.
            while self.buf.is_empty() {
                match self.channel.recv() {
                    Ok(Ok(buf)) => self.buf = buf,
                    Ok(Err(err)) => return Err(err),
                    // The channel is disconnected, nothing can arrive anymore: the actual end of the stream.
                    Err(_) => break,
                }
            }
            Ok(&self.buf)
        }

        /// Mark `amt` bytes of the current chunk as read.
        ///
        /// Values larger than the amount of buffered bytes are clamped instead of causing a panic.
        fn consume(&mut self, amt: usize) {
            self.buf.advance(amt.min(self.buf.len()));
        }
    }

    impl io::Read for Reader {
        /// Copy bytes of the current chunk into `out`, blocking for the next chunk only if no
        /// buffered bytes are left.
        ///
        /// Returns the number of bytes copied, which may be less than `out.len()` as at most one
        /// chunk is handed out per call. `Ok(0)` is returned if `out` is empty, or if the channel
        /// is disconnected and everything received so far was consumed (end-of-file).
        ///
        /// # Errors
        ///
        /// An error received from the channel is returned as is. As the channel is only queried
        /// while no bytes were copied yet, an error never causes already copied bytes to be lost.
        fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
            if out.is_empty() {
                return Ok(0);
            }
            // Skip empty chunks (as produced by empty writes) so that `Ok(0)` is only ever
            // returned at the actual end of the stream.
            while self.buf.is_empty() {
                match self.channel.recv() {
                    Ok(Ok(buf)) => self.buf = buf,
                    Ok(Err(err)) => return Err(err),
                    // The channel is disconnected and drained: end-of-file.
                    Err(_) => return Ok(0),
                }
            }
            // Deliberately return right after handing out what is available instead of waiting
            // for more chunks to fill `out` entirely: receiving again could yield an error, which
            // could not be reported without dropping the bytes copied so far.
            let bytes_to_copy = self.buf.len().min(out.len());
            self.buf.copy_to_slice(&mut out[..bytes_to_copy]);
            Ok(bytes_to_copy)
        }
    }

    impl io::Write for Writer {
        /// Send all of `buf` through the channel as a single chunk and report its length as written.
        ///
        /// Depending on the capacity of the channel, this call may block until there is room
        /// for the chunk.
        ///
        /// # Errors
        ///
        /// If the chunk cannot be sent, the send error is returned wrapped in an error of kind
        /// [`io::ErrorKind::BrokenPipe`].
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Stage the bytes, then detach them from the staging buffer as the chunk to transfer.
            self.buf.put_slice(buf);
            let chunk = self.buf.split();
            match self.channel.send(Ok(chunk)) {
                Ok(()) => Ok(buf.len()),
                Err(err) => Err(io::Error::new(io::ErrorKind::BrokenPipe, err)),
            }
        }

        /// Does nothing, as every write is sent through the channel right away.
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Create a pipe for transferring bytes, analogous to a unix pipe, and return its
    /// _([`write`][Writer], [`read`][Reader])_ ends.
    ///
    /// * `in_flight_writes` is the number of chunks of bytes that may be held in memory before the `write` end
    ///   blocks when writing. If `0`, each write blocks until the `read` end received the transferred bytes.
    pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) {
        let (sender, receiver) = std::sync::mpsc::sync_channel(in_flight_writes);
        let writer = Writer {
            channel: sender,
            // Staging buffer for outgoing chunks, pre-allocated with 4096 bytes of capacity.
            buf: BytesMut::with_capacity(4096),
        };
        let reader = Reader {
            channel: receiver,
            // Starts out empty and is replaced by the first received chunk.
            buf: BytesMut::new(),
        };
        (writer, reader)
    }
}