compio_io/read/
mod.rs

1#[cfg(feature = "allocator_api")]
2use std::alloc::Allocator;
3use std::{io::Cursor, ops::DerefMut, rc::Rc, sync::Arc};
4
5use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut, buf_try, t_alloc};
6
7mod buf;
8#[macro_use]
9mod ext;
10
11pub use buf::*;
12pub use ext::*;
13
14use crate::util::slice_to_buf;
15
16/// AsyncRead
17///
18/// Async read with a ownership of a buffer
19pub trait AsyncRead {
20    /// Read some bytes from this source into the [`IoBufMut`] buffer and return
21    /// a [`BufResult`], consisting of the buffer and a [`usize`] indicating
22    /// how many bytes were read.
23    ///
24    /// # Caution
25    ///
26    /// Implementor **MUST** update the buffer init via
27    /// [`SetBufInit::set_buf_init`] after reading, and no further update should
28    /// be made by caller.
29    ///
30    /// [`SetBufInit::set_buf_init`]: compio_buf::SetBufInit::set_buf_init
31    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B>;
32
33    /// Like `read`, except that it reads into a type implements
34    /// [`IoVectoredBufMut`].
35    ///
36    /// The default implementation will try to read into the buffers in order,
37    /// and stop whenever the reader returns an error, `Ok(0)`, or a length
38    /// less than the length of the buf passed in, meaning it's possible that
39    /// not all buffer space is filled. If guaranteed full read is desired,
40    /// it is recommended to use [`AsyncReadExt::read_vectored_exact`]
41    /// instead.
42    ///
43    /// # Caution
44    ///
45    /// Implementor **MUST** update the buffer init via
46    /// [`SetBufInit::set_buf_init`] after reading.
47    ///
48    /// [`SetBufInit::set_buf_init`]: compio_buf::SetBufInit::set_buf_init
49    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
50        loop_read_vectored!(
51            buf, len, total: usize, n, iter,
52            loop self.read(iter),
53            break if n == 0 || n < len {
54                Some(Ok(total))
55            } else {
56                None
57            }
58        )
59    }
60}
61
62impl<A: AsyncRead + ?Sized> AsyncRead for &mut A {
63    #[inline(always)]
64    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
65        (**self).read(buf).await
66    }
67
68    #[inline(always)]
69    async fn read_vectored<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
70        (**self).read_vectored(buf).await
71    }
72}
73
74impl<R: AsyncRead + ?Sized, #[cfg(feature = "allocator_api")] A: Allocator> AsyncRead
75    for t_alloc!(Box, R, A)
76{
77    #[inline(always)]
78    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
79        (**self).read(buf).await
80    }
81
82    #[inline(always)]
83    async fn read_vectored<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
84        (**self).read_vectored(buf).await
85    }
86}
87
88impl AsyncRead for &[u8] {
89    #[inline]
90    async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
91        let len = slice_to_buf(self, &mut buf);
92        *self = &self[len..];
93        BufResult(Ok(len), buf)
94    }
95
96    async fn read_vectored<T: IoVectoredBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
97        let mut this = *self; // An immutable slice to track the read position
98
99        for mut buf in buf.iter_buf_mut() {
100            let n = slice_to_buf(this, buf.deref_mut());
101            this = &this[n..];
102            if this.is_empty() {
103                break;
104            }
105        }
106
107        BufResult(Ok(self.len() - this.len()), buf)
108    }
109}
110
111/// # AsyncReadAt
112///
113/// Async read with a ownership of a buffer and a position
114pub trait AsyncReadAt {
115    /// Like [`AsyncRead::read`], except that it reads at a specified position.
116    async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T>;
117
118    /// Like [`AsyncRead::read_vectored`], except that it reads at a specified
119    /// position.
120    async fn read_vectored_at<T: IoVectoredBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
121        loop_read_vectored!(
122            buf, len, total: u64, n, iter,
123            loop self.read_at(iter, pos + total),
124            break if n == 0 || n < len {
125                Some(Ok(total as usize))
126            } else {
127                None
128            }
129        )
130    }
131}
132
133macro_rules! impl_read_at {
134    (@ptr $($ty:ty),*) => {
135        $(
136            impl<A: AsyncReadAt + ?Sized> AsyncReadAt for $ty {
137                async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
138                    (**self).read_at(buf, pos).await
139                }
140
141                async fn read_vectored_at<T: IoVectoredBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
142                    (**self).read_vectored_at(buf, pos).await
143                }
144            }
145        )*
146    };
147
148    (@ptra $($ty:ident),*) => {
149        $(
150            #[cfg(feature = "allocator_api")]
151            impl<R: AsyncReadAt + ?Sized, A: Allocator> AsyncReadAt for $ty<R, A> {
152                async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
153                    (**self).read_at(buf, pos).await
154                }
155
156                async fn read_vectored_at<T: IoVectoredBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
157                    (**self).read_vectored_at(buf, pos).await
158                }
159            }
160            #[cfg(not(feature = "allocator_api"))]
161            impl_read_at!(@ptr $ty<A>);
162        )*
163    };
164
165    (@slice $($(const $len:ident =>)? $ty:ty), *) => {
166        $(
167            impl<$(const $len: usize)?> AsyncReadAt for $ty {
168                async fn read_at<T: IoBufMut>(&self, mut buf: T, pos: u64) -> BufResult<usize, T> {
169                    let pos = pos.min(self.len() as u64);
170                    let len = slice_to_buf(&self[pos as usize..], &mut buf);
171                    BufResult(Ok(len), buf)
172                }
173            }
174        )*
175    }
176}
177
178impl_read_at!(@ptr &A, &mut A);
179impl_read_at!(@ptra Box, Rc, Arc);
180impl_read_at!(@slice [u8], const LEN => [u8; LEN]);
181
182impl<#[cfg(feature = "allocator_api")] A: Allocator> AsyncReadAt for t_alloc!(Vec, u8, A) {
183    async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
184        self.as_slice().read_at(buf, pos).await
185    }
186
187    async fn read_vectored_at<T: IoVectoredBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
188        self.as_slice().read_vectored_at(buf, pos).await
189    }
190}
191
192impl<A: AsyncReadAt> AsyncRead for Cursor<A> {
193    #[inline]
194    async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
195        let pos = self.position();
196        let (n, buf) = buf_try!(self.get_ref().read_at(buf, pos).await);
197        self.set_position(pos + n as u64);
198        BufResult(Ok(n), buf)
199    }
200
201    #[inline]
202    async fn read_vectored<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
203        let pos = self.position();
204        let (n, buf) = buf_try!(self.get_ref().read_vectored_at(buf, pos).await);
205        self.set_position(pos + n as u64);
206        BufResult(Ok(n), buf)
207    }
208}