// broker_tokio/io/async_read.rs

1use bytes::BufMut;
2use std::io;
3use std::mem::MaybeUninit;
4use std::ops::DerefMut;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8/// Read bytes from a source.
9///
10/// This trait is analogous to the [`std::io::Read`] trait, but integrates with
11/// the asynchronous task system. In particular, the [`poll_read`] method,
12/// unlike [`Read::read`], will automatically queue the current task for wakeup
13/// and return if data is not yet available, rather than blocking the calling
14/// thread.
15///
16/// Specifically, this means that the `poll_read` function will return one of
17/// the following:
18///
19/// * `Poll::Ready(Ok(n))` means that `n` bytes of data was immediately read
20///   and placed into the output buffer, where `n` == 0 implies that EOF has
21///   been reached.
22///
23/// * `Poll::Pending` means that no data was read into the buffer
24///   provided. The I/O object is not currently readable but may become readable
25///   in the future. Most importantly, **the current future's task is scheduled
26///   to get unparked when the object is readable**. This means that like
27///   `Future::poll` you'll receive a notification when the I/O object is
28///   readable again.
29///
30/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the
31///   underlying object.
32///
33/// This trait importantly means that the `read` method only works in the
34/// context of a future's task. The object may panic if used outside of a task.
35///
36/// Utilities for working with `AsyncRead` values are provided by
37/// [`AsyncReadExt`].
38///
39/// [`poll_read`]: AsyncRead::poll_read
40/// [`std::io::Read`]: std::io::Read
41/// [`Read::read`]: std::io::Read::read
42/// [`AsyncReadExt`]: crate::io::AsyncReadExt
43pub trait AsyncRead {
44    /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns
45    /// `true` if the supplied buffer was zeroed out.
46    ///
47    /// While it would be highly unusual, implementations of [`io::Read`] are
48    /// able to read data from the buffer passed as an argument. Because of
49    /// this, the buffer passed to [`io::Read`] must be initialized memory. In
50    /// situations where large numbers of buffers are used, constantly having to
51    /// zero out buffers can be expensive.
52    ///
53    /// This function does any necessary work to prepare an uninitialized buffer
54    /// to be safe to pass to `read`. If `read` guarantees to never attempt to
55    /// read data out of the supplied buffer, then `prepare_uninitialized_buffer`
56    /// doesn't need to do any work.
57    ///
58    /// If this function returns `true`, then the memory has been zeroed out.
59    /// This allows implementations of `AsyncRead` which are composed of
60    /// multiple subimplementations to efficiently implement
61    /// `prepare_uninitialized_buffer`.
62    ///
63    /// This function isn't actually `unsafe` to call but `unsafe` to implement.
64    /// The implementer must ensure that either the whole `buf` has been zeroed
65    /// or `poll_read_buf()` overwrites the buffer without reading it and returns
66    /// correct value.
67    ///
68    /// This function is called from [`poll_read_buf`].
69    ///
70    /// # Safety
71    ///
72    /// Implementations that return `false` must never read from data slices
73    /// that they did not write to.
74    ///
75    /// [`io::Read`]: std::io::Read
76    /// [`poll_read_buf`]: #method.poll_read_buf
77    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
78        for x in buf {
79            *x.as_mut_ptr() = 0;
80        }
81
82        true
83    }
84
85    /// Attempt to read from the `AsyncRead` into `buf`.
86    ///
87    /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
88    ///
89    /// If no data is available for reading, the method returns
90    /// `Poll::Pending` and arranges for the current task (via
91    /// `cx.waker()`) to receive a notification when the object becomes
92    /// readable or is closed.
93    fn poll_read(
94        self: Pin<&mut Self>,
95        cx: &mut Context<'_>,
96        buf: &mut [u8],
97    ) -> Poll<io::Result<usize>>;
98
99    /// Pull some bytes from this source into the specified `BufMut`, returning
100    /// how many bytes were read.
101    ///
102    /// The `buf` provided will have bytes read into it and the internal cursor
103    /// will be advanced if any bytes were read. Note that this method typically
104    /// will not reallocate the buffer provided.
105    fn poll_read_buf<B: BufMut>(
106        self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108        buf: &mut B,
109    ) -> Poll<io::Result<usize>>
110    where
111        Self: Sized,
112    {
113        if !buf.has_remaining_mut() {
114            return Poll::Ready(Ok(0));
115        }
116
117        unsafe {
118            let n = {
119                let b = buf.bytes_mut();
120
121                self.prepare_uninitialized_buffer(b);
122
123                // Convert to `&mut [u8]`
124                let b = &mut *(b as *mut [MaybeUninit<u8>] as *mut [u8]);
125
126                let n = ready!(self.poll_read(cx, b))?;
127                assert!(n <= b.len(), "Bad AsyncRead implementation, more bytes were reported as read than the buffer can hold");
128                n
129            };
130
131            buf.advance_mut(n);
132            Poll::Ready(Ok(n))
133        }
134    }
135}
136
// Expands to the two `AsyncRead` methods for a pointer-like `Self` whose
// pointee is itself `AsyncRead + Unpin`; both methods simply forward to the
// pointee.
macro_rules! deref_async_read {
    () => {
        // Buffer preparation is whatever the pointee says it is.
        unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
            AsyncRead::prepare_uninitialized_buffer(&**self, buf)
        }

        // Unwrap the outer pin, reborrow the pointee, and re-pin it; this is
        // allowed because the pointee is `Unpin`.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let pointee = &mut **self.get_mut();
            Pin::new(pointee).poll_read(cx, buf)
        }
    };
}
150
151impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
152    deref_async_read!();
153}
154
155impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
156    deref_async_read!();
157}
158
159impl<P> AsyncRead for Pin<P>
160where
161    P: DerefMut + Unpin,
162    P::Target: AsyncRead,
163{
164    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
165        (**self).prepare_uninitialized_buffer(buf)
166    }
167
168    fn poll_read(
169        self: Pin<&mut Self>,
170        cx: &mut Context<'_>,
171        buf: &mut [u8],
172    ) -> Poll<io::Result<usize>> {
173        self.get_mut().as_mut().poll_read(cx, buf)
174    }
175}
176
177impl AsyncRead for &[u8] {
178    unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool {
179        false
180    }
181
182    fn poll_read(
183        self: Pin<&mut Self>,
184        _cx: &mut Context<'_>,
185        buf: &mut [u8],
186    ) -> Poll<io::Result<usize>> {
187        Poll::Ready(io::Read::read(self.get_mut(), buf))
188    }
189}
190
191impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
192    unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [MaybeUninit<u8>]) -> bool {
193        false
194    }
195
196    fn poll_read(
197        self: Pin<&mut Self>,
198        _cx: &mut Context<'_>,
199        buf: &mut [u8],
200    ) -> Poll<io::Result<usize>> {
201        Poll::Ready(io::Read::read(self.get_mut(), buf))
202    }
203}