broker_tokio/io/util/
async_buf_read_ext.rs

1use crate::io::util::lines::{lines, Lines};
2use crate::io::util::read_line::{read_line, ReadLine};
3use crate::io::util::read_until::{read_until, ReadUntil};
4use crate::io::util::split::{split, Split};
5use crate::io::AsyncBufRead;
6
7cfg_io_util! {
8    /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
9    ///
10    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
11    pub trait AsyncBufReadExt: AsyncBufRead {
12        /// Read all bytes into `buf` until the delimiter `byte` or EOF is reached.
13        ///
14        /// Equivalent to:
15        ///
16        /// ```ignore
17        /// async fn read_until(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>;
18        /// ```
19        ///
20        /// This function will read bytes from the underlying stream until the
21        /// delimiter or EOF is found. Once found, all bytes up to, and including,
22        /// the delimiter (if found) will be appended to `buf`.
23        ///
24        /// If successful, this function will return the total number of bytes read.
25        ///
26        /// # Errors
27        ///
28        /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
29        /// will otherwise return any errors returned by [`fill_buf`].
30        ///
31        /// If an I/O error is encountered then all bytes read so far will be
32        /// present in `buf` and its length will have been adjusted appropriately.
33        ///
34        /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
35        /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
36        ///
37        /// # Examples
38        ///
39        /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
40        /// this example, we use [`Cursor`] to read all the bytes in a byte slice
41        /// in hyphen delimited segments:
42        ///
43        /// [`Cursor`]: std::io::Cursor
44        ///
45        /// ```
46        /// use tokio::io::AsyncBufReadExt;
47        ///
48        /// use std::io::Cursor;
49        ///
50        /// #[tokio::main]
51        /// async fn main() {
52        ///     let mut cursor = Cursor::new(b"lorem-ipsum");
53        ///     let mut buf = vec![];
54        ///
55        ///     // cursor is at 'l'
56        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
57        ///         .await
58        ///         .expect("reading from cursor won't fail");
59        ///
60        ///     assert_eq!(num_bytes, 6);
61        ///     assert_eq!(buf, b"lorem-");
62        ///     buf.clear();
63        ///
64        ///     // cursor is at 'i'
65        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
66        ///         .await
67        ///         .expect("reading from cursor won't fail");
68        ///
69        ///     assert_eq!(num_bytes, 5);
70        ///     assert_eq!(buf, b"ipsum");
71        ///     buf.clear();
72        ///
73        ///     // cursor is at EOF
74        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
75        ///         .await
76        ///         .expect("reading from cursor won't fail");
77        ///     assert_eq!(num_bytes, 0);
78        ///     assert_eq!(buf, b"");
79        /// }
80        /// ```
81        fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
82        where
83            Self: Unpin,
84        {
85            read_until(self, byte, buf)
86        }
87
88        /// Read all bytes until a newline (the 0xA byte) is reached, and append
89        /// them to the provided buffer.
90        ///
91        /// Equivalent to:
92        ///
93        /// ```ignore
94        /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
95        /// ```
96        ///
97        /// This function will read bytes from the underlying stream until the
98        /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
99        /// up to, and including, the delimiter (if found) will be appended to
100        /// `buf`.
101        ///
102        /// If successful, this function will return the total number of bytes read.
103        ///
104        /// If this function returns `Ok(0)`, the stream has reached EOF.
105        ///
106        /// # Errors
107        ///
108        /// This function has the same error semantics as [`read_until`] and will
109        /// also return an error if the read bytes are not valid UTF-8. If an I/O
110        /// error is encountered then `buf` may contain some bytes already read in
111        /// the event that all data read so far was valid UTF-8.
112        ///
113        /// [`read_until`]: AsyncBufReadExt::read_until
114        ///
115        /// # Examples
116        ///
117        /// [`std::io::Cursor`][`Cursor`] is a type that implements
118        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
119        /// lines in a byte slice:
120        ///
121        /// [`Cursor`]: std::io::Cursor
122        ///
123        /// ```
124        /// use tokio::io::AsyncBufReadExt;
125        ///
126        /// use std::io::Cursor;
127        ///
128        /// #[tokio::main]
129        /// async fn main() {
130        ///     let mut cursor = Cursor::new(b"foo\nbar");
131        ///     let mut buf = String::new();
132        ///
133        ///     // cursor is at 'f'
134        ///     let num_bytes = cursor.read_line(&mut buf)
135        ///         .await
136        ///         .expect("reading from cursor won't fail");
137        ///
138        ///     assert_eq!(num_bytes, 4);
139        ///     assert_eq!(buf, "foo\n");
140        ///     buf.clear();
141        ///
142        ///     // cursor is at 'b'
143        ///     let num_bytes = cursor.read_line(&mut buf)
144        ///         .await
145        ///         .expect("reading from cursor won't fail");
146        ///
147        ///     assert_eq!(num_bytes, 3);
148        ///     assert_eq!(buf, "bar");
149        ///     buf.clear();
150        ///
151        ///     // cursor is at EOF
152        ///     let num_bytes = cursor.read_line(&mut buf)
153        ///         .await
154        ///         .expect("reading from cursor won't fail");
155        ///
156        ///     assert_eq!(num_bytes, 0);
157        ///     assert_eq!(buf, "");
158        /// }
159        /// ```
160        fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
161        where
162            Self: Unpin,
163        {
164            read_line(self, buf)
165        }
166
167        /// Returns a stream of the contents of this reader split on the byte
168        /// `byte`.
169        ///
170        /// This method is the asynchronous equivalent to
171        /// [`BufRead::split`](std::io::BufRead::split).
172        ///
173        /// The stream returned from this function will yield instances of
174        /// [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have
175        /// the delimiter byte at the end.
176        ///
177        /// [`io::Result`]: std::io::Result
178        /// [`Vec<u8>`]: std::vec::Vec
179        ///
180        /// # Errors
181        ///
182        /// Each item of the stream has the same error semantics as
183        /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
184        ///
185        /// # Examples
186        ///
187        /// ```
188        /// # use tokio::io::AsyncBufRead;
189        /// use tokio::io::AsyncBufReadExt;
190        ///
191        /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
192        /// let mut segments = my_buf_read.split(b'f');
193        ///
194        /// while let Some(segment) = segments.next_segment().await? {
195        ///     println!("length = {}", segment.len())
196        /// }
197        /// # Ok(())
198        /// # }
199        /// ```
200        fn split(self, byte: u8) -> Split<Self>
201        where
202            Self: Sized + Unpin,
203        {
204            split(self, byte)
205        }
206
207        /// Returns a stream over the lines of this reader.
208        /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
209        ///
210        /// The stream returned from this function will yield instances of
211        /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
212        /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
213        ///
214        /// [`io::Result`]: std::io::Result
215        /// [`String`]: String
216        ///
217        /// # Errors
218        ///
219        /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
220        ///
221        /// # Examples
222        ///
223        /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
224        /// this example, we use [`Cursor`] to iterate over all the lines in a byte
225        /// slice.
226        ///
227        /// [`Cursor`]: std::io::Cursor
228        ///
229        /// ```
230        /// use tokio::io::AsyncBufReadExt;
231        /// use tokio::stream::StreamExt;
232        ///
233        /// use std::io::Cursor;
234        ///
235        /// #[tokio::main]
236        /// async fn main() {
237        ///     let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
238        ///
239        ///     let mut lines = cursor.lines().map(|res| res.unwrap());
240        ///
241        ///     assert_eq!(lines.next().await, Some(String::from("lorem")));
242        ///     assert_eq!(lines.next().await, Some(String::from("ipsum")));
243        ///     assert_eq!(lines.next().await, Some(String::from("dolor")));
244        ///     assert_eq!(lines.next().await, None);
245        /// }
246        /// ```
247        ///
248        /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
249        fn lines(self) -> Lines<Self>
250        where
251            Self: Sized,
252        {
253            lines(self)
254        }
255    }
256}
257
258impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}