async_std/result/from_stream.rs
1use std::pin::Pin;
2
3use crate::prelude::*;
4use crate::stream::{FromStream, IntoStream};
5
6impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
7where
8 T: Send,
9 E: Send,
10 V: FromStream<T>,
11{
12 /// Takes each element in the stream: if it is an `Err`, no further
13 /// elements are taken, and the `Err` is returned. Should no `Err`
14 /// occur, a container with the values of each `Result` is returned.
15 ///
16 /// # Examples
17 ///
18 /// ```
19 /// # fn main() { async_std::task::block_on(async {
20 /// #
21 /// use async_std::prelude::*;
22 /// use async_std::stream;
23 ///
24 /// let v = stream::from_iter(vec![1, 2]);
25 /// let res: Result<Vec<u32>, &'static str> = v.map(|x: u32|
26 /// x.checked_add(1).ok_or("Overflow!")
27 /// ).collect().await;
28 /// assert_eq!(res, Ok(vec![2, 3]));
29 /// #
30 /// # }) }
31 /// ```
32 #[inline]
33 fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>(
34 stream: S,
35 ) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
36 where
37 <S as IntoStream>::IntoStream: Send,
38 {
39 let stream = stream.into_stream();
40
41 Box::pin(async move {
42 // Using `take_while` here because it is able to stop the stream early
43 // if a failure occurs
44 let mut is_error = false;
45 let mut found_error = None;
46 let out: V = stream
47 .take_while(|elem| {
48 // Stop processing the stream on `Err`
49 !is_error
50 && (elem.is_ok() || {
51 is_error = true;
52 // Capture first `Err`
53 true
54 })
55 })
56 .filter_map(|elem| match elem {
57 Ok(value) => Some(value),
58 Err(err) => {
59 found_error = Some(err);
60 None
61 }
62 })
63 .collect()
64 .await;
65
66 if is_error {
67 Err(found_error.unwrap())
68 } else {
69 Ok(out)
70 }
71 })
72 }
73}