tokio_threadpool/
builder.rs

1use callback::Callback;
2use config::{Config, MAX_WORKERS};
3use park::{BoxPark, BoxedPark, DefaultPark};
4use pool::{Pool, MAX_BACKUP};
5use shutdown::ShutdownTrigger;
6use thread_pool::ThreadPool;
7use worker::{self, Worker, WorkerId};
8
9use std::any::Any;
10use std::cmp::max;
11use std::error::Error;
12use std::fmt;
13use std::sync::Arc;
14use std::time::Duration;
15
16use crossbeam_deque::Injector;
17use num_cpus;
18use tokio_executor::park::Park;
19use tokio_executor::Enter;
20
21/// Builds a thread pool with custom configuration values.
22///
23/// Methods can be chained in order to set the configuration values. The thread
24/// pool is constructed by calling [`build`].
25///
26/// New instances of `Builder` are obtained via [`Builder::new`].
27///
28/// See function level documentation for details on the various configuration
29/// settings.
30///
31/// [`build`]: #method.build
32/// [`Builder::new`]: #method.new
33///
34/// # Examples
35///
36/// ```
37/// # extern crate tokio_threadpool;
38/// # extern crate futures;
39/// # use tokio_threadpool::Builder;
40/// use futures::future::{Future, lazy};
41/// use std::time::Duration;
42///
43/// # pub fn main() {
44/// let thread_pool = Builder::new()
45///     .pool_size(4)
46///     .keep_alive(Some(Duration::from_secs(30)))
47///     .build();
48///
49/// thread_pool.spawn(lazy(|| {
50///     println!("called from a worker thread");
51///     Ok(())
52/// }));
53///
54/// // Gracefully shutdown the threadpool
55/// thread_pool.shutdown().wait().unwrap();
56/// # }
57/// ```
58pub struct Builder {
59    /// Thread pool specific configuration values
60    config: Config,
61
62    /// Number of workers to spawn
63    pool_size: usize,
64
65    /// Maximum number of futures that can be in a blocking section
66    /// concurrently.
67    max_blocking: usize,
68
69    /// Generates the `Park` instances
70    new_park: Box<dyn Fn(&WorkerId) -> BoxPark>,
71}
72
73impl Builder {
74    /// Returns a new thread pool builder initialized with default configuration
75    /// values.
76    ///
77    /// Configuration methods can be chained on the return value.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// # extern crate tokio_threadpool;
83    /// # extern crate futures;
84    /// # use tokio_threadpool::Builder;
85    /// use std::time::Duration;
86    ///
87    /// # pub fn main() {
88    /// let thread_pool = Builder::new()
89    ///     .pool_size(4)
90    ///     .keep_alive(Some(Duration::from_secs(30)))
91    ///     .build();
92    /// # }
93    /// ```
94    pub fn new() -> Builder {
95        let num_cpus = max(1, num_cpus::get());
96
97        let new_park =
98            Box::new(|_: &WorkerId| Box::new(BoxedPark::new(DefaultPark::new())) as BoxPark);
99
100        Builder {
101            pool_size: num_cpus,
102            max_blocking: 100,
103            config: Config {
104                keep_alive: None,
105                name_prefix: None,
106                stack_size: None,
107                around_worker: None,
108                after_start: None,
109                before_stop: None,
110                panic_handler: None,
111            },
112            new_park,
113        }
114    }
115
116    /// Set the maximum number of worker threads for the thread pool instance.
117    ///
118    /// This must be a number between 1 and 32,768 though it is advised to keep
119    /// this value on the smaller side.
120    ///
121    /// The default value is the number of cores available to the system.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// # extern crate tokio_threadpool;
127    /// # extern crate futures;
128    /// # use tokio_threadpool::Builder;
129    ///
130    /// # pub fn main() {
131    /// let thread_pool = Builder::new()
132    ///     .pool_size(4)
133    ///     .build();
134    /// # }
135    /// ```
136    pub fn pool_size(&mut self, val: usize) -> &mut Self {
137        assert!(val >= 1, "at least one thread required");
138        assert!(val <= MAX_WORKERS, "max value is {}", MAX_WORKERS);
139
140        self.pool_size = val;
141        self
142    }
143
144    /// Set the maximum number of concurrent blocking sections.
145    ///
146    /// When the maximum concurrent `blocking` calls is reached, any further
147    /// calls to `blocking` will return `NotReady` and the task is notified once
148    /// previously in-flight calls to `blocking` return.
149    ///
150    /// This must be a number between 1 and 32,768 though it is advised to keep
151    /// this value on the smaller side.
152    ///
153    /// The default value is 100.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// # extern crate tokio_threadpool;
159    /// # extern crate futures;
160    /// # use tokio_threadpool::Builder;
161    ///
162    /// # pub fn main() {
163    /// let thread_pool = Builder::new()
164    ///     .max_blocking(200)
165    ///     .build();
166    /// # }
167    /// ```
168    pub fn max_blocking(&mut self, val: usize) -> &mut Self {
169        assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);
170        self.max_blocking = val;
171        self
172    }
173
174    /// Set the thread keep alive duration
175    ///
176    /// If set, a thread that has completed a `blocking` call will wait for up
177    /// to the specified duration to become a worker thread again. Once the
178    /// duration elapses, the thread will shutdown.
179    ///
180    /// When the value is `None`, the thread will wait to become a worker
181    /// thread forever.
182    ///
183    /// The default value is `None`.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// # extern crate tokio_threadpool;
189    /// # extern crate futures;
190    /// # use tokio_threadpool::Builder;
191    /// use std::time::Duration;
192    ///
193    /// # pub fn main() {
194    /// let thread_pool = Builder::new()
195    ///     .keep_alive(Some(Duration::from_secs(30)))
196    ///     .build();
197    /// # }
198    /// ```
199    pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
200        self.config.keep_alive = val;
201        self
202    }
203
204    /// Sets a callback to be triggered when a panic during a future bubbles up
205    /// to Tokio. By default Tokio catches these panics, and they will be
206    /// ignored. The parameter passed to this callback is the same error value
207    /// returned from std::panic::catch_unwind(). To abort the process on
208    /// panics, use std::panic::resume_unwind() in this callback as shown
209    /// below.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// # extern crate tokio_threadpool;
215    /// # extern crate futures;
216    /// # use tokio_threadpool::Builder;
217    ///
218    /// # pub fn main() {
219    /// let thread_pool = Builder::new()
220    ///     .panic_handler(|err| std::panic::resume_unwind(err))
221    ///     .build();
222    /// # }
223    /// ```
224    pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
225    where
226        F: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
227    {
228        self.config.panic_handler = Some(Arc::new(f));
229        self
230    }
231
232    /// Set name prefix of threads spawned by the scheduler
233    ///
234    /// Thread name prefix is used for generating thread names. For example, if
235    /// prefix is `my-pool-`, then threads in the pool will get names like
236    /// `my-pool-1` etc.
237    ///
238    /// If this configuration is not set, then the thread will use the system
239    /// default naming scheme.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// # extern crate tokio_threadpool;
245    /// # extern crate futures;
246    /// # use tokio_threadpool::Builder;
247    ///
248    /// # pub fn main() {
249    /// let thread_pool = Builder::new()
250    ///     .name_prefix("my-pool-")
251    ///     .build();
252    /// # }
253    /// ```
254    pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
255        self.config.name_prefix = Some(val.into());
256        self
257    }
258
259    /// Set the stack size (in bytes) for worker threads.
260    ///
261    /// The actual stack size may be greater than this value if the platform
262    /// specifies minimal stack size.
263    ///
264    /// The default stack size for spawned threads is 2 MiB, though this
265    /// particular stack size is subject to change in the future.
266    ///
267    /// # Examples
268    ///
269    /// ```
270    /// # extern crate tokio_threadpool;
271    /// # extern crate futures;
272    /// # use tokio_threadpool::Builder;
273    ///
274    /// # pub fn main() {
275    /// let thread_pool = Builder::new()
276    ///     .stack_size(32 * 1024)
277    ///     .build();
278    /// # }
279    /// ```
280    pub fn stack_size(&mut self, val: usize) -> &mut Self {
281        self.config.stack_size = Some(val);
282        self
283    }
284
285    /// Execute function `f` on each worker thread.
286    ///
287    /// This function is provided a handle to the worker and is expected to call
288    /// [`Worker::run`], otherwise the worker thread will shutdown without doing
289    /// any work.
290    ///
291    /// # Examples
292    ///
293    /// ```
294    /// # extern crate tokio_threadpool;
295    /// # extern crate futures;
296    /// # use tokio_threadpool::Builder;
297    ///
298    /// # pub fn main() {
299    /// let thread_pool = Builder::new()
300    ///     .around_worker(|worker, _| {
301    ///         println!("worker is starting up");
302    ///         worker.run();
303    ///         println!("worker is shutting down");
304    ///     })
305    ///     .build();
306    /// # }
307    /// ```
308    ///
309    /// [`Worker::run`]: struct.Worker.html#method.run
310    pub fn around_worker<F>(&mut self, f: F) -> &mut Self
311    where
312        F: Fn(&Worker, &mut Enter) + Send + Sync + 'static,
313    {
314        self.config.around_worker = Some(Callback::new(f));
315        self
316    }
317
318    /// Execute function `f` after each thread is started but before it starts
319    /// doing work.
320    ///
321    /// This is intended for bookkeeping and monitoring use cases.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// # extern crate tokio_threadpool;
327    /// # extern crate futures;
328    /// # use tokio_threadpool::Builder;
329    ///
330    /// # pub fn main() {
331    /// let thread_pool = Builder::new()
332    ///     .after_start(|| {
333    ///         println!("thread started");
334    ///     })
335    ///     .build();
336    /// # }
337    /// ```
338    pub fn after_start<F>(&mut self, f: F) -> &mut Self
339    where
340        F: Fn() + Send + Sync + 'static,
341    {
342        self.config.after_start = Some(Arc::new(f));
343        self
344    }
345
346    /// Execute function `f` before each thread stops.
347    ///
348    /// This is intended for bookkeeping and monitoring use cases.
349    ///
350    /// # Examples
351    ///
352    /// ```
353    /// # extern crate tokio_threadpool;
354    /// # extern crate futures;
355    /// # use tokio_threadpool::Builder;
356    ///
357    /// # pub fn main() {
358    /// let thread_pool = Builder::new()
359    ///     .before_stop(|| {
360    ///         println!("thread stopping");
361    ///     })
362    ///     .build();
363    /// # }
364    /// ```
365    pub fn before_stop<F>(&mut self, f: F) -> &mut Self
366    where
367        F: Fn() + Send + Sync + 'static,
368    {
369        self.config.before_stop = Some(Arc::new(f));
370        self
371    }
372
373    /// Customize the `park` instance used by each worker thread.
374    ///
375    /// The provided closure `f` is called once per worker and returns a `Park`
376    /// instance that is used by the worker to put itself to sleep.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// # extern crate tokio_threadpool;
382    /// # extern crate futures;
383    /// # use tokio_threadpool::Builder;
384    /// # fn decorate<F>(f: F) -> F { f }
385    ///
386    /// # pub fn main() {
387    /// let thread_pool = Builder::new()
388    ///     .custom_park(|_| {
389    ///         use tokio_threadpool::park::DefaultPark;
390    ///
391    ///         // This is the default park type that the worker would use if we
392    ///         // did not customize it.
393    ///         let park = DefaultPark::new();
394    ///
395    ///         // Decorate the `park` instance, allowing us to customize work
396    ///         // that happens when a worker thread goes to sleep.
397    ///         decorate(park)
398    ///     })
399    ///     .build();
400    /// # }
401    /// ```
402    pub fn custom_park<F, P>(&mut self, f: F) -> &mut Self
403    where
404        F: Fn(&WorkerId) -> P + 'static,
405        P: Park + Send + 'static,
406        P::Error: Error,
407    {
408        self.new_park = Box::new(move |id| Box::new(BoxedPark::new(f(id))));
409
410        self
411    }
412
413    /// Create the configured `ThreadPool`.
414    ///
415    /// The returned `ThreadPool` instance is ready to spawn tasks.
416    ///
417    /// # Examples
418    ///
419    /// ```
420    /// # extern crate tokio_threadpool;
421    /// # extern crate futures;
422    /// # use tokio_threadpool::Builder;
423    ///
424    /// # pub fn main() {
425    /// let thread_pool = Builder::new()
426    ///     .build();
427    /// # }
428    /// ```
429    pub fn build(&self) -> ThreadPool {
430        trace!("build; num-workers={}", self.pool_size);
431
432        // Create the worker entry list
433        let workers: Arc<[worker::Entry]> = {
434            let mut workers = vec![];
435
436            for i in 0..self.pool_size {
437                let id = WorkerId::new(i);
438                let park = (self.new_park)(&id);
439                let unpark = park.unpark();
440
441                workers.push(worker::Entry::new(park, unpark));
442            }
443
444            workers.into()
445        };
446
447        let queue = Arc::new(Injector::new());
448
449        // Create a trigger that will clean up resources on shutdown.
450        //
451        // The `Pool` contains a weak reference to it, while `Worker`s and the `ThreadPool` contain
452        // strong references.
453        let trigger = Arc::new(ShutdownTrigger::new(workers.clone(), queue.clone()));
454
455        // Create the pool
456        let pool = Arc::new(Pool::new(
457            workers,
458            Arc::downgrade(&trigger),
459            self.max_blocking,
460            self.config.clone(),
461            queue,
462        ));
463
464        ThreadPool::new2(pool, trigger)
465    }
466}
467
468impl fmt::Debug for Builder {
469    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
470        fmt.debug_struct("Builder")
471            .field("config", &self.config)
472            .field("pool_size", &self.pool_size)
473            .field("new_park", &"Box<Fn() -> BoxPark>")
474            .finish()
475    }
476}