async_std/task/
builder.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use pin_project_lite::pin_project;
7
8use crate::io;
9use crate::task::{JoinHandle, Task, TaskLocalsWrapper};
10
/// Task builder that configures the settings of a new task.
///
/// The only setting held here is the optional task name. The derived
/// `Default` yields a builder with no name set.
#[derive(Debug, Default)]
pub struct Builder {
    // Optional task name; `None` until `Builder::name` is called.
    pub(crate) name: Option<String>,
}
16
17impl Builder {
18    /// Creates a new builder.
19    #[inline]
20    pub fn new() -> Builder {
21        Builder { name: None }
22    }
23
24    /// Configures the name of the task.
25    #[inline]
26    pub fn name(mut self, name: String) -> Builder {
27        self.name = Some(name);
28        self
29    }
30
31    fn build<F, T>(self, future: F) -> SupportTaskLocals<F>
32    where
33        F: Future<Output = T>,
34    {
35        let name = self.name.map(Arc::new);
36
37        // Create a new task handle.
38        let task = Task::new(name);
39
40        #[cfg(not(target_os = "unknown"))]
41        once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
42
43        let tag = TaskLocalsWrapper::new(task);
44
45        SupportTaskLocals { tag, future }
46    }
47
48    /// Spawns a task with the configured settings.
49    #[cfg(not(target_os = "unknown"))]
50    pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
51    where
52        F: Future<Output = T> + Send + 'static,
53        T: Send + 'static,
54    {
55        let wrapped = self.build(future);
56
57        kv_log_macro::trace!("spawn", {
58            task_id: wrapped.tag.id().0,
59            parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
60        });
61
62        let task = wrapped.tag.task().clone();
63        let handle = async_global_executor::spawn(wrapped);
64
65        Ok(JoinHandle::new(handle, task))
66    }
67
68    /// Spawns a task locally with the configured settings.
69    #[cfg(all(not(target_os = "unknown"), feature = "unstable"))]
70    pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
71    where
72        F: Future<Output = T> + 'static,
73        T: 'static,
74    {
75        let wrapped = self.build(future);
76
77        kv_log_macro::trace!("spawn_local", {
78            task_id: wrapped.tag.id().0,
79            parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
80        });
81
82        let task = wrapped.tag.task().clone();
83        let handle = async_global_executor::spawn_local(wrapped);
84
85        Ok(JoinHandle::new(handle, task))
86    }
87
88    /// Spawns a task locally with the configured settings.
89    #[cfg(all(target_arch = "wasm32", feature = "unstable"))]
90    pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
91    where
92        F: Future<Output = T> + 'static,
93        T: 'static,
94    {
95        use futures_channel::oneshot::channel;
96        let (sender, receiver) = channel();
97
98        let wrapped = self.build(async move {
99            let res = future.await;
100            let _ = sender.send(res);
101        });
102        kv_log_macro::trace!("spawn_local", {
103            task_id: wrapped.tag.id().0,
104            parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
105        });
106
107        let task = wrapped.tag.task().clone();
108        wasm_bindgen_futures::spawn_local(wrapped);
109
110        Ok(JoinHandle::new(receiver, task))
111    }
112
113    /// Spawns a task locally with the configured settings.
114    #[cfg(all(target_arch = "wasm32", not(feature = "unstable")))]
115    pub(crate) fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
116    where
117        F: Future<Output = T> + 'static,
118        T: 'static,
119    {
120        use futures_channel::oneshot::channel;
121        let (sender, receiver) = channel();
122
123        let wrapped = self.build(async move {
124            let res = future.await;
125            let _ = sender.send(res);
126        });
127
128        kv_log_macro::trace!("spawn_local", {
129            task_id: wrapped.tag.id().0,
130            parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
131        });
132
133        let task = wrapped.tag.task().clone();
134        wasm_bindgen_futures::spawn_local(wrapped);
135
136        Ok(JoinHandle::new(receiver, task))
137    }
138
139    /// Spawns a task with the configured settings, blocking on its execution.
140    #[cfg(not(target_os = "unknown"))]
141    pub fn blocking<F, T>(self, future: F) -> T
142    where
143        F: Future<Output = T>,
144    {
145        use std::cell::Cell;
146
147        let wrapped = self.build(future);
148
149        // Log this `block_on` operation.
150        kv_log_macro::trace!("block_on", {
151            task_id: wrapped.tag.id().0,
152            parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
153        });
154
155        thread_local! {
156            /// Tracks the number of nested block_on calls.
157            static NUM_NESTED_BLOCKING: Cell<usize> = Cell::new(0);
158        }
159
160        // Run the future as a task.
161        NUM_NESTED_BLOCKING.with(|num_nested_blocking| {
162            let count = num_nested_blocking.get();
163            let should_run = count == 0;
164            // increase the count
165            num_nested_blocking.replace(count + 1);
166
167            unsafe {
168                TaskLocalsWrapper::set_current(&wrapped.tag, || {
169                    let res = if should_run {
170                        // The first call should run the executor
171                        async_global_executor::block_on(wrapped)
172                    } else {
173                        futures_lite::future::block_on(wrapped)
174                    };
175                    num_nested_blocking.replace(num_nested_blocking.get() - 1);
176                    res
177                })
178            }
179        })
180    }
181}
182
183pin_project! {
184    /// Wrapper to add support for task locals.
185    struct SupportTaskLocals<F> {
186        tag: TaskLocalsWrapper,
187        #[pin]
188        future: F,
189    }
190}
191
192impl<F: Future> Future for SupportTaskLocals<F> {
193    type Output = F::Output;
194
195    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196        unsafe {
197            TaskLocalsWrapper::set_current(&self.tag, || {
198                let this = self.project();
199                this.future.poll(cx)
200            })
201        }
202    }
203}