async_std/io/read/
bytes.rs1use std::pin::Pin;
2
3use crate::io::{self, Read};
4use crate::stream::stream::Stream;
5use crate::task::{Context, Poll};
6
/// A stream that yields the contents of a reader one `u8` at a time.
///
/// The wrapped reader is stored in `inner`. When `T` implements `Read + Unpin`,
/// this type implements `Stream` with `io::Result<u8>` items: every successfully
/// read byte is produced as `Ok(byte)`, and the stream finishes once the reader
/// reports a zero-length read.
#[derive(Debug)]
pub struct Bytes<T> {
    /// The underlying reader the bytes are pulled from.
    pub(crate) inner: T,
}
17
18impl<T: Read + Unpin> Stream for Bytes<T> {
19 type Item = io::Result<u8>;
20
21 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22 let mut byte = 0;
23
24 let rd = Pin::new(&mut self.inner);
25
26 match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
27 Ok(0) => Poll::Ready(None),
28 Ok(..) => Poll::Ready(Some(Ok(byte))),
29 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending,
30 Err(e) => Poll::Ready(Some(Err(e))),
31 }
32 }
33}
34
#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
mod tests {
    use crate::io;
    use crate::prelude::*;
    use crate::task;

    /// Draining `bytes()` over an in-memory cursor must reproduce the cursor's
    /// contents byte for byte, in order, and then terminate.
    #[test]
    fn test_bytes_basics() -> std::io::Result<()> {
        task::block_on(async move {
            // Fixture: the nine bytes 0 through 8.
            let expected: Vec<u8> = (0..=8).collect();
            let cursor: io::Cursor<Vec<u8>> = io::Cursor::new(expected.clone());

            let mut stream = cursor.bytes();

            // Pull items until the stream ends, bailing out on the first error.
            let mut collected = Vec::new();
            while let Some(item) = stream.next().await {
                let value = item?;
                collected.push(value);
            }

            assert_eq!(collected, expected);

            Ok(())
        })
    }
}