tokio_executor/global.rs
1use super::{Enter, Executor, SpawnError};
2
3use futures::{future, Future};
4
5use std::cell::Cell;
6
/// A handle that spawns futures onto whichever executor is the default for
/// the current execution context.
///
/// `DefaultExecutor` implements `Executor`, so it can be passed to code that
/// needs to spawn tasks without naming a concrete executor type.
///
/// The handle carries no state of its own. The executor it forwards to is
/// tracked in a thread-local variable, which an executor normally points at
/// itself when it starts, via `tokio_executor::with_default`.
#[derive(Debug, Clone)]
pub struct DefaultExecutor {
    // Private unit field: keeps construction inside this module.
    _dummy: (),
}
21
/// Guard that removes the default executor from the thread-local context
/// when it goes out of scope, including while unwinding from a panic.
///
/// The guard must be bound to a variable for as long as the default executor
/// should stay installed. Discarding it immediately unsets the executor
/// again, which is why the type is marked `#[must_use]`.
#[derive(Debug)]
#[must_use = "the default executor is unset as soon as the guard is dropped"]
pub struct DefaultGuard {
    // Private unit field: keeps construction inside this module.
    _p: (),
}
28
29impl DefaultExecutor {
30 /// Returns a handle to the default executor for the current context.
31 ///
32 /// Futures may be spawned onto the default executor using this handle.
33 ///
34 /// The returned handle will reference whichever executor is configured as
35 /// the default **at the time `spawn` is called**. This enables
36 /// `DefaultExecutor::current()` to be called before an execution context is
37 /// setup, then passed **into** an execution context before it is used.
38 ///
39 /// This is also true for sending the handle across threads, so calling
40 /// `DefaultExecutor::current()` on thread A and then sending the result to
41 /// thread B will _not_ reference the default executor that was set on thread A.
42 pub fn current() -> DefaultExecutor {
43 DefaultExecutor { _dummy: () }
44 }
45
46 #[inline]
47 fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> {
48 EXECUTOR.with(
49 |current_executor| match current_executor.replace(State::Active) {
50 State::Ready(executor_ptr) => {
51 let executor = unsafe { &mut *executor_ptr };
52 let result = f(executor);
53 current_executor.set(State::Ready(executor_ptr));
54 Some(result)
55 }
56 State::Empty | State::Active => None,
57 },
58 )
59 }
60}
61
62#[derive(Clone, Copy)]
63enum State {
64 // default executor not defined
65 Empty,
66 // default executor is defined and ready to be used
67 Ready(*mut dyn Executor),
68 // default executor is currently active (used to detect recursive calls)
69 Active,
70}
71
72thread_local! {
73 /// Thread-local tracking the current executor
74 static EXECUTOR: Cell<State> = Cell::new(State::Empty)
75}
76
77// ===== impl DefaultExecutor =====
78
79impl super::Executor for DefaultExecutor {
80 fn spawn(
81 &mut self,
82 future: Box<dyn Future<Item = (), Error = ()> + Send>,
83 ) -> Result<(), SpawnError> {
84 DefaultExecutor::with_current(|executor| executor.spawn(future))
85 .unwrap_or_else(|| Err(SpawnError::shutdown()))
86 }
87
88 fn status(&self) -> Result<(), SpawnError> {
89 DefaultExecutor::with_current(|executor| executor.status())
90 .unwrap_or_else(|| Err(SpawnError::shutdown()))
91 }
92}
93
94impl<T> super::TypedExecutor<T> for DefaultExecutor
95where
96 T: Future<Item = (), Error = ()> + Send + 'static,
97{
98 fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
99 super::Executor::spawn(self, Box::new(future))
100 }
101
102 fn status(&self) -> Result<(), SpawnError> {
103 super::Executor::status(self)
104 }
105}
106
107impl<T> future::Executor<T> for DefaultExecutor
108where
109 T: Future<Item = (), Error = ()> + Send + 'static,
110{
111 fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
112 if let Err(e) = super::Executor::status(self) {
113 let kind = if e.is_at_capacity() {
114 future::ExecuteErrorKind::NoCapacity
115 } else {
116 future::ExecuteErrorKind::Shutdown
117 };
118
119 return Err(future::ExecuteError::new(kind, future));
120 }
121
122 let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
123 Ok(())
124 }
125}
126
127// ===== global spawn fns =====
128
129/// Submits a future for execution on the default executor -- usually a
130/// threadpool.
131///
132/// Futures are lazy constructs. When they are defined, no work happens. In
133/// order for the logic defined by the future to be run, the future must be
134/// spawned on an executor. This function is the easiest way to do so.
135///
136/// This function must be called from an execution context, i.e. from a future
137/// that has been already spawned onto an executor.
138///
139/// Once spawned, the future will execute. The details of how that happens is
140/// left up to the executor instance. If the executor is a thread pool, the
141/// future will be pushed onto a queue that a worker thread polls from. If the
142/// executor is a "current thread" executor, the future might be polled
143/// immediately from within the call to `spawn` or it might be pushed onto an
144/// internal queue.
145///
146/// # Panics
147///
148/// This function will panic if the default executor is not set or if spawning
149/// onto the default executor returns an error. To avoid the panic, use the
150/// `DefaultExecutor` handle directly.
151///
152/// # Examples
153///
154/// ```rust
155/// # extern crate futures;
156/// # extern crate tokio_executor;
157/// # use tokio_executor::spawn;
158/// # pub fn dox() {
159/// use futures::future::lazy;
160///
161/// spawn(lazy(|| {
162/// println!("running on the default executor");
163/// Ok(())
164/// }));
165/// # }
166/// # pub fn main() {}
167/// ```
168pub fn spawn<T>(future: T)
169where
170 T: Future<Item = (), Error = ()> + Send + 'static,
171{
172 DefaultExecutor::current().spawn(Box::new(future)).unwrap()
173}
174
175/// Set the default executor for the duration of the closure
176///
177/// # Panics
178///
179/// This function panics if there already is a default executor set.
180pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
181where
182 T: Executor,
183 F: FnOnce(&mut Enter) -> R,
184{
185 unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) {
186 use std::mem;
187 mem::transmute(p)
188 }
189
190 EXECUTOR.with(|cell| {
191 match cell.get() {
192 State::Ready(_) | State::Active => {
193 panic!("default executor already set for execution context")
194 }
195 _ => {}
196 }
197
198 // Ensure that the executor is removed from the thread-local context
199 // when leaving the scope. This handles cases that involve panicking.
200 struct Reset<'a>(&'a Cell<State>);
201
202 impl<'a> Drop for Reset<'a> {
203 fn drop(&mut self) {
204 self.0.set(State::Empty);
205 }
206 }
207
208 let _reset = Reset(cell);
209
210 // While scary, this is safe. The function takes a
211 // `&mut Executor`, which guarantees that the reference lives for the
212 // duration of `with_default`.
213 //
214 // Because we are always clearing the TLS value at the end of the
215 // function, we can cast the reference to 'static which thread-local
216 // cells require.
217 let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
218
219 cell.set(State::Ready(executor));
220
221 f(enter)
222 })
223}
224
225/// Sets `executor` as the default executor, returning a guard that unsets it when
226/// dropped.
227///
228/// # Panics
229///
230/// This function panics if there already is a default executor set.
231pub fn set_default<T>(executor: T) -> DefaultGuard
232where
233 T: Executor + 'static,
234{
235 EXECUTOR.with(|cell| {
236 match cell.get() {
237 State::Ready(_) | State::Active => {
238 panic!("default executor already set for execution context")
239 }
240 _ => {}
241 }
242
243 // Ensure that the executor will outlive the call to set_default, even
244 // if the drop guard is never dropped due to calls to `mem::forget` or
245 // similar.
246 let executor = Box::new(executor);
247
248 cell.set(State::Ready(Box::into_raw(executor)));
249 });
250
251 DefaultGuard { _p: () }
252}
253
254impl Drop for DefaultGuard {
255 fn drop(&mut self) {
256 let _ = EXECUTOR.try_with(|cell| {
257 if let State::Ready(prev) = cell.replace(State::Empty) {
258 // drop the previous executor.
259 unsafe {
260 let prev = Box::from_raw(prev);
261 drop(prev);
262 };
263 }
264 });
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::{with_default, DefaultExecutor, Executor};

    // Compile-time check: the handle can be shared and sent between threads.
    #[test]
    fn default_executor_is_send_and_sync() {
        fn assert_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        assert_send_sync::<DefaultExecutor>();
    }

    // Installing a `DefaultExecutor` as the default makes any lookup through
    // it recursive, which must surface as a shutdown error.
    #[test]
    fn nested_default_executor_status() {
        let mut enter = super::super::enter().unwrap();
        let mut handle = DefaultExecutor::current();

        let status = with_default(&mut handle, &mut enter, |_| {
            let inner = DefaultExecutor::current();
            inner.status()
        });

        let error = status.err().unwrap();
        assert!(error.is_shutdown())
    }
}