broker_tokio/io/util/
buf_stream.rs1use crate::io::util::{BufReader, BufWriter};
2use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite};
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem::MaybeUninit;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project! {
11 #[derive(Debug)]
19 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
20 pub struct BufStream<RW> {
21 #[pin]
22 inner: BufReader<BufWriter<RW>>,
23 }
24}
25
26impl<RW: AsyncRead + AsyncWrite> BufStream<RW> {
27 pub fn new(stream: RW) -> BufStream<RW> {
31 BufStream {
32 inner: BufReader::new(BufWriter::new(stream)),
33 }
34 }
35
36 pub fn with_capacity(
41 reader_capacity: usize,
42 writer_capacity: usize,
43 stream: RW,
44 ) -> BufStream<RW> {
45 BufStream {
46 inner: BufReader::with_capacity(
47 reader_capacity,
48 BufWriter::with_capacity(writer_capacity, stream),
49 ),
50 }
51 }
52
53 pub fn get_ref(&self) -> &RW {
57 self.inner.get_ref().get_ref()
58 }
59
60 pub fn get_mut(&mut self) -> &mut RW {
64 self.inner.get_mut().get_mut()
65 }
66
67 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> {
71 self.project().inner.get_pin_mut().get_pin_mut()
72 }
73
74 pub fn into_inner(self) -> RW {
78 self.inner.into_inner().into_inner()
79 }
80}
81
82impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> {
83 fn from(b: BufReader<BufWriter<RW>>) -> Self {
84 BufStream { inner: b }
85 }
86}
87
88impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> {
89 fn from(b: BufWriter<BufReader<RW>>) -> Self {
90 let BufWriter {
92 inner:
93 BufReader {
94 inner,
95 buf: rbuf,
96 pos,
97 cap,
98 },
99 buf: wbuf,
100 written,
101 } = b;
102
103 BufStream {
104 inner: BufReader {
105 inner: BufWriter {
106 inner,
107 buf: wbuf,
108 written,
109 },
110 buf: rbuf,
111 pos,
112 cap,
113 },
114 }
115 }
116}
117
118impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
119 fn poll_write(
120 self: Pin<&mut Self>,
121 cx: &mut Context<'_>,
122 buf: &[u8],
123 ) -> Poll<io::Result<usize>> {
124 self.project().inner.poll_write(cx, buf)
125 }
126
127 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
128 self.project().inner.poll_flush(cx)
129 }
130
131 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
132 self.project().inner.poll_shutdown(cx)
133 }
134}
135
136impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
137 fn poll_read(
138 self: Pin<&mut Self>,
139 cx: &mut Context<'_>,
140 buf: &mut [u8],
141 ) -> Poll<io::Result<usize>> {
142 self.project().inner.poll_read(cx, buf)
143 }
144
145 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
147 self.inner.prepare_uninitialized_buffer(buf)
148 }
149}
150
151impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> {
152 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
153 self.project().inner.poll_fill_buf(cx)
154 }
155
156 fn consume(self: Pin<&mut Self>, amt: usize) {
157 self.project().inner.consume(amt)
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164
165 #[test]
166 fn assert_unpin() {
167 crate::is_unpin::<BufStream<()>>();
168 }
169}