broker_tokio/runtime/
builder.rs

1use crate::runtime::handle::Handle;
2use crate::runtime::shell::Shell;
3use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
4
5use std::fmt;
6#[cfg(not(loom))]
7use std::sync::Arc;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new`].
15///
16/// See function level documentation for details on the various configuration
17/// settings.
18///
19/// [`build`]: #method.build
20/// [`Builder::new`]: #method.new
21///
22/// # Examples
23///
24/// ```
25/// use tokio::runtime::Builder;
26///
27/// fn main() {
28///     // build runtime
29///     let runtime = Builder::new()
30///         .threaded_scheduler()
31///         .core_threads(4)
32///         .thread_name("my-custom-name")
33///         .thread_stack_size(3 * 1024 * 1024)
34///         .build()
35///         .unwrap();
36///
37///     // use runtime ...
38/// }
39/// ```
40pub struct Builder {
41    /// The task execution model to use.
42    kind: Kind,
43
44    /// Whether or not to enable the I/O driver
45    enable_io: bool,
46
47    /// Whether or not to enable the time driver
48    enable_time: bool,
49
50    /// The number of worker threads, used by Runtime.
51    ///
52    /// Only used when not using the current-thread executor.
53    core_threads: usize,
54
55    /// Cap on thread usage.
56    max_threads: usize,
57
58    /// Name used for threads spawned by the runtime.
59    pub(super) thread_name: String,
60
61    /// Stack size used for threads spawned by the runtime.
62    pub(super) thread_stack_size: Option<usize>,
63
64    /// Callback to run after each thread starts.
65    pub(super) after_start: Option<Callback>,
66
67    /// To run before each worker thread stops
68    pub(super) before_stop: Option<Callback>,
69}
70
71#[derive(Debug, Clone, Copy)]
72enum Kind {
73    Shell,
74    #[cfg(feature = "rt-core")]
75    Basic,
76    #[cfg(feature = "rt-threaded")]
77    ThreadPool,
78}
79
80impl Builder {
81    /// Returns a new runtime builder initialized with default configuration
82    /// values.
83    ///
84    /// Configuration methods can be chained on the return value.
85    pub fn new() -> Builder {
86        Builder {
87            // No task execution by default
88            kind: Kind::Shell,
89
90            // I/O defaults to "off"
91            enable_io: false,
92
93            // Time defaults to "off"
94            enable_time: false,
95
96            // Default to use an equal number of threads to number of CPU cores
97            core_threads: crate::loom::sys::num_cpus(),
98
99            max_threads: 512,
100
101            // Default thread name
102            thread_name: "tokio-runtime-worker".into(),
103
104            // Do not set a stack size by default
105            thread_stack_size: None,
106
107            // No worker thread callbacks
108            after_start: None,
109            before_stop: None,
110        }
111    }
112
113    /// Enable both I/O and time drivers.
114    ///
115    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
116    /// individually. If additional components are added to Tokio in the future,
117    /// `enable_all` will include these future components.
118    ///
119    /// # Examples
120    ///
121    /// ```
122    /// use tokio::runtime;
123    ///
124    /// let rt = runtime::Builder::new()
125    ///     .enable_all()
126    ///     .build()
127    ///     .unwrap();
128    /// ```
129    pub fn enable_all(&mut self) -> &mut Self {
130        #[cfg(feature = "io-driver")]
131        self.enable_io();
132        #[cfg(feature = "time")]
133        self.enable_time();
134
135        self
136    }
137
138    #[deprecated(note = "In future will be replaced by core_threads method")]
139    /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
140    ///
141    /// This must be a number between 1 and 32,768 though it is advised to keep
142    /// this value on the smaller side.
143    ///
144    /// The default value is the number of cores available to the system.
145    pub fn num_threads(&mut self, val: usize) -> &mut Self {
146        self.core_threads = val;
147        self
148    }
149
150    /// Set the core number of worker threads for the `Runtime`'s thread pool.
151    ///
152    /// This should be a number between 1 and 32,768 though it is advised to keep
153    /// this value on the smaller side.
154    ///
155    /// The default value is the number of cores available to the system.
156    ///
157    /// These threads will be always active and running.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use tokio::runtime;
163    ///
164    /// let rt = runtime::Builder::new()
165    ///     .core_threads(4)
166    ///     .build()
167    ///     .unwrap();
168    /// ```
169    pub fn core_threads(&mut self, val: usize) -> &mut Self {
170        assert_ne!(val, 0, "Core threads cannot be zero");
171        self.core_threads = val;
172        self
173    }
174
175    /// Specifies limit for threads, spawned by the Runtime.
176    ///
177    /// This is number of threads to be used by Runtime, including `core_threads`
178    /// Having `max_threads` less than `core_threads` results in invalid configuration
179    /// when building multi-threaded `Runtime`, which would cause a panic.
180    ///
181    /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
182    ///
183    /// The default value is 512.
184    ///
185    /// When multi-threaded runtime is not used, will act as limit on additional threads.
186    ///
187    /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
188    /// blocking annotations) as `max_threads - core_threads`.
189    pub fn max_threads(&mut self, val: usize) -> &mut Self {
190        assert_ne!(val, 0, "Thread limit cannot be zero");
191        self.max_threads = val;
192        self
193    }
194
195    /// Set name of threads spawned by the `Runtime`'s thread pool.
196    ///
197    /// The default name is "tokio-runtime-worker".
198    ///
199    /// # Examples
200    ///
201    /// ```
202    /// # use tokio::runtime;
203    ///
204    /// # pub fn main() {
205    /// let rt = runtime::Builder::new()
206    ///     .thread_name("my-pool")
207    ///     .build();
208    /// # }
209    /// ```
210    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
211        self.thread_name = val.into();
212        self
213    }
214
215    /// Set the stack size (in bytes) for worker threads.
216    ///
217    /// The actual stack size may be greater than this value if the platform
218    /// specifies minimal stack size.
219    ///
220    /// The default stack size for spawned threads is 2 MiB, though this
221    /// particular stack size is subject to change in the future.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # use tokio::runtime;
227    ///
228    /// # pub fn main() {
229    /// let rt = runtime::Builder::new()
230    ///     .thread_stack_size(32 * 1024)
231    ///     .build();
232    /// # }
233    /// ```
234    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
235        self.thread_stack_size = Some(val);
236        self
237    }
238
239    /// Execute function `f` after each thread is started but before it starts
240    /// doing work.
241    ///
242    /// This is intended for bookkeeping and monitoring use cases.
243    ///
244    /// # Examples
245    ///
246    /// ```
247    /// # use tokio::runtime;
248    ///
249    /// # pub fn main() {
250    /// let runtime = runtime::Builder::new()
251    ///     .on_thread_start(|| {
252    ///         println!("thread started");
253    ///     })
254    ///     .build();
255    /// # }
256    /// ```
257    #[cfg(not(loom))]
258    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
259    where
260        F: Fn() + Send + Sync + 'static,
261    {
262        self.after_start = Some(Arc::new(f));
263        self
264    }
265
266    /// Execute function `f` before each thread stops.
267    ///
268    /// This is intended for bookkeeping and monitoring use cases.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// # use tokio::runtime;
274    ///
275    /// # pub fn main() {
276    /// let runtime = runtime::Builder::new()
277    ///     .on_thread_stop(|| {
278    ///         println!("thread stopping");
279    ///     })
280    ///     .build();
281    /// # }
282    /// ```
283    #[cfg(not(loom))]
284    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
285    where
286        F: Fn() + Send + Sync + 'static,
287    {
288        self.before_stop = Some(Arc::new(f));
289        self
290    }
291
292    /// Create the configured `Runtime`.
293    ///
294    /// The returned `ThreadPool` instance is ready to spawn tasks.
295    ///
296    /// # Examples
297    ///
298    /// ```
299    /// use tokio::runtime::Builder;
300    ///
301    /// let mut rt = Builder::new().build().unwrap();
302    ///
303    /// rt.block_on(async {
304    ///     println!("Hello from the Tokio runtime");
305    /// });
306    /// ```
307    pub fn build(&mut self) -> io::Result<Runtime> {
308        match self.kind {
309            Kind::Shell => self.build_shell_runtime(),
310            #[cfg(feature = "rt-core")]
311            Kind::Basic => self.build_basic_runtime(),
312            #[cfg(feature = "rt-threaded")]
313            Kind::ThreadPool => self.build_threaded_runtime(),
314        }
315    }
316
317    fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
318        use crate::runtime::Kind;
319
320        let clock = time::create_clock();
321
322        // Create I/O driver
323        let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
324        let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
325
326        let spawner = Spawner::Shell;
327
328        let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
329        let blocking_spawner = blocking_pool.spawner().clone();
330
331        Ok(Runtime {
332            kind: Kind::Shell(Shell::new(driver)),
333            handle: Handle {
334                spawner,
335                io_handle,
336                time_handle,
337                clock,
338                blocking_spawner,
339            },
340            blocking_pool,
341        })
342    }
343}
344
345cfg_io_driver! {
346    impl Builder {
347        /// Enable the I/O driver.
348        ///
349        /// Doing this enables using net, process, signal, and some I/O types on
350        /// the runtime.
351        ///
352        /// # Examples
353        ///
354        /// ```
355        /// use tokio::runtime;
356        ///
357        /// let rt = runtime::Builder::new()
358        ///     .enable_io()
359        ///     .build()
360        ///     .unwrap();
361        /// ```
362        pub fn enable_io(&mut self) -> &mut Self {
363            self.enable_io = true;
364            self
365        }
366    }
367}
368
369cfg_time! {
370    impl Builder {
371        /// Enable the time driver.
372        ///
373        /// Doing this enables using `tokio::time` on the runtime.
374        ///
375        /// # Examples
376        ///
377        /// ```
378        /// use tokio::runtime;
379        ///
380        /// let rt = runtime::Builder::new()
381        ///     .enable_time()
382        ///     .build()
383        ///     .unwrap();
384        /// ```
385        pub fn enable_time(&mut self) -> &mut Self {
386            self.enable_time = true;
387            self
388        }
389    }
390}
391
392cfg_rt_core! {
393    impl Builder {
394        /// Use a simpler scheduler that runs all tasks on the current-thread.
395        ///
396        /// The executor and all necessary drivers will all be run on the current
397        /// thread during `block_on` calls.
398        pub fn basic_scheduler(&mut self) -> &mut Self {
399            self.kind = Kind::Basic;
400            self
401        }
402
403        fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
404            use crate::runtime::{BasicScheduler, Kind};
405
406            let clock = time::create_clock();
407
408            // Create I/O driver
409            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
410
411            let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
412
413            // And now put a single-threaded scheduler on top of the timer. When
414            // there are no futures ready to do something, it'll let the timer or
415            // the reactor to generate some new stimuli for the futures to continue
416            // in their life.
417            let scheduler = BasicScheduler::new(driver);
418            let spawner = Spawner::Basic(scheduler.spawner());
419
420            // Blocking pool
421            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
422            let blocking_spawner = blocking_pool.spawner().clone();
423
424            Ok(Runtime {
425                kind: Kind::Basic(scheduler),
426                handle: Handle {
427                    spawner,
428                    io_handle,
429                    time_handle,
430                    clock,
431                    blocking_spawner,
432                },
433                blocking_pool,
434            })
435        }
436    }
437}
438
439cfg_rt_threaded! {
440    impl Builder {
441        /// Use a multi-threaded scheduler for executing tasks.
442        pub fn threaded_scheduler(&mut self) -> &mut Self {
443            self.kind = Kind::ThreadPool;
444            self
445        }
446
447        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
448            use crate::runtime::{Kind, ThreadPool};
449            use crate::runtime::park::Parker;
450
451            assert!(self.core_threads <= self.max_threads, "Core threads number cannot be above max limit");
452
453            let clock = time::create_clock();
454
455            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
456            let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
457            let (scheduler, workers) = ThreadPool::new(self.core_threads, Parker::new(driver));
458            let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
459
460            // Create the blocking pool
461            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
462            let blocking_spawner = blocking_pool.spawner().clone();
463
464            // Create the runtime handle
465            let handle = Handle {
466                spawner,
467                io_handle,
468                time_handle,
469                clock,
470                blocking_spawner,
471            };
472
473            // Spawn the thread pool workers
474            workers.spawn(&handle);
475
476            Ok(Runtime {
477                kind: Kind::ThreadPool(scheduler),
478                handle,
479                blocking_pool,
480            })
481        }
482    }
483}
484
485impl Default for Builder {
486    fn default() -> Self {
487        Self::new()
488    }
489}
490
491impl fmt::Debug for Builder {
492    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
493        fmt.debug_struct("Builder")
494            .field("kind", &self.kind)
495            .field("core_threads", &self.core_threads)
496            .field("max_threads", &self.max_threads)
497            .field("thread_name", &self.thread_name)
498            .field("thread_stack_size", &self.thread_stack_size)
499            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
500            .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
501            .finish()
502    }
503}