tokio_executor/
global.rs

1use super::{Enter, Executor, SpawnError};
2
3use futures::{future, Future};
4
5use std::cell::Cell;
6
/// Executes futures on the default executor for the current execution context.
///
/// `DefaultExecutor` implements `Executor` and can be used to spawn futures
/// without referencing a specific executor.
///
/// When an executor starts, it sets the `DefaultExecutor` handle to point to an
/// executor (usually itself) that is used to spawn new tasks.
///
/// The current `DefaultExecutor` reference is tracked using a thread-local
/// variable. It is installed either for the duration of a closure with
/// `with_default`, or until a guard is dropped with `set_default`.
#[derive(Debug, Clone)]
pub struct DefaultExecutor {
    // The handle itself carries no state: the executor it refers to is looked
    // up in the thread-local every time the handle is used. The private unit
    // field only prevents construction outside of this module.
    _dummy: (),
}
21
/// Guard returned by `set_default`.
///
/// Dropping the guard resets the thread-local default executor slot to empty
/// and, if an executor was stored in the slot at that point, drops it. Because
/// this happens in `Drop`, it also runs when the scope is left by a panic.
#[derive(Debug)]
pub struct DefaultGuard {
    // Private unit field: the guard can only be created inside this module.
    _p: (),
}
28
29impl DefaultExecutor {
30    /// Returns a handle to the default executor for the current context.
31    ///
32    /// Futures may be spawned onto the default executor using this handle.
33    ///
34    /// The returned handle will reference whichever executor is configured as
35    /// the default **at the time `spawn` is called**. This enables
36    /// `DefaultExecutor::current()` to be called before an execution context is
37    /// setup, then passed **into** an execution context before it is used.
38    ///
39    /// This is also true for sending the handle across threads, so calling
40    /// `DefaultExecutor::current()` on thread A and then sending the result to
41    /// thread B will _not_ reference the default executor that was set on thread A.
42    pub fn current() -> DefaultExecutor {
43        DefaultExecutor { _dummy: () }
44    }
45
46    #[inline]
47    fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> {
48        EXECUTOR.with(
49            |current_executor| match current_executor.replace(State::Active) {
50                State::Ready(executor_ptr) => {
51                    let executor = unsafe { &mut *executor_ptr };
52                    let result = f(executor);
53                    current_executor.set(State::Ready(executor_ptr));
54                    Some(result)
55                }
56                State::Empty | State::Active => None,
57            },
58        )
59    }
60}
61
/// Contents of the thread-local default executor slot.
///
/// The type is `Copy` so it can be stored in a `Cell` and read with
/// `Cell::get`.
#[derive(Clone, Copy)]
enum State {
    /// No default executor is defined for this thread.
    Empty,
    /// A default executor is defined and ready to be used. The raw pointer is
    /// installed by `with_default` or `set_default`.
    Ready(*mut dyn Executor),
    /// The default executor is currently in use by `with_current`; used to
    /// detect recursive calls.
    Active,
}
71
thread_local! {
    /// Thread-local tracking the current executor.
    ///
    /// Every thread starts out with `State::Empty`; the value is changed by
    /// `with_default`, `set_default`, `DefaultGuard`'s `Drop` impl and
    /// `DefaultExecutor::with_current`.
    static EXECUTOR: Cell<State> = Cell::new(State::Empty)
}
76
77// ===== impl DefaultExecutor =====
78
79impl super::Executor for DefaultExecutor {
80    fn spawn(
81        &mut self,
82        future: Box<dyn Future<Item = (), Error = ()> + Send>,
83    ) -> Result<(), SpawnError> {
84        DefaultExecutor::with_current(|executor| executor.spawn(future))
85            .unwrap_or_else(|| Err(SpawnError::shutdown()))
86    }
87
88    fn status(&self) -> Result<(), SpawnError> {
89        DefaultExecutor::with_current(|executor| executor.status())
90            .unwrap_or_else(|| Err(SpawnError::shutdown()))
91    }
92}
93
94impl<T> super::TypedExecutor<T> for DefaultExecutor
95where
96    T: Future<Item = (), Error = ()> + Send + 'static,
97{
98    fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
99        super::Executor::spawn(self, Box::new(future))
100    }
101
102    fn status(&self) -> Result<(), SpawnError> {
103        super::Executor::status(self)
104    }
105}
106
107impl<T> future::Executor<T> for DefaultExecutor
108where
109    T: Future<Item = (), Error = ()> + Send + 'static,
110{
111    fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
112        if let Err(e) = super::Executor::status(self) {
113            let kind = if e.is_at_capacity() {
114                future::ExecuteErrorKind::NoCapacity
115            } else {
116                future::ExecuteErrorKind::Shutdown
117            };
118
119            return Err(future::ExecuteError::new(kind, future));
120        }
121
122        let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future)));
123        Ok(())
124    }
125}
126
127// ===== global spawn fns =====
128
129/// Submits a future for execution on the default executor -- usually a
130/// threadpool.
131///
132/// Futures are lazy constructs. When they are defined, no work happens. In
133/// order for the logic defined by the future to be run, the future must be
134/// spawned on an executor. This function is the easiest way to do so.
135///
136/// This function must be called from an execution context, i.e. from a future
137/// that has been already spawned onto an executor.
138///
139/// Once spawned, the future will execute. The details of how that happens is
140/// left up to the executor instance. If the executor is a thread pool, the
141/// future will be pushed onto a queue that a worker thread polls from. If the
142/// executor is a "current thread" executor, the future might be polled
143/// immediately from within the call to `spawn` or it might be pushed onto an
144/// internal queue.
145///
146/// # Panics
147///
148/// This function will panic if the default executor is not set or if spawning
149/// onto the default executor returns an error. To avoid the panic, use the
150/// `DefaultExecutor` handle directly.
151///
152/// # Examples
153///
154/// ```rust
155/// # extern crate futures;
156/// # extern crate tokio_executor;
157/// # use tokio_executor::spawn;
158/// # pub fn dox() {
159/// use futures::future::lazy;
160///
161/// spawn(lazy(|| {
162///     println!("running on the default executor");
163///     Ok(())
164/// }));
165/// # }
166/// # pub fn main() {}
167/// ```
168pub fn spawn<T>(future: T)
169where
170    T: Future<Item = (), Error = ()> + Send + 'static,
171{
172    DefaultExecutor::current().spawn(Box::new(future)).unwrap()
173}
174
175/// Set the default executor for the duration of the closure
176///
177/// # Panics
178///
179/// This function panics if there already is a default executor set.
180pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
181where
182    T: Executor,
183    F: FnOnce(&mut Enter) -> R,
184{
185    unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) {
186        use std::mem;
187        mem::transmute(p)
188    }
189
190    EXECUTOR.with(|cell| {
191        match cell.get() {
192            State::Ready(_) | State::Active => {
193                panic!("default executor already set for execution context")
194            }
195            _ => {}
196        }
197
198        // Ensure that the executor is removed from the thread-local context
199        // when leaving the scope. This handles cases that involve panicking.
200        struct Reset<'a>(&'a Cell<State>);
201
202        impl<'a> Drop for Reset<'a> {
203            fn drop(&mut self) {
204                self.0.set(State::Empty);
205            }
206        }
207
208        let _reset = Reset(cell);
209
210        // While scary, this is safe. The function takes a
211        // `&mut Executor`, which guarantees that the reference lives for the
212        // duration of `with_default`.
213        //
214        // Because we are always clearing the TLS value at the end of the
215        // function, we can cast the reference to 'static which thread-local
216        // cells require.
217        let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
218
219        cell.set(State::Ready(executor));
220
221        f(enter)
222    })
223}
224
225/// Sets `executor` as the default executor, returning a guard that unsets it when
226/// dropped.
227///
228/// # Panics
229///
230/// This function panics if there already is a default executor set.
231pub fn set_default<T>(executor: T) -> DefaultGuard
232where
233    T: Executor + 'static,
234{
235    EXECUTOR.with(|cell| {
236        match cell.get() {
237            State::Ready(_) | State::Active => {
238                panic!("default executor already set for execution context")
239            }
240            _ => {}
241        }
242
243        // Ensure that the executor will outlive the call to set_default, even
244        // if the drop guard is never dropped due to calls to `mem::forget` or
245        // similar.
246        let executor = Box::new(executor);
247
248        cell.set(State::Ready(Box::into_raw(executor)));
249    });
250
251    DefaultGuard { _p: () }
252}
253
254impl Drop for DefaultGuard {
255    fn drop(&mut self) {
256        let _ = EXECUTOR.try_with(|cell| {
257            if let State::Ready(prev) = cell.replace(State::Empty) {
258                // drop the previous executor.
259                unsafe {
260                    let prev = Box::from_raw(prev);
261                    drop(prev);
262                };
263            }
264        });
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::{with_default, DefaultExecutor, Executor};

    /// Compile-time check: the handle must be usable across threads.
    #[test]
    fn default_executor_is_send_and_sync() {
        fn require_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        require_send_sync::<DefaultExecutor>();
    }

    /// Installing a `DefaultExecutor` handle as the default executor makes a
    /// status query recurse into itself; the recursion must be detected and
    /// reported as a shutdown error.
    #[test]
    fn nested_default_executor_status() {
        let mut enter = super::super::enter().unwrap();
        let mut handle = DefaultExecutor::current();

        let status = with_default(&mut handle, &mut enter, |_| {
            let inner = DefaultExecutor::current();
            inner.status()
        });

        let err = status.err().unwrap();
        assert!(err.is_shutdown());
    }
}