async_std/result/
product.rs1use std::pin::Pin;
2
3use crate::prelude::*;
4use crate::stream::{Product, Stream};
5
6impl<T, U, E> Product<Result<U, E>> for Result<T, E>
7where
8 T: Product<U>,
9{
10 #[doc = r#"
11 Takes each element in the `Stream`: if it is an `Err`, no further
12 elements are taken, and the `Err` is returned. Should no `Err` occur,
13 the product of all elements is returned.
14
15 # Examples
16
17 This multiplies every integer in a vector, rejecting the product if a negative element is
18 encountered:
19
20 ```
21 # fn main() { async_std::task::block_on(async {
22 #
23 use async_std::prelude::*;
24 use async_std::stream;
25
26 let v = stream::from_iter(vec![1, 2, 4]);
27 let res: Result<i32, &'static str> = v.map(|x|
28 if x < 0 {
29 Err("Negative element found")
30 } else {
31 Ok(x)
32 }).product().await;
33 assert_eq!(res, Ok(8));
34 #
35 # }) }
36 ```
37 "#]
38 fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
39 where
40 S: Stream<Item = Result<U, E>> + 'a,
41 {
42 Box::pin(async move {
43 let mut is_error = false;
46 let mut found_error = None;
47 let out = <T as Product<U>>::product(
48 stream
49 .take_while(|elem| {
50 !is_error
52 && (elem.is_ok() || {
53 is_error = true;
54 true
56 })
57 })
58 .filter_map(|elem| match elem {
59 Ok(value) => Some(value),
60 Err(err) => {
61 found_error = Some(err);
62 None
63 }
64 }),
65 )
66 .await;
67
68 if is_error {
69 Err(found_error.unwrap())
70 } else {
71 Ok(out)
72 }
73 })
74 }
75}