tokio_threadpool/blocking/global.rs
1use super::{BlockingError, BlockingImpl};
2use futures::Poll;
3use std::cell::Cell;
4use std::fmt;
5use std::marker::PhantomData;
6use tokio_executor::Enter;
7
8thread_local! {
9 static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
10}
11
12/// Ensures that the executor is removed from the thread-local context
13/// when leaving the scope. This handles cases that involve panicking.
14///
15/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
16/// backwards-compatibility layer. In general, user code should not override the
17/// blocking implementation. If you use this, make sure you know what you're
18/// doing.
19pub struct DefaultGuard<'a> {
20 prior: BlockingImpl,
21 _lifetime: PhantomData<&'a ()>,
22}
23
24/// Set the default blocking implementation, returning a guard that resets the
25/// blocking implementation when dropped.
26///
27/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
28/// backwards-compatibility layer. In general, user code should not override the
29/// blocking implementation. If you use this, make sure you know what you're
30/// doing.
31pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
32 CURRENT.with(|cell| {
33 let prior = cell.replace(blocking);
34 DefaultGuard {
35 prior,
36 _lifetime: PhantomData,
37 }
38 })
39}
40
41/// Set the default blocking implementation for the duration of the closure.
42///
43/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
44/// backwards-compatibility layer. In general, user code should not override the
45/// blocking implementation. If you use this, make sure you know what you're
46/// doing.
47pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
48where
49 F: FnOnce(&mut Enter) -> R,
50{
51 let _guard = set_default(blocking);
52 f(enter)
53}
54
55/// Enter a blocking section of code.
56///
57/// The `blocking` function annotates a section of code that performs a blocking
58/// operation, either by issuing a blocking syscall or by performing a long
59/// running CPU-bound computation.
60///
61/// When the `blocking` function enters, it hands off the responsibility of
62/// processing the current work queue to another thread. Then, it calls the
63/// supplied closure. The closure is permitted to block indefinitely.
64///
65/// If the maximum number of concurrent `blocking` calls has been reached, then
66/// `NotReady` is returned and the task is notified once existing `blocking`
67/// calls complete. The maximum value is specified when creating a thread pool
68/// using [`Builder::max_blocking`][build]
69///
70/// NB: The entire task that called `blocking` is blocked whenever the supplied
71/// closure blocks, even if you have used future combinators such as `select` -
72/// the other futures in this task will not make progress until the closure
73/// returns.
74/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
75/// using `futures::sync::oneshot::spawn`).
76///
77/// [build]: struct.Builder.html#method.max_blocking
78///
79/// # Return
80///
81/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
82/// `T` is the closure's return value.
83///
84/// If the thread pool has shutdown, `Err` is returned.
85///
86/// If the number of concurrent `blocking` calls has reached the maximum,
87/// `Ok(NotReady)` is returned and the current task is notified when a call to
88/// `blocking` will succeed.
89///
90/// If `blocking` is called from outside the context of a Tokio thread pool,
91/// `Err` is returned.
92///
93/// # Background
94///
95/// By default, the Tokio thread pool expects that tasks will only run for short
96/// periods at a time before yielding back to the thread pool. This is the basic
97/// premise of cooperative multitasking.
98///
99/// However, it is common to want to perform a blocking operation while
100/// processing an asynchronous computation. Examples of blocking operation
101/// include:
102///
103/// * Performing synchronous file operations (reading and writing).
104/// * Blocking on acquiring a mutex.
105/// * Performing a CPU bound computation, like cryptographic encryption or
106/// decryption.
107///
108/// One option for dealing with blocking operations in an asynchronous context
109/// is to use a thread pool dedicated to performing these operations. This not
110/// ideal as it requires bidirectional message passing as well as a channel to
111/// communicate which adds a level of buffering.
112///
113/// Instead, `blocking` hands off the responsibility of processing the work queue
114/// to another thread. This hand off is light compared to a channel and does not
115/// require buffering.
116///
117/// # Examples
118///
119/// Block on receiving a message from a `std` channel. This example is a little
120/// silly as using the non-blocking channel from the `futures` crate would make
121/// more sense. The blocking receive can be replaced with any blocking operation
122/// that needs to be performed.
123///
124/// ```rust
125/// # extern crate futures;
126/// # extern crate tokio_threadpool;
127///
128/// use tokio_threadpool::{ThreadPool, blocking};
129///
130/// use futures::Future;
131/// use futures::future::{lazy, poll_fn};
132///
133/// use std::sync::mpsc;
134/// use std::thread;
135/// use std::time::Duration;
136///
137/// pub fn main() {
138/// // This is a *blocking* channel
139/// let (tx, rx) = mpsc::channel();
140///
141/// // Spawn a thread to send a message
142/// thread::spawn(move || {
143/// thread::sleep(Duration::from_millis(500));
144/// tx.send("hello").unwrap();
145/// });
146///
147/// let pool = ThreadPool::new();
148///
149/// pool.spawn(lazy(move || {
150/// // Because `blocking` returns `Poll`, it is intended to be used
151/// // from the context of a `Future` implementation. Since we don't
152/// // have a complicated requirement, we can use `poll_fn` in this
153/// // case.
154/// poll_fn(move || {
155/// blocking(|| {
156/// let msg = rx.recv().unwrap();
157/// println!("message = {}", msg);
158/// }).map_err(|_| panic!("the threadpool shut down"))
159/// })
160/// }));
161///
162/// // Wait for the task we just spawned to complete.
163/// pool.shutdown_on_idle().wait().unwrap();
164/// }
165/// ```
166pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
167where
168 F: FnOnce() -> T,
169{
170 CURRENT.with(|cell| {
171 let blocking = cell.get();
172
173 // Object-safety workaround: the `Blocking` trait must be object-safe,
174 // since we use a trait object in the thread-local. However, a blocking
175 // _operation_ will be generic over the return type of the blocking
176 // function. Therefore, rather than passing a function with a return
177 // type to `Blocking::run_blocking`, we pass a _new_ closure which
178 // doesn't have a return value. That closure invokes the blocking
179 // function and assigns its value to `ret`, which we then unpack when
180 // the blocking call finishes.
181 let mut f = Some(f);
182 let mut ret = None;
183 {
184 let ret2 = &mut ret;
185 let mut run = move || {
186 let f = f
187 .take()
188 .expect("blocking closure invoked twice; this is a bug!");
189 *ret2 = Some((f)());
190 };
191
192 try_ready!((blocking)(&mut run));
193 }
194
195 // Return the result
196 let ret =
197 ret.expect("blocking function finished, but return value was unset; this is a bug!");
198 Ok(ret.into())
199 })
200}
201
202// === impl DefaultGuard ===
203
204impl<'a> fmt::Debug for DefaultGuard<'a> {
205 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206 f.pad("DefaultGuard { .. }")
207 }
208}
209
210impl<'a> Drop for DefaultGuard<'a> {
211 fn drop(&mut self) {
212 // if the TLS value has already been torn down, there's nothing else we
213 // can do. we're almost certainly panicking anyway.
214 let _ = CURRENT.try_with(|cell| {
215 cell.set(self.prior);
216 });
217 }
218}