madsim_real_tokio/io/util/
buf_stream.rs

1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
3
4use pin_project_lite::pin_project;
5use std::io::{self, IoSlice, SeekFrom};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project! {
10    /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
11    ///
12    /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
13    /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
14    /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
15    /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
16    /// one in the other so that both directions are buffered. See their documentation for details.
17    #[derive(Debug)]
18    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19    pub struct BufStream<RW> {
20        #[pin]
21        inner: BufReader<BufWriter<RW>>,
22    }
23}
24
25impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
26    /// Wraps a type in both [`BufWriter`] and [`BufReader`].
27    ///
28    /// See the documentation for those types and [`BufStream`] for details.
29    pub fn new(stream: RW) -> BufStream<RW> {
30        BufStream {
31            inner: BufReader::new(BufWriter::new(stream)),
32        }
33    }
34
35    /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
36    /// capacity.
37    ///
38    /// See the documentation for those types and [`BufStream`] for details.
39    pub fn with_capacity(
40        reader_capacity: usize,
41        writer_capacity: usize,
42        stream: RW,
43    ) -> BufStream<RW> {
44        BufStream {
45            inner: BufReader::with_capacity(
46                reader_capacity,
47                BufWriter::with_capacity(writer_capacity, stream),
48            ),
49        }
50    }
51
52    /// Gets a reference to the underlying I/O object.
53    ///
54    /// It is inadvisable to directly read from the underlying I/O object.
55    pub fn get_ref(&self) -> &RW {
56        self.inner.get_ref().get_ref()
57    }
58
59    /// Gets a mutable reference to the underlying I/O object.
60    ///
61    /// It is inadvisable to directly read from the underlying I/O object.
62    pub fn get_mut(&mut self) -> &mut RW {
63        self.inner.get_mut().get_mut()
64    }
65
66    /// Gets a pinned mutable reference to the underlying I/O object.
67    ///
68    /// It is inadvisable to directly read from the underlying I/O object.
69    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
70        self.project().inner.get_pin_mut().get_pin_mut()
71    }
72
73    /// Consumes this `BufStream`, returning the underlying I/O object.
74    ///
75    /// Note that any leftover data in the internal buffer is lost.
76    pub fn into_inner(self) -> RW {
77        self.inner.into_inner().into_inner()
78    }
79}
80
81impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
82    fn from(b: BufReader<BufWriter<RW>>) -> Self {
83        BufStream { inner: b }
84    }
85}
86
87impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
88    fn from(b: BufWriter<BufReader<RW>>) -> Self {
89        // we need to "invert" the reader and writer
90        let BufWriter {
91            inner:
92                BufReader {
93                    inner,
94                    buf: rbuf,
95                    pos,
96                    cap,
97                    seek_state: rseek_state,
98                },
99            buf: wbuf,
100            written,
101            seek_state: wseek_state,
102        } = b;
103
104        BufStream {
105            inner: BufReader {
106                inner: BufWriter {
107                    inner,
108                    buf: wbuf,
109                    written,
110                    seek_state: wseek_state,
111                },
112                buf: rbuf,
113                pos,
114                cap,
115                seek_state: rseek_state,
116            },
117        }
118    }
119}
120
121impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
122    fn poll_write(
123        self: Pin<&mut Self>,
124        cx: &mut Context<'_>,
125        buf: &[u8],
126    ) -> Poll<io::Result<usize>> {
127        self.project().inner.poll_write(cx, buf)
128    }
129
130    fn poll_write_vectored(
131        self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133        bufs: &[IoSlice<'_>],
134    ) -> Poll<io::Result<usize>> {
135        self.project().inner.poll_write_vectored(cx, bufs)
136    }
137
138    fn is_write_vectored(&self) -> bool {
139        self.inner.is_write_vectored()
140    }
141
142    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
143        self.project().inner.poll_flush(cx)
144    }
145
146    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
147        self.project().inner.poll_shutdown(cx)
148    }
149}
150
151impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
152    fn poll_read(
153        self: Pin<&mut Self>,
154        cx: &mut Context<'_>,
155        buf: &mut ReadBuf<'_>,
156    ) -> Poll<io::Result<()>> {
157        self.project().inner.poll_read(cx, buf)
158    }
159}
160
161/// Seek to an offset, in bytes, in the underlying stream.
162///
163/// The position used for seeking with `SeekFrom::Current(_)` is the
164/// position the underlying stream would be at if the `BufStream` had no
165/// internal buffer.
166///
167/// Seeking always discards the internal buffer, even if the seek position
168/// would otherwise fall within it. This guarantees that calling
169/// `.into_inner()` immediately after a seek yields the underlying reader
170/// at the same position.
171///
172/// See [`AsyncSeek`] for more details.
173///
174/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
175/// where `n` minus the internal buffer length overflows an `i64`, two
176/// seeks will be performed instead of one. If the second seek returns
177/// `Err`, the underlying reader will be left at the same position it would
178/// have if you called `seek` with `SeekFrom::Current(0)`.
179impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> {
180    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
181        self.project().inner.start_seek(position)
182    }
183
184    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
185        self.project().inner.poll_complete(cx)
186    }
187}
188
189impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
190    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191        self.project().inner.poll_fill_buf(cx)
192    }
193
194    fn consume(self: Pin<&mut Self>, amt: usize) {
195        self.project().inner.consume(amt);
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[test]
204    fn assert_unpin() {
205        crate::is_unpin::<BufStream<()>>();
206    }
207}