broker_tokio/io/util/
buf_stream.rs

1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem::MaybeUninit;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project! {
11    /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output.
12    ///
13    /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`]
14    /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall
15    /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`]
16    /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps
17    /// one in the other so that both directions are buffered. See their documentation for details.
18    #[derive(Debug)]
19    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
20    pub struct BufStream<RW> {
21        #[pin]
22        inner: BufReader<BufWriter<RW>>,
23    }
24}
25
26impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
27    /// Wrap a type in both [`BufWriter`] and [`BufReader`].
28    ///
29    /// See the documentation for those types and [`BufStream`] for details.
30    pub fn new(stream: RW) -> BufStream<RW> {
31        BufStream {
32            inner: BufReader::new(BufWriter::new(stream)),
33        }
34    }
35
36    /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`]
37    /// capacity.
38    ///
39    /// See the documentation for those types and [`BufStream`] for details.
40    pub fn with_capacity(
41        reader_capacity: usize,
42        writer_capacity: usize,
43        stream: RW,
44    ) -> BufStream<RW> {
45        BufStream {
46            inner: BufReader::with_capacity(
47                reader_capacity,
48                BufWriter::with_capacity(writer_capacity, stream),
49            ),
50        }
51    }
52
53    /// Gets a reference to the underlying I/O object.
54    ///
55    /// It is inadvisable to directly read from the underlying I/O object.
56    pub fn get_ref(&self) -> &RW {
57        self.inner.get_ref().get_ref()
58    }
59
60    /// Gets a mutable reference to the underlying I/O object.
61    ///
62    /// It is inadvisable to directly read from the underlying I/O object.
63    pub fn get_mut(&mut self) -> &mut RW {
64        self.inner.get_mut().get_mut()
65    }
66
67    /// Gets a pinned mutable reference to the underlying I/O object.
68    ///
69    /// It is inadvisable to directly read from the underlying I/O object.
70    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
71        self.project().inner.get_pin_mut().get_pin_mut()
72    }
73
74    /// Consumes this `BufStream`, returning the underlying I/O object.
75    ///
76    /// Note that any leftover data in the internal buffer is lost.
77    pub fn into_inner(self) -> RW {
78        self.inner.into_inner().into_inner()
79    }
80}
81
82impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
83    fn from(b: BufReader<BufWriter<RW>>) -> Self {
84        BufStream { inner: b }
85    }
86}
87
88impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
89    fn from(b: BufWriter<BufReader<RW>>) -> Self {
90        // we need to "invert" the reader and writer
91        let BufWriter {
92            inner:
93                BufReader {
94                    inner,
95                    buf: rbuf,
96                    pos,
97                    cap,
98                },
99            buf: wbuf,
100            written,
101        } = b;
102
103        BufStream {
104            inner: BufReader {
105                inner: BufWriter {
106                    inner,
107                    buf: wbuf,
108                    written,
109                },
110                buf: rbuf,
111                pos,
112                cap,
113            },
114        }
115    }
116}
117
118impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
119    fn poll_write(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122        buf: &[u8],
123    ) -> Poll<io::Result<usize>> {
124        self.project().inner.poll_write(cx, buf)
125    }
126
127    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
128        self.project().inner.poll_flush(cx)
129    }
130
131    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
132        self.project().inner.poll_shutdown(cx)
133    }
134}
135
136impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
137    fn poll_read(
138        self: Pin<&mut Self>,
139        cx: &mut Context<'_>,
140        buf: &mut [u8],
141    ) -> Poll<io::Result<usize>> {
142        self.project().inner.poll_read(cx, buf)
143    }
144
145    // we can't skip unconditionally because of the large buffer case in read.
146    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
147        self.inner.prepare_uninitialized_buffer(buf)
148    }
149}
150
151impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
152    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
153        self.project().inner.poll_fill_buf(cx)
154    }
155
156    fn consume(self: Pin<&mut Self>, amt: usize) {
157        self.project().inner.consume(amt)
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164
165    #[test]
166    fn assert_unpin() {
167        crate::is_unpin::<BufStream<()>>();
168    }
169}