tokio_threadpool/thread_pool.rs
1use builder::Builder;
2use pool::Pool;
3use sender::Sender;
4use shutdown::{Shutdown, ShutdownTrigger};
5
6use futures::sync::oneshot;
7use futures::{Future, Poll};
8
9use std::sync::Arc;
10
11/// Work-stealing based thread pool for executing futures.
12///
13/// If a `ThreadPool` instance is dropped without explicitly being shutdown,
14/// `shutdown_now` is called implicitly, forcing all tasks that have not yet
15/// completed to be dropped.
16///
17/// Create `ThreadPool` instances using `Builder`.
18#[derive(Debug)]
19pub struct ThreadPool {
20 inner: Option<Inner>,
21}
22
23#[derive(Debug)]
24struct Inner {
25 sender: Sender,
26 trigger: Arc<ShutdownTrigger>,
27}
28
29impl ThreadPool {
30 /// Create a new `ThreadPool` with default values.
31 ///
32 /// Use [`Builder`] for creating a configured thread pool.
33 ///
34 /// [`Builder`]: struct.Builder.html
35 pub fn new() -> ThreadPool {
36 Builder::new().build()
37 }
38
39 pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool {
40 ThreadPool {
41 inner: Some(Inner {
42 sender: Sender { pool },
43 trigger,
44 }),
45 }
46 }
47
48 /// Spawn a future onto the thread pool.
49 ///
50 /// This function takes ownership of the future and randomly assigns it to a
51 /// worker thread. The thread will then start executing the future.
52 ///
53 /// # Examples
54 ///
55 /// ```rust
56 /// # extern crate tokio_threadpool;
57 /// # extern crate futures;
58 /// # use tokio_threadpool::ThreadPool;
59 /// use futures::future::{Future, lazy};
60 ///
61 /// # pub fn main() {
62 /// // Create a thread pool with default configuration values
63 /// let thread_pool = ThreadPool::new();
64 ///
65 /// thread_pool.spawn(lazy(|| {
66 /// println!("called from a worker thread");
67 /// Ok(())
68 /// }));
69 ///
70 /// // Gracefully shutdown the threadpool
71 /// thread_pool.shutdown().wait().unwrap();
72 /// # }
73 /// ```
74 ///
75 /// # Panics
76 ///
77 /// This function panics if the spawn fails. Use [`Sender::spawn`] for a
78 /// version that returns a `Result` instead of panicking.
79 pub fn spawn<F>(&self, future: F)
80 where
81 F: Future<Item = (), Error = ()> + Send + 'static,
82 {
83 self.sender().spawn(future).unwrap();
84 }
85
86 /// Spawn a future on to the thread pool, return a future representing
87 /// the produced value.
88 ///
89 /// The SpawnHandle returned is a future that is a proxy for future itself.
90 /// When future completes on this thread pool then the SpawnHandle will itself
91 /// be resolved.
92 ///
93 /// # Examples
94 ///
95 /// ```rust
96 /// # extern crate tokio_threadpool;
97 /// # extern crate futures;
98 /// # use tokio_threadpool::ThreadPool;
99 /// use futures::future::{Future, lazy};
100 ///
101 /// # pub fn main() {
102 /// // Create a thread pool with default configuration values
103 /// let thread_pool = ThreadPool::new();
104 ///
105 /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42)));
106 ///
107 /// let value = handle.wait().unwrap();
108 /// assert_eq!(value, 42);
109 ///
110 /// // Gracefully shutdown the threadpool
111 /// thread_pool.shutdown().wait().unwrap();
112 /// # }
113 /// ```
114 ///
115 /// # Panics
116 ///
117 /// This function panics if the spawn fails.
118 pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error>
119 where
120 F: Future + Send + 'static,
121 F::Item: Send + 'static,
122 F::Error: Send + 'static,
123 {
124 SpawnHandle(oneshot::spawn(future, self.sender()))
125 }
126
127 /// Return a reference to the sender handle
128 ///
129 /// The handle is used to spawn futures onto the thread pool. It also
130 /// implements the `Executor` trait.
131 pub fn sender(&self) -> &Sender {
132 &self.inner.as_ref().unwrap().sender
133 }
134
135 /// Return a mutable reference to the sender handle
136 pub fn sender_mut(&mut self) -> &mut Sender {
137 &mut self.inner.as_mut().unwrap().sender
138 }
139
140 /// Shutdown the pool once it becomes idle.
141 ///
142 /// Idle is defined as the completion of all futures that have been spawned
143 /// onto the thread pool. There may still be outstanding handles when the
144 /// thread pool reaches an idle state.
145 ///
146 /// Once the idle state is reached, calling `spawn` on any outstanding
147 /// handle will result in an error. All worker threads are signaled and will
148 /// shutdown. The returned future completes once all worker threads have
149 /// completed the shutdown process.
150 pub fn shutdown_on_idle(mut self) -> Shutdown {
151 let inner = self.inner.take().unwrap();
152 inner.sender.pool.shutdown(false, false);
153 Shutdown::new(&inner.trigger)
154 }
155
156 /// Shutdown the pool
157 ///
158 /// This prevents the thread pool from accepting new tasks but will allow
159 /// any existing tasks to complete.
160 ///
161 /// Calling `spawn` on any outstanding handle will result in an error. All
162 /// worker threads are signaled and will shutdown. The returned future
163 /// completes once all worker threads have completed the shutdown process.
164 pub fn shutdown(mut self) -> Shutdown {
165 let inner = self.inner.take().unwrap();
166 inner.sender.pool.shutdown(true, false);
167 Shutdown::new(&inner.trigger)
168 }
169
170 /// Shutdown the pool immediately
171 ///
172 /// This will prevent the thread pool from accepting new tasks **and**
173 /// abort any tasks that are currently running on the thread pool.
174 ///
175 /// Calling `spawn` on any outstanding handle will result in an error. All
176 /// worker threads are signaled and will shutdown. The returned future
177 /// completes once all worker threads have completed the shutdown process.
178 pub fn shutdown_now(mut self) -> Shutdown {
179 let inner = self.inner.take().unwrap();
180 inner.sender.pool.shutdown(true, true);
181 Shutdown::new(&inner.trigger)
182 }
183}
184
185impl Drop for ThreadPool {
186 fn drop(&mut self) {
187 if let Some(inner) = self.inner.take() {
188 // Begin the shutdown process.
189 inner.sender.pool.shutdown(true, true);
190 let shutdown = Shutdown::new(&inner.trigger);
191
192 // Drop `inner` in order to drop its shutdown trigger.
193 drop(inner);
194
195 // Wait until all worker threads terminate and the threadpool's resources clean up.
196 let _ = shutdown.wait();
197 }
198 }
199}
200
201/// Handle returned from ThreadPool::spawn_handle.
202///
203/// This handle is a future representing the completion of a different future
204/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle
205/// function this handle will resolve when the future provided resolves on the
206/// thread pool.
207#[derive(Debug)]
208pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>);
209
210impl<T, E> Future for SpawnHandle<T, E> {
211 type Item = T;
212 type Error = E;
213
214 fn poll(&mut self) -> Poll<T, E> {
215 self.0.poll()
216 }
217}