tokio_threadpool/
thread_pool.rs

1use builder::Builder;
2use pool::Pool;
3use sender::Sender;
4use shutdown::{Shutdown, ShutdownTrigger};
5
6use futures::sync::oneshot;
7use futures::{Future, Poll};
8
9use std::sync::Arc;
10
11/// Work-stealing based thread pool for executing futures.
12///
13/// If a `ThreadPool` instance is dropped without explicitly being shutdown,
14/// `shutdown_now` is called implicitly, forcing all tasks that have not yet
15/// completed to be dropped.
16///
17/// Create `ThreadPool` instances using `Builder`.
18#[derive(Debug)]
19pub struct ThreadPool {
20    inner: Option<Inner>,
21}
22
23#[derive(Debug)]
24struct Inner {
25    sender: Sender,
26    trigger: Arc<ShutdownTrigger>,
27}
28
29impl ThreadPool {
30    /// Create a new `ThreadPool` with default values.
31    ///
32    /// Use [`Builder`] for creating a configured thread pool.
33    ///
34    /// [`Builder`]: struct.Builder.html
35    pub fn new() -> ThreadPool {
36        Builder::new().build()
37    }
38
39    pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool {
40        ThreadPool {
41            inner: Some(Inner {
42                sender: Sender { pool },
43                trigger,
44            }),
45        }
46    }
47
48    /// Spawn a future onto the thread pool.
49    ///
50    /// This function takes ownership of the future and randomly assigns it to a
51    /// worker thread. The thread will then start executing the future.
52    ///
53    /// # Examples
54    ///
55    /// ```rust
56    /// # extern crate tokio_threadpool;
57    /// # extern crate futures;
58    /// # use tokio_threadpool::ThreadPool;
59    /// use futures::future::{Future, lazy};
60    ///
61    /// # pub fn main() {
62    /// // Create a thread pool with default configuration values
63    /// let thread_pool = ThreadPool::new();
64    ///
65    /// thread_pool.spawn(lazy(|| {
66    ///     println!("called from a worker thread");
67    ///     Ok(())
68    /// }));
69    ///
70    /// // Gracefully shutdown the threadpool
71    /// thread_pool.shutdown().wait().unwrap();
72    /// # }
73    /// ```
74    ///
75    /// # Panics
76    ///
77    /// This function panics if the spawn fails. Use [`Sender::spawn`] for a
78    /// version that returns a `Result` instead of panicking.
79    pub fn spawn<F>(&self, future: F)
80    where
81        F: Future<Item = (), Error = ()> + Send + 'static,
82    {
83        self.sender().spawn(future).unwrap();
84    }
85
86    /// Spawn a future on to the thread pool, return a future representing
87    /// the produced value.
88    ///
89    /// The SpawnHandle returned is a future that is a proxy for future itself.
90    /// When future completes on this thread pool then the SpawnHandle will itself
91    /// be resolved.
92    ///
93    /// # Examples
94    ///
95    /// ```rust
96    /// # extern crate tokio_threadpool;
97    /// # extern crate futures;
98    /// # use tokio_threadpool::ThreadPool;
99    /// use futures::future::{Future, lazy};
100    ///
101    /// # pub fn main() {
102    /// // Create a thread pool with default configuration values
103    /// let thread_pool = ThreadPool::new();
104    ///
105    /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42)));
106    ///
107    /// let value = handle.wait().unwrap();
108    /// assert_eq!(value, 42);
109    ///
110    /// // Gracefully shutdown the threadpool
111    /// thread_pool.shutdown().wait().unwrap();
112    /// # }
113    /// ```
114    ///
115    /// # Panics
116    ///
117    /// This function panics if the spawn fails.
118    pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
119    where
120        F: Future + Send + 'static,
121        F::Item: Send + 'static,
122        F::Error: Send + 'static,
123    {
124        SpawnHandle(oneshot::spawn(future, self.sender()))
125    }
126
127    /// Return a reference to the sender handle
128    ///
129    /// The handle is used to spawn futures onto the thread pool. It also
130    /// implements the `Executor` trait.
131    pub fn sender(&self) -> &Sender {
132        &self.inner.as_ref().unwrap().sender
133    }
134
135    /// Return a mutable reference to the sender handle
136    pub fn sender_mut(&mut self) -> &mut Sender {
137        &mut self.inner.as_mut().unwrap().sender
138    }
139
140    /// Shutdown the pool once it becomes idle.
141    ///
142    /// Idle is defined as the completion of all futures that have been spawned
143    /// onto the thread pool. There may still be outstanding handles when the
144    /// thread pool reaches an idle state.
145    ///
146    /// Once the idle state is reached, calling `spawn` on any outstanding
147    /// handle will result in an error. All worker threads are signaled and will
148    /// shutdown. The returned future completes once all worker threads have
149    /// completed the shutdown process.
150    pub fn shutdown_on_idle(mut self) -> Shutdown {
151        let inner = self.inner.take().unwrap();
152        inner.sender.pool.shutdown(false, false);
153        Shutdown::new(&inner.trigger)
154    }
155
156    /// Shutdown the pool
157    ///
158    /// This prevents the thread pool from accepting new tasks but will allow
159    /// any existing tasks to complete.
160    ///
161    /// Calling `spawn` on any outstanding handle will result in an error. All
162    /// worker threads are signaled and will shutdown. The returned future
163    /// completes once all worker threads have completed the shutdown process.
164    pub fn shutdown(mut self) -> Shutdown {
165        let inner = self.inner.take().unwrap();
166        inner.sender.pool.shutdown(true, false);
167        Shutdown::new(&inner.trigger)
168    }
169
170    /// Shutdown the pool immediately
171    ///
172    /// This will prevent the thread pool from accepting new tasks **and**
173    /// abort any tasks that are currently running on the thread pool.
174    ///
175    /// Calling `spawn` on any outstanding handle will result in an error. All
176    /// worker threads are signaled and will shutdown. The returned future
177    /// completes once all worker threads have completed the shutdown process.
178    pub fn shutdown_now(mut self) -> Shutdown {
179        let inner = self.inner.take().unwrap();
180        inner.sender.pool.shutdown(true, true);
181        Shutdown::new(&inner.trigger)
182    }
183}
184
185impl Drop for ThreadPool {
186    fn drop(&mut self) {
187        if let Some(inner) = self.inner.take() {
188            // Begin the shutdown process.
189            inner.sender.pool.shutdown(true, true);
190            let shutdown = Shutdown::new(&inner.trigger);
191
192            // Drop `inner` in order to drop its shutdown trigger.
193            drop(inner);
194
195            // Wait until all worker threads terminate and the threadpool's resources clean up.
196            let _ = shutdown.wait();
197        }
198    }
199}
200
201/// Handle returned from ThreadPool::spawn_handle.
202///
203/// This handle is a future representing the completion of a different future
204/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle
205/// function this handle will resolve when the future provided resolves on the
206/// thread pool.
207#[derive(Debug)]
208pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>);
209
210impl<T, E> Future for SpawnHandle<T, E> {
211    type Item = T;
212    type Error = E;
213
214    fn poll(&mut self) -> Poll<T, E> {
215        self.0.poll()
216    }
217}