async_macros/try_join.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
/// Awaits multiple fallible futures simultaneously, returning all results once
/// complete.
///
/// `try_join!` is similar to [`join!`], but completes immediately if any of
/// the futures return an error.
///
/// This macro is only usable inside of async functions, closures, and blocks.
///
/// # Examples
///
/// When used on multiple futures that return `Ok`, `try_join!` will return
/// `Ok` of a tuple of the values:
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use async_macros::try_join;
/// use futures::future;
///
/// let a = future::ready(Ok::<i32, i32>(1));
/// let b = future::ready(Ok::<u64, i32>(2));
///
/// assert_eq!(try_join!(a, b).await, Ok((1, 2)));
/// # });
/// ```
///
/// If one of the futures resolves to an error, `try_join!` will return
/// that error:
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use async_macros::try_join;
/// use futures::future;
///
/// let a = future::ready(Ok::<i32, i32>(1));
/// let b = future::ready(Err::<u64, i32>(2));
///
/// assert_eq!(try_join!(a, b).await, Err(2));
/// # });
/// ```
#[macro_export]
macro_rules! try_join {
($($fut:ident),* $(,)?) => { {
async {
use $crate::utils::future::Future;
use $crate::utils::pin::Pin;
use $crate::utils::poll_fn;
use $crate::utils::result::Result;
use $crate::utils::task::Poll;
$(
// Move future into a local so that it is pinned in one place and
// is no longer accessible by the end user.
let mut $fut = $crate::MaybeDone::new($fut);
)*
let res: Result<_, _> = poll_fn(move |cx| {
let mut all_done = true;
$(
let fut = unsafe { Pin::new_unchecked(&mut $fut) };
if Future::poll(fut, cx).is_pending() {
all_done = false;
} else if unsafe { Pin::new_unchecked(&mut $fut) }.output_mut().unwrap().is_err() {
// `.err().unwrap()` rather than `.unwrap_err()` so that we don't introduce
// a `T: Debug` bound.
return Poll::Ready(
Result::Err(unsafe { Pin::new_unchecked(&mut $fut) }
.take()
.unwrap()
.err()
.unwrap()
));
}
)*
if all_done {
let res = ($(
// `.ok().unwrap()` rather than `.unwrap()` so that we don't introduce
// an `E: Debug` bound.
unsafe { Pin::new_unchecked(&mut $fut) }
.take()
.unwrap()
.ok()
.unwrap(),
)*);
Poll::Ready(Result::Ok(res))
} else {
Poll::Pending
}
}).await;
res
}
} }
}