async_std/io/read/
bytes.rs

1use std::pin::Pin;
2
3use crate::io::{self, Read};
4use crate::stream::stream::Stream;
5use crate::task::{Context, Poll};
6
7/// A stream over `u8` values of a reader.
8///
9/// This struct is generally created by calling [`bytes`] on a reader.
10/// Please see the documentation of [`bytes`] for more details.
11///
12/// [`bytes`]: trait.Read.html#method.bytes
13#[derive(Debug)]
14pub struct Bytes<T> {
15    pub(crate) inner: T,
16}
17
18impl<T: Read + Unpin> Stream for Bytes<T> {
19    type Item = io::Result<u8>;
20
21    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22        let mut byte = 0;
23
24        let rd = Pin::new(&mut self.inner);
25
26        match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
27            Ok(0) => Poll::Ready(None),
28            Ok(..) => Poll::Ready(Some(Ok(byte))),
29            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending,
30            Err(e) => Poll::Ready(Some(Err(e))),
31        }
32    }
33}
34
35#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
36mod tests {
37    use crate::io;
38    use crate::prelude::*;
39    use crate::task;
40
41    #[test]
42    fn test_bytes_basics() -> std::io::Result<()> {
43        task::block_on(async move {
44            let raw: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8];
45            let source: io::Cursor<Vec<u8>> = io::Cursor::new(raw.clone());
46
47            let mut s = source.bytes();
48
49            // TODO(@dignifiedquire): Use collect, once it is stable.
50            let mut result = Vec::new();
51            while let Some(byte) = s.next().await {
52                result.push(byte?);
53            }
54
55            assert_eq!(result, raw);
56
57            Ok(())
58        })
59    }
60}