broker_tokio/task/join.rs
1use crate::loom::alloc::Track;
2use crate::task::RawTask;
3
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10doc_rt_core! {
11 /// An owned permission to join on a task (await its termination).
12 ///
13 /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
14 /// a task rather than a thread.
15 ///
16 /// A `JoinHandle` *detaches* the associated task when it is dropped, which
17 /// means that there is no longer any handle to the task, and no way to `join`
18 /// on it.
19 ///
20 /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
21 /// functions.
22 ///
23 /// # Examples
24 ///
25 /// Creation from [`task::spawn`]:
26 ///
27 /// ```
28 /// use tokio::task;
29 ///
30 /// # async fn doc() {
31 /// let join_handle: task::JoinHandle<_> = task::spawn(async {
32 /// // some work here
33 /// });
34 /// # }
35 /// ```
36 ///
37 /// Creation from [`task::spawn_blocking`]:
38 ///
39 /// ```
40 /// use tokio::task;
41 ///
42 /// # async fn doc() {
43 /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
44 /// // some blocking work here
45 /// });
46 /// # }
47 /// ```
48 ///
49 /// Child being detached and outliving its parent:
50 ///
51 /// ```no_run
52 /// use tokio::task;
53 /// use tokio::time;
54 /// use std::time::Duration;
55 ///
56 /// # #[tokio::main] async fn main() {
57 /// let original_task = task::spawn(async {
58 /// let _detached_task = task::spawn(async {
59 /// // Here we sleep to make sure that the first task returns before.
60 /// time::delay_for(Duration::from_millis(10)).await;
61 /// // This will be called, even though the JoinHandle is dropped.
62 /// println!("♫ Still alive ♫");
63 /// });
64 /// });
65 ///
66 /// original_task.await.expect("The task being joined has panicked");
67 /// println!("Original task is joined.");
68 ///
69 /// // We make sure that the new task has time to run, before the main
70 /// // task returns.
71 ///
72 /// time::delay_for(Duration::from_millis(1000)).await;
73 /// # }
74 /// ```
75 ///
76 /// [`task::spawn`]: crate::task::spawn()
77 /// [`task::spawn_blocking`]: crate::task::spawn_blocking
78 /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
79 pub struct JoinHandle<T> {
80 raw: Option<RawTask>,
81 _p: PhantomData<T>,
82 }
83}
84
85unsafe impl<T: Send> Send for JoinHandle<T> {}
86unsafe impl<T: Send> Sync for JoinHandle<T> {}
87
88impl<T> JoinHandle<T> {
89 pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
90 JoinHandle {
91 raw: Some(raw),
92 _p: PhantomData,
93 }
94 }
95}
96
97impl<T> Unpin for JoinHandle<T> {}
98
99impl<T> Future for JoinHandle<T> {
100 type Output = super::Result<T>;
101
102 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103 use std::mem::MaybeUninit;
104
105 // Raw should always be set
106 let raw = self.raw.as_ref().unwrap();
107
108 // Load the current task state
109 let mut state = raw.header().state.load();
110
111 debug_assert!(state.is_join_interested());
112
113 if state.is_active() {
114 state = if state.has_join_waker() {
115 raw.swap_join_waker(cx.waker(), state)
116 } else {
117 raw.store_join_waker(cx.waker())
118 };
119
120 if state.is_active() {
121 return Poll::Pending;
122 }
123 }
124
125 let mut out = MaybeUninit::<Track<Self::Output>>::uninit();
126
127 unsafe {
128 // This could result in the task being freed.
129 raw.read_output(out.as_mut_ptr() as *mut (), state);
130
131 self.raw = None;
132
133 Poll::Ready(out.assume_init().into_inner())
134 }
135 }
136}
137
138impl<T> Drop for JoinHandle<T> {
139 fn drop(&mut self) {
140 if let Some(raw) = self.raw.take() {
141 if raw.header().state.drop_join_handle_fast() {
142 return;
143 }
144
145 raw.drop_join_handle_slow();
146 }
147 }
148}
149
150impl<T> fmt::Debug for JoinHandle<T>
151where
152 T: fmt::Debug,
153{
154 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
155 fmt.debug_struct("JoinHandle").finish()
156 }
157}