tokio_threadpool/builder.rs
1use callback::Callback;
2use config::{Config, MAX_WORKERS};
3use park::{BoxPark, BoxedPark, DefaultPark};
4use pool::{Pool, MAX_BACKUP};
5use shutdown::ShutdownTrigger;
6use thread_pool::ThreadPool;
7use worker::{self, Worker, WorkerId};
8
9use std::any::Any;
10use std::cmp::max;
11use std::error::Error;
12use std::fmt;
13use std::sync::Arc;
14use std::time::Duration;
15
16use crossbeam_deque::Injector;
17use num_cpus;
18use tokio_executor::park::Park;
19use tokio_executor::Enter;
20
21/// Builds a thread pool with custom configuration values.
22///
23/// Methods can be chained in order to set the configuration values. The thread
24/// pool is constructed by calling [`build`].
25///
26/// New instances of `Builder` are obtained via [`Builder::new`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: #method.build
32/// [`Builder::new`]: #method.new
33///
34/// # Examples
35///
36/// ```
37/// # extern crate tokio_threadpool;
38/// # extern crate futures;
39/// # use tokio_threadpool::Builder;
40/// use futures::future::{Future, lazy};
41/// use std::time::Duration;
42///
43/// # pub fn main() {
44/// let thread_pool = Builder::new()
45/// .pool_size(4)
46/// .keep_alive(Some(Duration::from_secs(30)))
47/// .build();
48///
49/// thread_pool.spawn(lazy(|| {
50/// println!("called from a worker thread");
51/// Ok(())
52/// }));
53///
54/// // Gracefully shutdown the threadpool
55/// thread_pool.shutdown().wait().unwrap();
56/// # }
57/// ```
58pub struct Builder {
59 /// Thread pool specific configuration values
60 config: Config,
61
62 /// Number of workers to spawn
63 pool_size: usize,
64
65 /// Maximum number of futures that can be in a blocking section
66 /// concurrently.
67 max_blocking: usize,
68
69 /// Generates the `Park` instances
70 new_park: Box<dyn Fn(&WorkerId) -> BoxPark>,
71}
72
73impl Builder {
74 /// Returns a new thread pool builder initialized with default configuration
75 /// values.
76 ///
77 /// Configuration methods can be chained on the return value.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// # extern crate tokio_threadpool;
83 /// # extern crate futures;
84 /// # use tokio_threadpool::Builder;
85 /// use std::time::Duration;
86 ///
87 /// # pub fn main() {
88 /// let thread_pool = Builder::new()
89 /// .pool_size(4)
90 /// .keep_alive(Some(Duration::from_secs(30)))
91 /// .build();
92 /// # }
93 /// ```
94 pub fn new() -> Builder {
95 let num_cpus = max(1, num_cpus::get());
96
97 let new_park =
98 Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);
99
100 Builder {
101 pool_size: num_cpus,
102 max_blocking: 100,
103 config: Config {
104 keep_alive: None,
105 name_prefix: None,
106 stack_size: None,
107 around_worker: None,
108 after_start: None,
109 before_stop: None,
110 panic_handler: None,
111 },
112 new_park,
113 }
114 }
115
116 /// Set the maximum number of worker threads for the thread pool instance.
117 ///
118 /// This must be a number between 1 and 32,768 though it is advised to keep
119 /// this value on the smaller side.
120 ///
121 /// The default value is the number of cores available to the system.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// # extern crate tokio_threadpool;
127 /// # extern crate futures;
128 /// # use tokio_threadpool::Builder;
129 ///
130 /// # pub fn main() {
131 /// let thread_pool = Builder::new()
132 /// .pool_size(4)
133 /// .build();
134 /// # }
135 /// ```
136 pub fn pool_size(&mut self, val: usize) -> &mut Self {
137 assert!(val >= 1, "at least one thread required");
138 assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS);
139
140 self.pool_size = val;
141 self
142 }
143
144 /// Set the maximum number of concurrent blocking sections.
145 ///
146 /// When the maximum concurrent `blocking` calls is reached, any further
147 /// calls to `blocking` will return `NotReady` and the task is notified once
148 /// previously in-flight calls to `blocking` return.
149 ///
150 /// This must be a number between 1 and 32,768 though it is advised to keep
151 /// this value on the smaller side.
152 ///
153 /// The default value is 100.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// # extern crate tokio_threadpool;
159 /// # extern crate futures;
160 /// # use tokio_threadpool::Builder;
161 ///
162 /// # pub fn main() {
163 /// let thread_pool = Builder::new()
164 /// .max_blocking(200)
165 /// .build();
166 /// # }
167 /// ```
168 pub fn max_blocking(&mut self, val: usize) -> &mut Self {
169 assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);
170 self.max_blocking = val;
171 self
172 }
173
174 /// Set the thread keep alive duration
175 ///
176 /// If set, a thread that has completed a `blocking` call will wait for up
177 /// to the specified duration to become a worker thread again. Once the
178 /// duration elapses, the thread will shutdown.
179 ///
180 /// When the value is `None`, the thread will wait to become a worker
181 /// thread forever.
182 ///
183 /// The default value is `None`.
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// # extern crate tokio_threadpool;
189 /// # extern crate futures;
190 /// # use tokio_threadpool::Builder;
191 /// use std::time::Duration;
192 ///
193 /// # pub fn main() {
194 /// let thread_pool = Builder::new()
195 /// .keep_alive(Some(Duration::from_secs(30)))
196 /// .build();
197 /// # }
198 /// ```
199 pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
200 self.config.keep_alive = val;
201 self
202 }
203
204 /// Sets a callback to be triggered when a panic during a future bubbles up
205 /// to Tokio. By default Tokio catches these panics, and they will be
206 /// ignored. The parameter passed to this callback is the same error value
207 /// returned from std::panic::catch_unwind(). To abort the process on
208 /// panics, use std::panic::resume_unwind() in this callback as shown
209 /// below.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// # extern crate tokio_threadpool;
215 /// # extern crate futures;
216 /// # use tokio_threadpool::Builder;
217 ///
218 /// # pub fn main() {
219 /// let thread_pool = Builder::new()
220 /// .panic_handler(|err| std::panic::resume_unwind(err))
221 /// .build();
222 /// # }
223 /// ```
224 pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
225 where
226 F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
227 {
228 self.config.panic_handler = Some(Arc::new(f));
229 self
230 }
231
232 /// Set name prefix of threads spawned by the scheduler
233 ///
234 /// Thread name prefix is used for generating thread names. For example, if
235 /// prefix is `my-pool-`, then threads in the pool will get names like
236 /// `my-pool-1` etc.
237 ///
238 /// If this configuration is not set, then the thread will use the system
239 /// default naming scheme.
240 ///
241 /// # Examples
242 ///
243 /// ```
244 /// # extern crate tokio_threadpool;
245 /// # extern crate futures;
246 /// # use tokio_threadpool::Builder;
247 ///
248 /// # pub fn main() {
249 /// let thread_pool = Builder::new()
250 /// .name_prefix("my-pool-")
251 /// .build();
252 /// # }
253 /// ```
254 pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
255 self.config.name_prefix = Some(val.into());
256 self
257 }
258
259 /// Set the stack size (in bytes) for worker threads.
260 ///
261 /// The actual stack size may be greater than this value if the platform
262 /// specifies minimal stack size.
263 ///
264 /// The default stack size for spawned threads is 2 MiB, though this
265 /// particular stack size is subject to change in the future.
266 ///
267 /// # Examples
268 ///
269 /// ```
270 /// # extern crate tokio_threadpool;
271 /// # extern crate futures;
272 /// # use tokio_threadpool::Builder;
273 ///
274 /// # pub fn main() {
275 /// let thread_pool = Builder::new()
276 /// .stack_size(32 * 1024)
277 /// .build();
278 /// # }
279 /// ```
280 pub fn stack_size(&mut self, val: usize) -> &mut Self {
281 self.config.stack_size = Some(val);
282 self
283 }
284
285 /// Execute function `f` on each worker thread.
286 ///
287 /// This function is provided a handle to the worker and is expected to call
288 /// [`Worker::run`], otherwise the worker thread will shutdown without doing
289 /// any work.
290 ///
291 /// # Examples
292 ///
293 /// ```
294 /// # extern crate tokio_threadpool;
295 /// # extern crate futures;
296 /// # use tokio_threadpool::Builder;
297 ///
298 /// # pub fn main() {
299 /// let thread_pool = Builder::new()
300 /// .around_worker(|worker, _| {
301 /// println!("worker is starting up");
302 /// worker.run();
303 /// println!("worker is shutting down");
304 /// })
305 /// .build();
306 /// # }
307 /// ```
308 ///
309 /// [`Worker::run`]: struct.Worker.html#method.run
310 pub fn around_worker<F>(&mut self, f: F) -> &mut Self
311 where
312 F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
313 {
314 self.config.around_worker = Some(Callback::new(f));
315 self
316 }
317
318 /// Execute function `f` after each thread is started but before it starts
319 /// doing work.
320 ///
321 /// This is intended for bookkeeping and monitoring use cases.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// # extern crate tokio_threadpool;
327 /// # extern crate futures;
328 /// # use tokio_threadpool::Builder;
329 ///
330 /// # pub fn main() {
331 /// let thread_pool = Builder::new()
332 /// .after_start(|| {
333 /// println!("thread started");
334 /// })
335 /// .build();
336 /// # }
337 /// ```
338 pub fn after_start<F>(&mut self, f: F) -> &mut Self
339 where
340 F: Fn() + Send + Sync + 'static,
341 {
342 self.config.after_start = Some(Arc::new(f));
343 self
344 }
345
346 /// Execute function `f` before each thread stops.
347 ///
348 /// This is intended for bookkeeping and monitoring use cases.
349 ///
350 /// # Examples
351 ///
352 /// ```
353 /// # extern crate tokio_threadpool;
354 /// # extern crate futures;
355 /// # use tokio_threadpool::Builder;
356 ///
357 /// # pub fn main() {
358 /// let thread_pool = Builder::new()
359 /// .before_stop(|| {
360 /// println!("thread stopping");
361 /// })
362 /// .build();
363 /// # }
364 /// ```
365 pub fn before_stop<F>(&mut self, f: F) -> &mut Self
366 where
367 F: Fn() + Send + Sync + 'static,
368 {
369 self.config.before_stop = Some(Arc::new(f));
370 self
371 }
372
373 /// Customize the `park` instance used by each worker thread.
374 ///
375 /// The provided closure `f` is called once per worker and returns a `Park`
376 /// instance that is used by the worker to put itself to sleep.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// # extern crate tokio_threadpool;
382 /// # extern crate futures;
383 /// # use tokio_threadpool::Builder;
384 /// # fn decorate<F>(f: F) -> F { f }
385 ///
386 /// # pub fn main() {
387 /// let thread_pool = Builder::new()
388 /// .custom_park(|_| {
389 /// use tokio_threadpool::park::DefaultPark;
390 ///
391 /// // This is the default park type that the worker would use if we
392 /// // did not customize it.
393 /// let park = DefaultPark::new();
394 ///
395 /// // Decorate the `park` instance, allowing us to customize work
396 /// // that happens when a worker thread goes to sleep.
397 /// decorate(park)
398 /// })
399 /// .build();
400 /// # }
401 /// ```
402 pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
403 where
404 F: Fn(&WorkerId) -> P + 'static,
405 P: Park + Send + 'static,
406 P::Error: Error,
407 {
408 self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id))));
409
410 self
411 }
412
413 /// Create the configured `ThreadPool`.
414 ///
415 /// The returned `ThreadPool` instance is ready to spawn tasks.
416 ///
417 /// # Examples
418 ///
419 /// ```
420 /// # extern crate tokio_threadpool;
421 /// # extern crate futures;
422 /// # use tokio_threadpool::Builder;
423 ///
424 /// # pub fn main() {
425 /// let thread_pool = Builder::new()
426 /// .build();
427 /// # }
428 /// ```
429 pub fn build(&self) -> ThreadPool {
430 trace!("build; num-workers={}", self.pool_size);
431
432 // Create the worker entry list
433 let workers: Arc<[worker::Entry]> = {
434 let mut workers = vec![];
435
436 for i in 0..self.pool_size {
437 let id = WorkerId::new(i);
438 let park = (self.new_park)(&id);
439 let unpark = park.unpark();
440
441 workers.push(worker::Entry::new(park, unpark));
442 }
443
444 workers.into()
445 };
446
447 let queue = Arc::new(Injector::new());
448
449 // Create a trigger that will clean up resources on shutdown.
450 //
451 // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
452 // strong references.
453 let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
454
455 // Create the pool
456 let pool = Arc::new(Pool::new(
457 workers,
458 Arc::downgrade(&trigger),
459 self.max_blocking,
460 self.config.clone(),
461 queue,
462 ));
463
464 ThreadPool::new2(pool, trigger)
465 }
466}
467
468impl fmt::Debug for Builder {
469 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
470 fmt.debug_struct("Builder")
471 .field("config", &self.config)
472 .field("pool_size", &self.pool_size)
473 .field("new_park", &"Box<Fn() -> BoxPark>")
474 .finish()
475 }
476}