tokio/macros/
join.rs

1macro_rules! doc {
2    ($join:item) => {
3        /// Waits on multiple concurrent branches, returning when **all** branches
4        /// complete.
5        ///
6        /// The `join!` macro must be used inside of async functions, closures, and
7        /// blocks.
8        ///
9        /// The `join!` macro takes a list of async expressions and evaluates them
10        /// concurrently on the same task. Each async expression evaluates to a future
11        /// and the futures from each expression are multiplexed on the current task.
12        ///
13        /// When working with async expressions returning `Result`, `join!` will wait
14        /// for **all** branches complete regardless if any complete with `Err`. Use
15        /// [`try_join!`] to return early when `Err` is encountered.
16        ///
17        /// [`try_join!`]: crate::try_join
18        ///
19        /// # Notes
20        ///
21        /// The supplied futures are stored inline and do not require allocating a
22        /// `Vec`.
23        ///
24        /// ### Runtime characteristics
25        ///
26        /// By running all async expressions on the current task, the expressions are
27        /// able to run **concurrently** but not in **parallel**. This means all
28        /// expressions are run on the same thread and if one branch blocks the thread,
29        /// all other expressions will be unable to continue. If parallelism is
30        /// required, spawn each async expression using [`tokio::spawn`] and pass the
31        /// join handle to `join!`.
32        ///
33        /// [`tokio::spawn`]: crate::spawn
34        ///
35        /// # Examples
36        ///
37        /// Basic join with two branches
38        ///
39        /// ```
40        /// async fn do_stuff_async() {
41        ///     // async work
42        /// }
43        ///
44        /// async fn more_async_work() {
45        ///     // more here
46        /// }
47        ///
48        /// #[tokio::main]
49        /// async fn main() {
50        ///     let (first, second) = tokio::join!(
51        ///         do_stuff_async(),
52        ///         more_async_work());
53        ///
54        ///     // do something with the values
55        /// }
56        /// ```
57        #[macro_export]
58        #[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
59        $join
60    };
61}
62
63#[cfg(doc)]
64doc! {macro_rules! join {
65    ($($future:expr),*) => { unimplemented!() }
66}}
67
68#[cfg(not(doc))]
69doc! {macro_rules! join {
70    (@ {
71        // One `_` for each branch in the `join!` macro. This is not used once
72        // normalization is complete.
73        ( $($count:tt)* )
74
75        // The expression `0+1+1+ ... +1` equal to the number of branches.
76        ( $($total:tt)* )
77
78        // Normalized join! branches
79        $( ( $($skip:tt)* ) $e:expr, )*
80
81    }) => {{
82        use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
83        use $crate::macros::support::Poll::{Ready, Pending};
84
85        // Safety: nothing must be moved out of `futures`. This is to satisfy
86        // the requirement of `Pin::new_unchecked` called below.
87        //
88        // We can't use the `pin!` macro for this because `futures` is a tuple
89        // and the standard library provides no way to pin-project to the fields
90        // of a tuple.
91        let mut futures = ( $( maybe_done($e), )* );
92
93        // This assignment makes sure that the `poll_fn` closure only has a
94        // reference to the futures, instead of taking ownership of them. This
95        // mitigates the issue described in
96        // <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
97        let mut futures = &mut futures;
98
99        // Each time the future created by poll_fn is polled, a different future will be polled first
100        // to ensure every future passed to join! gets a chance to make progress even if
101        // one of the futures consumes the whole budget.
102        //
103        // This is number of futures that will be skipped in the first loop
104        // iteration the next time.
105        let mut skip_next_time: u32 = 0;
106
107        poll_fn(move |cx| {
108            const COUNT: u32 = $($total)*;
109
110            let mut is_pending = false;
111
112            let mut to_run = COUNT;
113
114            // The number of futures that will be skipped in the first loop iteration.
115            let mut skip = skip_next_time;
116
117            skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
118
119            // This loop runs twice and the first `skip` futures
120            // are not polled in the first iteration.
121            loop {
122            $(
123                if skip == 0 {
124                    if to_run == 0 {
125                        // Every future has been polled
126                        break;
127                    }
128                    to_run -= 1;
129
130                    // Extract the future for this branch from the tuple.
131                    let ( $($skip,)* fut, .. ) = &mut *futures;
132
133                    // Safety: future is stored on the stack above
134                    // and never moved.
135                    let mut fut = unsafe { Pin::new_unchecked(fut) };
136
137                    // Try polling
138                    if fut.poll(cx).is_pending() {
139                        is_pending = true;
140                    }
141                } else {
142                    // Future skipped, one less future to skip in the next iteration
143                    skip -= 1;
144                }
145            )*
146            }
147
148            if is_pending {
149                Pending
150            } else {
151                Ready(($({
152                    // Extract the future for this branch from the tuple.
153                    let ( $($skip,)* fut, .. ) = &mut futures;
154
155                    // Safety: future is stored on the stack above
156                    // and never moved.
157                    let mut fut = unsafe { Pin::new_unchecked(fut) };
158
159                    fut.take_output().expect("expected completed future")
160                },)*))
161            }
162        }).await
163    }};
164
165    // ===== Normalize =====
166
167    (@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
168        $crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
169    };
170
171    // ===== Entry point =====
172
173    ( $($e:expr),+ $(,)?) => {
174        $crate::join!(@{ () (0) } $($e,)*)
175    };
176
177    () => { async {}.await }
178}}