futures_util/future/
join_all.rs

1//! Definition of the `JoinAll` combinator, waiting for all of a list of futures
2//! to finish.
3
4use std::prelude::v1::*;
5
6use std::fmt;
7use std::mem;
8use std::iter::FromIterator;
9
10use futures_core::{Future, IntoFuture, Poll, Async};
11use futures_core::task;
12
13#[derive(Debug)]
14enum ElemState<F> where F: Future {
15    Pending(F),
16    Done(F::Item),
17}
18
19/// A future which takes a list of futures and resolves with a vector of the
20/// completed values.
21///
22/// This future is created with the `join_all` method.
23#[must_use = "futures do nothing unless polled"]
24pub struct JoinAll<F>
25    where F: Future,
26{
27    elems: Vec<ElemState<F>>,
28}
29
30impl<F> fmt::Debug for JoinAll<F>
31    where F: Future + fmt::Debug,
32          F::Item: fmt::Debug,
33{
34    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
35        fmt.debug_struct("JoinAll")
36            .field("elems", &self.elems)
37            .finish()
38    }
39}
40
41/// Creates a future which represents a collection of the results of the futures
42/// given.
43///
44/// The returned future will drive execution for all of its underlying futures,
45/// collecting the results into a destination `Vec<T>` in the same order as they
46/// were provided. If any future returns an error then all other futures will be
47/// canceled and an error will be returned immediately. If all futures complete
48/// successfully, however, then the returned future will succeed with a `Vec` of
49/// all the successful results.
50///
51/// # Examples
52///
53/// ```
54/// # extern crate futures;
55/// use futures::prelude::*;
56/// use futures::future::{join_all, ok, err};
57///
58/// # fn main() {
59/// #
60/// let f = join_all(vec![
61///     ok::<u32, u32>(1),
62///     ok::<u32, u32>(2),
63///     ok::<u32, u32>(3),
64/// ]);
65/// let f = f.map(|x| {
66///     assert_eq!(x, [1, 2, 3]);
67/// });
68///
69/// let f = join_all(vec![
70///     Box::new(ok::<u32, u32>(1)),
71///     Box::new(err::<u32, u32>(2)),
72///     Box::new(ok::<u32, u32>(3)),
73/// ]);
74/// let f = f.then(|x| {
75///     assert_eq!(x, Err(2));
76///     x
77/// });
78/// # }
79/// ```
80pub fn join_all<I>(i: I) -> JoinAll<<I::Item as IntoFuture>::Future>
81    where I: IntoIterator,
82          I::Item: IntoFuture,
83{
84    let elems = i.into_iter().map(|f| {
85        ElemState::Pending(f.into_future())
86    }).collect();
87    JoinAll { elems }
88}
89
90impl<F> Future for JoinAll<F>
91    where F: Future,
92{
93    type Item = Vec<F::Item>;
94    type Error = F::Error;
95
96
97    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
98        let mut all_done = true;
99
100        for idx in 0 .. self.elems.len() {
101            let done_val = match self.elems[idx] {
102                ElemState::Pending(ref mut t) => {
103                    match t.poll(cx) {
104                        Ok(Async::Ready(v)) => Ok(v),
105                        Ok(Async::Pending) => {
106                            all_done = false;
107                            continue
108                        }
109                        Err(e) => Err(e),
110                    }
111                }
112                ElemState::Done(ref mut _v) => continue,
113            };
114
115            match done_val {
116                Ok(v) => self.elems[idx] = ElemState::Done(v),
117                Err(e) => {
118                    // On completion drop all our associated resources
119                    // ASAP.
120                    self.elems = Vec::new();
121                    return Err(e)
122                }
123            }
124        }
125
126        if all_done {
127            let elems = mem::replace(&mut self.elems, Vec::new());
128            let result = elems.into_iter().map(|e| {
129                match e {
130                    ElemState::Done(t) => t,
131                    _ => unreachable!(),
132                }
133            }).collect();
134            Ok(Async::Ready(result))
135        } else {
136            Ok(Async::Pending)
137        }
138    }
139}
140
141impl<F: IntoFuture> FromIterator<F> for JoinAll<F::Future> {
142    fn from_iter<T: IntoIterator<Item=F>>(iter: T) -> Self {
143        join_all(iter)
144    }
145}