broker_tokio/task/
join.rs

1use crate::loom::alloc::Track;
2use crate::task::RawTask;
3
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10doc_rt_core! {
11    /// An owned permission to join on a task (await its termination).
12    ///
13    /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
14    /// a task rather than a thread.
15    ///
16    /// A `JoinHandle` *detaches* the associated task when it is dropped, which
17    /// means that there is no longer any handle to the task, and no way to `join`
18    /// on it.
19    ///
20    /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
21    /// functions.
22    ///
23    /// # Examples
24    ///
25    /// Creation from [`task::spawn`]:
26    ///
27    /// ```
28    /// use tokio::task;
29    ///
30    /// # async fn doc() {
31    /// let join_handle: task::JoinHandle<_> = task::spawn(async {
32    ///     // some work here
33    /// });
34    /// # }
35    /// ```
36    ///
37    /// Creation from [`task::spawn_blocking`]:
38    ///
39    /// ```
40    /// use tokio::task;
41    ///
42    /// # async fn doc() {
43    /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
44    ///     // some blocking work here
45    /// });
46    /// # }
47    /// ```
48    ///
49    /// Child being detached and outliving its parent:
50    ///
51    /// ```no_run
52    /// use tokio::task;
53    /// use tokio::time;
54    /// use std::time::Duration;
55    ///
56    /// # #[tokio::main] async fn main() {
57    /// let original_task = task::spawn(async {
58    ///     let _detached_task = task::spawn(async {
59    ///         // Here we sleep to make sure that the first task returns before.
60    ///         time::delay_for(Duration::from_millis(10)).await;
61    ///         // This will be called, even though the JoinHandle is dropped.
62    ///         println!("♫ Still alive ♫");
63    ///     });
64    /// });
65    ///
66    /// original_task.await.expect("The task being joined has panicked");
67    /// println!("Original task is joined.");
68    ///
69    /// // We make sure that the new task has time to run, before the main
70    /// // task returns.
71    ///
72    /// time::delay_for(Duration::from_millis(1000)).await;
73    /// # }
74    /// ```
75    ///
76    /// [`task::spawn`]: crate::task::spawn()
77    /// [`task::spawn_blocking`]: crate::task::spawn_blocking
78    /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
79    pub struct JoinHandle<T> {
80        raw: Option<RawTask>,
81        _p: PhantomData<T>,
82    }
83}
84
85unsafe impl<T: Send> Send for JoinHandle<T> {}
86unsafe impl<T: Send> Sync for JoinHandle<T> {}
87
88impl<T> JoinHandle<T> {
89    pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
90        JoinHandle {
91            raw: Some(raw),
92            _p: PhantomData,
93        }
94    }
95}
96
97impl<T> Unpin for JoinHandle<T> {}
98
99impl<T> Future for JoinHandle<T> {
100    type Output = super::Result<T>;
101
102    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103        use std::mem::MaybeUninit;
104
105        // Raw should always be set
106        let raw = self.raw.as_ref().unwrap();
107
108        // Load the current task state
109        let mut state = raw.header().state.load();
110
111        debug_assert!(state.is_join_interested());
112
113        if state.is_active() {
114            state = if state.has_join_waker() {
115                raw.swap_join_waker(cx.waker(), state)
116            } else {
117                raw.store_join_waker(cx.waker())
118            };
119
120            if state.is_active() {
121                return Poll::Pending;
122            }
123        }
124
125        let mut out = MaybeUninit::<Track<Self::Output>>::uninit();
126
127        unsafe {
128            // This could result in the task being freed.
129            raw.read_output(out.as_mut_ptr() as *mut (), state);
130
131            self.raw = None;
132
133            Poll::Ready(out.assume_init().into_inner())
134        }
135    }
136}
137
138impl<T> Drop for JoinHandle<T> {
139    fn drop(&mut self) {
140        if let Some(raw) = self.raw.take() {
141            if raw.header().state.drop_join_handle_fast() {
142                return;
143            }
144
145            raw.drop_join_handle_slow();
146        }
147    }
148}
149
150impl<T> fmt::Debug for JoinHandle<T>
151where
152    T: fmt::Debug,
153{
154    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
155        fmt.debug_struct("JoinHandle").finish()
156    }
157}