async_std/task/
builder.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use pin_project_lite::pin_project;
7
8use crate::io;
9use crate::task::{JoinHandle, Task, TaskLocalsWrapper};
10
/// Task builder that collects the settings for a task before it is started.
///
/// The only setting held here is an optional task name.
#[derive(Debug, Default)]
pub struct Builder {
    // Name to give the task; `None` means no name was configured.
    pub(crate) name: Option<String>,
}
16
17impl Builder {
18 #[inline]
20 pub fn new() -> Builder {
21 Builder { name: None }
22 }
23
24 #[inline]
26 pub fn name(mut self, name: String) -> Builder {
27 self.name = Some(name);
28 self
29 }
30
31 fn build<F, T>(self, future: F) -> SupportTaskLocals<F>
32 where
33 F: Future<Output = T>,
34 {
35 let name = self.name.map(Arc::new);
36
37 let task = Task::new(name);
39
40 #[cfg(not(target_os = "unknown"))]
41 once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
42
43 let tag = TaskLocalsWrapper::new(task);
44
45 SupportTaskLocals { tag, future }
46 }
47
48 #[cfg(not(target_os = "unknown"))]
50 pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
51 where
52 F: Future<Output = T> + Send + 'static,
53 T: Send + 'static,
54 {
55 let wrapped = self.build(future);
56
57 kv_log_macro::trace!("spawn", {
58 task_id: wrapped.tag.id().0,
59 parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
60 });
61
62 let task = wrapped.tag.task().clone();
63 let handle = async_global_executor::spawn(wrapped);
64
65 Ok(JoinHandle::new(handle, task))
66 }
67
68 #[cfg(all(not(target_os = "unknown"), feature = "unstable"))]
70 pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
71 where
72 F: Future<Output = T> + 'static,
73 T: 'static,
74 {
75 let wrapped = self.build(future);
76
77 kv_log_macro::trace!("spawn_local", {
78 task_id: wrapped.tag.id().0,
79 parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
80 });
81
82 let task = wrapped.tag.task().clone();
83 let handle = async_global_executor::spawn_local(wrapped);
84
85 Ok(JoinHandle::new(handle, task))
86 }
87
88 #[cfg(all(target_arch = "wasm32", feature = "unstable"))]
90 pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
91 where
92 F: Future<Output = T> + 'static,
93 T: 'static,
94 {
95 use futures_channel::oneshot::channel;
96 let (sender, receiver) = channel();
97
98 let wrapped = self.build(async move {
99 let res = future.await;
100 let _ = sender.send(res);
101 });
102 kv_log_macro::trace!("spawn_local", {
103 task_id: wrapped.tag.id().0,
104 parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
105 });
106
107 let task = wrapped.tag.task().clone();
108 wasm_bindgen_futures::spawn_local(wrapped);
109
110 Ok(JoinHandle::new(receiver, task))
111 }
112
113 #[cfg(all(target_arch = "wasm32", not(feature = "unstable")))]
115 pub(crate) fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
116 where
117 F: Future<Output = T> + 'static,
118 T: 'static,
119 {
120 use futures_channel::oneshot::channel;
121 let (sender, receiver) = channel();
122
123 let wrapped = self.build(async move {
124 let res = future.await;
125 let _ = sender.send(res);
126 });
127
128 kv_log_macro::trace!("spawn_local", {
129 task_id: wrapped.tag.id().0,
130 parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
131 });
132
133 let task = wrapped.tag.task().clone();
134 wasm_bindgen_futures::spawn_local(wrapped);
135
136 Ok(JoinHandle::new(receiver, task))
137 }
138
139 #[cfg(not(target_os = "unknown"))]
141 pub fn blocking<F, T>(self, future: F) -> T
142 where
143 F: Future<Output = T>,
144 {
145 use std::cell::Cell;
146
147 let wrapped = self.build(future);
148
149 kv_log_macro::trace!("block_on", {
151 task_id: wrapped.tag.id().0,
152 parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0),
153 });
154
155 thread_local! {
156 static NUM_NESTED_BLOCKING: Cell<usize> = Cell::new(0);
158 }
159
160 NUM_NESTED_BLOCKING.with(|num_nested_blocking| {
162 let count = num_nested_blocking.get();
163 let should_run = count == 0;
164 num_nested_blocking.replace(count + 1);
166
167 unsafe {
168 TaskLocalsWrapper::set_current(&wrapped.tag, || {
169 let res = if should_run {
170 async_global_executor::block_on(wrapped)
172 } else {
173 futures_lite::future::block_on(wrapped)
174 };
175 num_nested_blocking.replace(num_nested_blocking.get() - 1);
176 res
177 })
178 }
179 })
180 }
181}
182
183pin_project! {
184 struct SupportTaskLocals<F> {
186 tag: TaskLocalsWrapper,
187 #[pin]
188 future: F,
189 }
190}
191
192impl<F: Future> Future for SupportTaskLocals<F> {
193 type Output = F::Output;
194
195 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196 unsafe {
197 TaskLocalsWrapper::set_current(&self.tag, || {
198 let this = self.project();
199 this.future.poll(cx)
200 })
201 }
202 }
203}