tokio_threadpool/blocking/
mod.rs

1use worker::Worker;
2
3use futures::{Async, Poll};
4use tokio_executor;
5
6use std::error::Error;
7use std::fmt;
8
9mod global;
10pub use self::global::blocking;
11#[doc(hidden)]
12pub use self::global::{set_default, with_default, DefaultGuard};
13
/// Error returned when `blocking` cannot be used in the current context.
///
/// The value carries no data; its message (see the `Display` and `Error`
/// implementations) explains that `blocking` was used outside of a thread
/// pool.
pub struct BlockingError {
    // Private unit field: the struct cannot be built with a struct literal
    // from outside this module, so values are created via `BlockingError::new`.
    _p: (),
}
18
19/// A function implementing the behavior run on calls to `blocking`.
20///
21/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
22/// backwards-compatibility layer. In general, user code should not override the
23/// blocking implementation. If you use this, make sure you know what you're
24/// doing.
25#[doc(hidden)]
26pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>;
27
28fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> {
29    let res = Worker::with_current(|worker| {
30        let worker = match worker {
31            Some(worker) => worker,
32            None => {
33                return Err(BlockingError::new());
34            }
35        };
36
37        // Transition the worker state to blocking. This will exit the fn early
38        // with `NotReady` if the pool does not have enough capacity to enter
39        // blocking mode.
40        worker.transition_to_blocking()
41    });
42
43    // If the transition cannot happen, exit early
44    try_ready!(res);
45
46    // Currently in blocking mode, so call the inner closure.
47    //
48    // "Exit" the current executor in case the blocking function wants
49    // to call a different executor.
50    tokio_executor::exit(move || (f)());
51
52    // Try to transition out of blocking mode. This is a fast path that takes
53    // back ownership of the worker if the worker handoff didn't complete yet.
54    Worker::with_current(|worker| {
55        // Worker must be set since it was above.
56        worker.unwrap().transition_from_blocking();
57    });
58
59    Ok(Async::Ready(()))
60}
61
62impl BlockingError {
63    /// Returns a new `BlockingError`.
64    #[doc(hidden)]
65    pub fn new() -> Self {
66        Self { _p: () }
67    }
68}
69
70impl fmt::Display for BlockingError {
71    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
72        write!(fmt, "{}", self.description())
73    }
74}
75
76impl fmt::Debug for BlockingError {
77    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
78        f.debug_struct("BlockingError")
79            .field("reason", &self.description())
80            .finish()
81    }
82}
83
84impl Error for BlockingError {
85    fn description(&self) -> &str {
86        "`blocking` annotation used from outside the context of a thread pool"
87    }
88}