tokio_threadpool/blocking/
global.rs

1use super::{BlockingError, BlockingImpl};
2use futures::Poll;
3use std::cell::Cell;
4use std::fmt;
5use std::marker::PhantomData;
6use tokio_executor::Enter;
7
8thread_local! {
9    static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
10}
11
12/// Ensures that the executor is removed from the thread-local context
13/// when leaving the scope. This handles cases that involve panicking.
14///
15/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
16/// backwards-compatibility layer. In general, user code should not override the
17/// blocking implementation. If you use this, make sure you know what you're
18/// doing.
19pub struct DefaultGuard<'a> {
20    prior: BlockingImpl,
21    _lifetime: PhantomData<&'a ()>,
22}
23
24/// Set the default blocking implementation, returning a guard that resets the
25/// blocking implementation when dropped.
26///
27/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
28/// backwards-compatibility layer. In general, user code should not override the
29/// blocking implementation. If you use this, make sure you know what you're
30/// doing.
31pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
32    CURRENT.with(|cell| {
33        let prior = cell.replace(blocking);
34        DefaultGuard {
35            prior,
36            _lifetime: PhantomData,
37        }
38    })
39}
40
41/// Set the default blocking implementation for the duration of the closure.
42///
43/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
44/// backwards-compatibility layer. In general, user code should not override the
45/// blocking implementation. If you use this, make sure you know what you're
46/// doing.
47pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
48where
49    F: FnOnce(&mut Enter) -> R,
50{
51    let _guard = set_default(blocking);
52    f(enter)
53}
54
55/// Enter a blocking section of code.
56///
57/// The `blocking` function annotates a section of code that performs a blocking
58/// operation, either by issuing a blocking syscall or by performing a long
59/// running CPU-bound computation.
60///
61/// When the `blocking` function enters, it hands off the responsibility of
62/// processing the current work queue to another thread. Then, it calls the
63/// supplied closure. The closure is permitted to block indefinitely.
64///
65/// If the maximum number of concurrent `blocking` calls has been reached, then
66/// `NotReady` is returned and the task is notified once existing `blocking`
67/// calls complete. The maximum value is specified when creating a thread pool
68/// using [`Builder::max_blocking`][build]
69///
70/// NB: The entire task that called `blocking` is blocked whenever the supplied
71/// closure blocks, even if you have used future combinators such as `select` -
72/// the other futures in this task will not make progress until the closure
73/// returns.
74/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
75/// using `futures::sync::oneshot::spawn`).
76///
77/// [build]: struct.Builder.html#method.max_blocking
78///
79/// # Return
80///
81/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
82/// `T` is the closure's return value.
83///
84/// If the thread pool has shutdown, `Err` is returned.
85///
86/// If the number of concurrent `blocking` calls has reached the maximum,
87/// `Ok(NotReady)` is returned and the current task is notified when a call to
88/// `blocking` will succeed.
89///
90/// If `blocking` is called from outside the context of a Tokio thread pool,
91/// `Err` is returned.
92///
93/// # Background
94///
95/// By default, the Tokio thread pool expects that tasks will only run for short
96/// periods at a time before yielding back to the thread pool. This is the basic
97/// premise of cooperative multitasking.
98///
99/// However, it is common to want to perform a blocking operation while
100/// processing an asynchronous computation. Examples of blocking operation
101/// include:
102///
103/// * Performing synchronous file operations (reading and writing).
104/// * Blocking on acquiring a mutex.
105/// * Performing a CPU bound computation, like cryptographic encryption or
106///   decryption.
107///
108/// One option for dealing with blocking operations in an asynchronous context
109/// is to use a thread pool dedicated to performing these operations. This not
110/// ideal as it requires bidirectional message passing as well as a channel to
111/// communicate which adds a level of buffering.
112///
113/// Instead, `blocking` hands off the responsibility of processing the work queue
114/// to another thread. This hand off is light compared to a channel and does not
115/// require buffering.
116///
117/// # Examples
118///
119/// Block on receiving a message from a `std` channel. This example is a little
120/// silly as using the non-blocking channel from the `futures` crate would make
121/// more sense. The blocking receive can be replaced with any blocking operation
122/// that needs to be performed.
123///
124/// ```rust
125/// # extern crate futures;
126/// # extern crate tokio_threadpool;
127///
128/// use tokio_threadpool::{ThreadPool, blocking};
129///
130/// use futures::Future;
131/// use futures::future::{lazy, poll_fn};
132///
133/// use std::sync::mpsc;
134/// use std::thread;
135/// use std::time::Duration;
136///
137/// pub fn main() {
138///     // This is a *blocking* channel
139///     let (tx, rx) = mpsc::channel();
140///
141///     // Spawn a thread to send a message
142///     thread::spawn(move || {
143///         thread::sleep(Duration::from_millis(500));
144///         tx.send("hello").unwrap();
145///     });
146///
147///     let pool = ThreadPool::new();
148///
149///     pool.spawn(lazy(move || {
150///         // Because `blocking` returns `Poll`, it is intended to be used
151///         // from the context of a `Future` implementation. Since we don't
152///         // have a complicated requirement, we can use `poll_fn` in this
153///         // case.
154///         poll_fn(move || {
155///             blocking(|| {
156///                 let msg = rx.recv().unwrap();
157///                 println!("message = {}", msg);
158///             }).map_err(|_| panic!("the threadpool shut down"))
159///         })
160///     }));
161///
162///     // Wait for the task we just spawned to complete.
163///     pool.shutdown_on_idle().wait().unwrap();
164/// }
165/// ```
166pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
167where
168    F: FnOnce() -> T,
169{
170    CURRENT.with(|cell| {
171        let blocking = cell.get();
172
173        // Object-safety workaround: the `Blocking` trait must be object-safe,
174        // since we use a trait object in the thread-local. However, a blocking
175        // _operation_ will be generic over the return type of the blocking
176        // function. Therefore, rather than passing a function with a return
177        // type to `Blocking::run_blocking`, we pass a _new_ closure which
178        // doesn't have a return value. That closure invokes the blocking
179        // function and assigns its value to `ret`, which we then unpack when
180        // the blocking call finishes.
181        let mut f = Some(f);
182        let mut ret = None;
183        {
184            let ret2 = &mut ret;
185            let mut run = move || {
186                let f = f
187                    .take()
188                    .expect("blocking closure invoked twice; this is a bug!");
189                *ret2 = Some((f)());
190            };
191
192            try_ready!((blocking)(&mut run));
193        }
194
195        // Return the result
196        let ret =
197            ret.expect("blocking function finished, but return value was unset; this is a bug!");
198        Ok(ret.into())
199    })
200}
201
202// === impl DefaultGuard ===
203
204impl<'a> fmt::Debug for DefaultGuard<'a> {
205    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206        f.pad("DefaultGuard { .. }")
207    }
208}
209
210impl<'a> Drop for DefaultGuard<'a> {
211    fn drop(&mut self) {
212        // if the TLS value has already been torn down, there's nothing else we
213        // can do. we're almost certainly panicking anyway.
214        let _ = CURRENT.try_with(|cell| {
215            cell.set(self.prior);
216        });
217    }
218}