async_std/result/
product.rs

1use std::pin::Pin;
2
3use crate::prelude::*;
4use crate::stream::{Product, Stream};
5
6impl<T, U, E> Product<Result<U, E>> for Result<T, E>
7where
8    T: Product<U>,
9{
10    #[doc = r#"
11        Takes each element in the `Stream`: if it is an `Err`, no further
12        elements are taken, and the `Err` is returned. Should no `Err` occur,
13        the product of all elements is returned.
14
15        # Examples
16
17        This multiplies every integer in a vector, rejecting the product if a negative element is
18        encountered:
19
20        ```
21        # fn main() { async_std::task::block_on(async {
22        #
23        use async_std::prelude::*;
24        use async_std::stream;
25
26        let v = stream::from_iter(vec![1, 2, 4]);
27        let res: Result<i32, &'static str> = v.map(|x|
28            if x < 0 {
29                Err("Negative element found")
30            } else {
31                Ok(x)
32            }).product().await;
33        assert_eq!(res, Ok(8));
34        #
35        # }) }
36        ```
37    "#]
38    fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39    where
40        S: Stream<Item = Result<U, E>> + 'a,
41    {
42        Box::pin(async move {
43            // Using `take_while` here because it is able to stop the stream early
44            // if a failure occurs
45            let mut is_error = false;
46            let mut found_error = None;
47            let out = <T as Product<U>>::product(
48                stream
49                    .take_while(|elem| {
50                        // Stop processing the stream on `Err`
51                        !is_error
52                            && (elem.is_ok() || {
53                                is_error = true;
54                                // Capture first `Err`
55                                true
56                            })
57                    })
58                    .filter_map(|elem| match elem {
59                        Ok(value) => Some(value),
60                        Err(err) => {
61                            found_error = Some(err);
62                            None
63                        }
64                    }),
65            )
66            .await;
67
68            if is_error {
69                Err(found_error.unwrap())
70            } else {
71                Ok(out)
72            }
73        })
74    }
75}