futures_util/future/
join.rs

1#![allow(non_snake_case)]
2
3use core::fmt;
4use core::mem;
5
6use futures_core::{Future, Poll, Async};
7use futures_core::task;
8
9macro_rules! generate {
10    ($(
11        $(#[$doc:meta])*
12        ($Join:ident, $new:ident, <A, $($B:ident),*>),
13    )*) => ($(
14        $(#[$doc])*
15        #[must_use = "futures do nothing unless polled"]
16        pub struct $Join<A, $($B),*>
17            where A: Future,
18                  $($B: Future<Error=A::Error>),*
19        {
20            a: MaybeDone<A>,
21            $($B: MaybeDone<$B>,)*
22        }
23
24        impl<A, $($B),*> fmt::Debug for $Join<A, $($B),*>
25            where A: Future + fmt::Debug,
26                  A::Item: fmt::Debug,
27                  $(
28                      $B: Future<Error=A::Error> + fmt::Debug,
29                      $B::Item: fmt::Debug
30                  ),*
31        {
32            fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
33                fmt.debug_struct(stringify!($Join))
34                    .field("a", &self.a)
35                    $(.field(stringify!($B), &self.$B))*
36                    .finish()
37            }
38        }
39
40        pub fn $new<A, $($B),*>(a: A, $($B: $B),*) -> $Join<A, $($B),*>
41            where A: Future,
42                  $($B: Future<Error=A::Error>),*
43        {
44            $Join {
45                a: MaybeDone::NotYet(a),
46                $($B: MaybeDone::NotYet($B)),*
47            }
48        }
49
50        impl<A, $($B),*> $Join<A, $($B),*>
51            where A: Future,
52                  $($B: Future<Error=A::Error>),*
53        {
54            fn erase(&mut self) {
55                self.a = MaybeDone::Gone;
56                $(self.$B = MaybeDone::Gone;)*
57            }
58        }
59
60        impl<A, $($B),*> Future for $Join<A, $($B),*>
61            where A: Future,
62                  $($B: Future<Error=A::Error>),*
63        {
64            type Item = (A::Item, $($B::Item),*);
65            type Error = A::Error;
66
67            fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
68                let mut all_done = match self.a.poll(cx) {
69                    Ok(done) => done,
70                    Err(e) => {
71                        self.erase();
72                        return Err(e)
73                    }
74                };
75                $(
76                    all_done = match self.$B.poll(cx) {
77                        Ok(done) => all_done && done,
78                        Err(e) => {
79                            self.erase();
80                            return Err(e)
81                        }
82                    };
83                )*
84
85                if all_done {
86                    Ok(Async::Ready((self.a.take(), $(self.$B.take()),*)))
87                } else {
88                    Ok(Async::Pending)
89                }
90            }
91        }
92
93        // Incoherent-- add to futures-core when stable.
94        /*
95        impl<A, $($B),*> IntoFuture for (A, $($B),*)
96            where A: IntoFuture,
97        $(
98            $B: IntoFuture<Error=A::Error>
99        ),*
100        {
101            type Future = $Join<A::Future, $($B::Future),*>;
102            type Item = (A::Item, $($B::Item),*);
103            type Error = A::Error;
104
105            fn into_future(self) -> Self::Future {
106                match self {
107                    (a, $($B),+) => {
108                        $new(
109                            IntoFuture::into_future(a),
110                            $(IntoFuture::into_future($B)),+
111                        )
112                    }
113                }
114            }
115        }
116        */
117
118    )*)
119}
120
121generate! {
122    /// Future for the `join` combinator, waiting for two futures to
123    /// complete.
124    ///
125    /// This is created by the `Future::join` method.
126    (Join, new, <A, B>),
127
128    /// Future for the `join3` combinator, waiting for three futures to
129    /// complete.
130    ///
131    /// This is created by the `Future::join3` method.
132    (Join3, new3, <A, B, C>),
133
134    /// Future for the `join4` combinator, waiting for four futures to
135    /// complete.
136    ///
137    /// This is created by the `Future::join4` method.
138    (Join4, new4, <A, B, C, D>),
139
140    /// Future for the `join5` combinator, waiting for five futures to
141    /// complete.
142    ///
143    /// This is created by the `Future::join5` method.
144    (Join5, new5, <A, B, C, D, E>),
145}
146
147#[derive(Debug)]
148enum MaybeDone<A: Future> {
149    NotYet(A),
150    Done(A::Item),
151    Gone,
152}
153
154impl<A: Future> MaybeDone<A> {
155    fn poll(&mut self, cx: &mut task::Context) -> Result<bool, A::Error> {
156        let res = match *self {
157            MaybeDone::NotYet(ref mut a) => a.poll(cx)?,
158            MaybeDone::Done(_) => return Ok(true),
159            MaybeDone::Gone => panic!("cannot poll Join twice"),
160        };
161        match res {
162            Async::Ready(res) => {
163                *self = MaybeDone::Done(res);
164                Ok(true)
165            }
166            Async::Pending => Ok(false),
167        }
168    }
169
170    fn take(&mut self) -> A::Item {
171        match mem::replace(self, MaybeDone::Gone) {
172            MaybeDone::Done(a) => a,
173            _ => panic!(),
174        }
175    }
176}