broker_tokio/runtime/builder.rs
1use crate::runtime::handle::Handle;
2use crate::runtime::shell::Shell;
3use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
4
5use std::fmt;
6#[cfg(not(loom))]
7use std::sync::Arc;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new`].
15///
16/// See function level documentation for details on the various configuration
17/// settings.
18///
19/// [`build`]: #method.build
20/// [`Builder::new`]: #method.new
21///
22/// # Examples
23///
24/// ```
25/// use tokio::runtime::Builder;
26///
27/// fn main() {
28/// // build runtime
29/// let runtime = Builder::new()
30/// .threaded_scheduler()
31/// .core_threads(4)
32/// .thread_name("my-custom-name")
33/// .thread_stack_size(3 * 1024 * 1024)
34/// .build()
35/// .unwrap();
36///
37/// // use runtime ...
38/// }
39/// ```
40pub struct Builder {
41 /// The task execution model to use.
42 kind: Kind,
43
44 /// Whether or not to enable the I/O driver
45 enable_io: bool,
46
47 /// Whether or not to enable the time driver
48 enable_time: bool,
49
50 /// The number of worker threads, used by Runtime.
51 ///
52 /// Only used when not using the current-thread executor.
53 core_threads: usize,
54
55 /// Cap on thread usage.
56 max_threads: usize,
57
58 /// Name used for threads spawned by the runtime.
59 pub(super) thread_name: String,
60
61 /// Stack size used for threads spawned by the runtime.
62 pub(super) thread_stack_size: Option<usize>,
63
64 /// Callback to run after each thread starts.
65 pub(super) after_start: Option<Callback>,
66
67 /// To run before each worker thread stops
68 pub(super) before_stop: Option<Callback>,
69}
70
/// The task execution model a `Builder` has been asked to construct.
#[derive(Clone, Copy, Debug)]
enum Kind {
    /// No task execution; this is what `Builder::new` starts with.
    Shell,
    /// Selected by `basic_scheduler`; only exists with the `rt-core` feature.
    #[cfg(feature = "rt-core")]
    Basic,
    /// Selected by `threaded_scheduler`; only exists with the `rt-threaded`
    /// feature.
    #[cfg(feature = "rt-threaded")]
    ThreadPool,
}
79
80impl Builder {
81 /// Returns a new runtime builder initialized with default configuration
82 /// values.
83 ///
84 /// Configuration methods can be chained on the return value.
85 pub fn new() -> Builder {
86 Builder {
87 // No task execution by default
88 kind: Kind::Shell,
89
90 // I/O defaults to "off"
91 enable_io: false,
92
93 // Time defaults to "off"
94 enable_time: false,
95
96 // Default to use an equal number of threads to number of CPU cores
97 core_threads: crate::loom::sys::num_cpus(),
98
99 max_threads: 512,
100
101 // Default thread name
102 thread_name: "tokio-runtime-worker".into(),
103
104 // Do not set a stack size by default
105 thread_stack_size: None,
106
107 // No worker thread callbacks
108 after_start: None,
109 before_stop: None,
110 }
111 }
112
113 /// Enable both I/O and time drivers.
114 ///
115 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
116 /// individually. If additional components are added to Tokio in the future,
117 /// `enable_all` will include these future components.
118 ///
119 /// # Examples
120 ///
121 /// ```
122 /// use tokio::runtime;
123 ///
124 /// let rt = runtime::Builder::new()
125 /// .enable_all()
126 /// .build()
127 /// .unwrap();
128 /// ```
129 pub fn enable_all(&mut self) -> &mut Self {
130 #[cfg(feature = "io-driver")]
131 self.enable_io();
132 #[cfg(feature = "time")]
133 self.enable_time();
134
135 self
136 }
137
138 #[deprecated(note = "In future will be replaced by core_threads method")]
139 /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
140 ///
141 /// This must be a number between 1 and 32,768 though it is advised to keep
142 /// this value on the smaller side.
143 ///
144 /// The default value is the number of cores available to the system.
145 pub fn num_threads(&mut self, val: usize) -> &mut Self {
146 self.core_threads = val;
147 self
148 }
149
150 /// Set the core number of worker threads for the `Runtime`'s thread pool.
151 ///
152 /// This should be a number between 1 and 32,768 though it is advised to keep
153 /// this value on the smaller side.
154 ///
155 /// The default value is the number of cores available to the system.
156 ///
157 /// These threads will be always active and running.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// use tokio::runtime;
163 ///
164 /// let rt = runtime::Builder::new()
165 /// .core_threads(4)
166 /// .build()
167 /// .unwrap();
168 /// ```
169 pub fn core_threads(&mut self, val: usize) -> &mut Self {
170 assert_ne!(val, 0, "Core threads cannot be zero");
171 self.core_threads = val;
172 self
173 }
174
175 /// Specifies limit for threads, spawned by the Runtime.
176 ///
177 /// This is number of threads to be used by Runtime, including `core_threads`
178 /// Having `max_threads` less than `core_threads` results in invalid configuration
179 /// when building multi-threaded `Runtime`, which would cause a panic.
180 ///
181 /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
182 ///
183 /// The default value is 512.
184 ///
185 /// When multi-threaded runtime is not used, will act as limit on additional threads.
186 ///
187 /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
188 /// blocking annotations) as `max_threads - core_threads`.
189 pub fn max_threads(&mut self, val: usize) -> &mut Self {
190 assert_ne!(val, 0, "Thread limit cannot be zero");
191 self.max_threads = val;
192 self
193 }
194
195 /// Set name of threads spawned by the `Runtime`'s thread pool.
196 ///
197 /// The default name is "tokio-runtime-worker".
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// # use tokio::runtime;
203 ///
204 /// # pub fn main() {
205 /// let rt = runtime::Builder::new()
206 /// .thread_name("my-pool")
207 /// .build();
208 /// # }
209 /// ```
210 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
211 self.thread_name = val.into();
212 self
213 }
214
215 /// Set the stack size (in bytes) for worker threads.
216 ///
217 /// The actual stack size may be greater than this value if the platform
218 /// specifies minimal stack size.
219 ///
220 /// The default stack size for spawned threads is 2 MiB, though this
221 /// particular stack size is subject to change in the future.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// # use tokio::runtime;
227 ///
228 /// # pub fn main() {
229 /// let rt = runtime::Builder::new()
230 /// .thread_stack_size(32 * 1024)
231 /// .build();
232 /// # }
233 /// ```
234 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
235 self.thread_stack_size = Some(val);
236 self
237 }
238
239 /// Execute function `f` after each thread is started but before it starts
240 /// doing work.
241 ///
242 /// This is intended for bookkeeping and monitoring use cases.
243 ///
244 /// # Examples
245 ///
246 /// ```
247 /// # use tokio::runtime;
248 ///
249 /// # pub fn main() {
250 /// let runtime = runtime::Builder::new()
251 /// .on_thread_start(|| {
252 /// println!("thread started");
253 /// })
254 /// .build();
255 /// # }
256 /// ```
257 #[cfg(not(loom))]
258 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
259 where
260 F: Fn() + Send + Sync + 'static,
261 {
262 self.after_start = Some(Arc::new(f));
263 self
264 }
265
266 /// Execute function `f` before each thread stops.
267 ///
268 /// This is intended for bookkeeping and monitoring use cases.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// # use tokio::runtime;
274 ///
275 /// # pub fn main() {
276 /// let runtime = runtime::Builder::new()
277 /// .on_thread_stop(|| {
278 /// println!("thread stopping");
279 /// })
280 /// .build();
281 /// # }
282 /// ```
283 #[cfg(not(loom))]
284 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
285 where
286 F: Fn() + Send + Sync + 'static,
287 {
288 self.before_stop = Some(Arc::new(f));
289 self
290 }
291
292 /// Create the configured `Runtime`.
293 ///
294 /// The returned `ThreadPool` instance is ready to spawn tasks.
295 ///
296 /// # Examples
297 ///
298 /// ```
299 /// use tokio::runtime::Builder;
300 ///
301 /// let mut rt = Builder::new().build().unwrap();
302 ///
303 /// rt.block_on(async {
304 /// println!("Hello from the Tokio runtime");
305 /// });
306 /// ```
307 pub fn build(&mut self) -> io::Result<Runtime> {
308 match self.kind {
309 Kind::Shell => self.build_shell_runtime(),
310 #[cfg(feature = "rt-core")]
311 Kind::Basic => self.build_basic_runtime(),
312 #[cfg(feature = "rt-threaded")]
313 Kind::ThreadPool => self.build_threaded_runtime(),
314 }
315 }
316
317 fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
318 use crate::runtime::Kind;
319
320 let clock = time::create_clock();
321
322 // Create I/O driver
323 let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
324 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
325
326 let spawner = Spawner::Shell;
327
328 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
329 let blocking_spawner = blocking_pool.spawner().clone();
330
331 Ok(Runtime {
332 kind: Kind::Shell(Shell::new(driver)),
333 handle: Handle {
334 spawner,
335 io_handle,
336 time_handle,
337 clock,
338 blocking_spawner,
339 },
340 blocking_pool,
341 })
342 }
343}
344
345cfg_io_driver! {
346 impl Builder {
347 /// Enable the I/O driver.
348 ///
349 /// Doing this enables using net, process, signal, and some I/O types on
350 /// the runtime.
351 ///
352 /// # Examples
353 ///
354 /// ```
355 /// use tokio::runtime;
356 ///
357 /// let rt = runtime::Builder::new()
358 /// .enable_io()
359 /// .build()
360 /// .unwrap();
361 /// ```
362 pub fn enable_io(&mut self) -> &mut Self {
363 self.enable_io = true;
364 self
365 }
366 }
367}
368
369cfg_time! {
370 impl Builder {
371 /// Enable the time driver.
372 ///
373 /// Doing this enables using `tokio::time` on the runtime.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// use tokio::runtime;
379 ///
380 /// let rt = runtime::Builder::new()
381 /// .enable_time()
382 /// .build()
383 /// .unwrap();
384 /// ```
385 pub fn enable_time(&mut self) -> &mut Self {
386 self.enable_time = true;
387 self
388 }
389 }
390}
391
392cfg_rt_core! {
393 impl Builder {
394 /// Use a simpler scheduler that runs all tasks on the current-thread.
395 ///
396 /// The executor and all necessary drivers will all be run on the current
397 /// thread during `block_on` calls.
398 pub fn basic_scheduler(&mut self) -> &mut Self {
399 self.kind = Kind::Basic;
400 self
401 }
402
403 fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
404 use crate::runtime::{BasicScheduler, Kind};
405
406 let clock = time::create_clock();
407
408 // Create I/O driver
409 let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
410
411 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
412
413 // And now put a single-threaded scheduler on top of the timer. When
414 // there are no futures ready to do something, it'll let the timer or
415 // the reactor to generate some new stimuli for the futures to continue
416 // in their life.
417 let scheduler = BasicScheduler::new(driver);
418 let spawner = Spawner::Basic(scheduler.spawner());
419
420 // Blocking pool
421 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
422 let blocking_spawner = blocking_pool.spawner().clone();
423
424 Ok(Runtime {
425 kind: Kind::Basic(scheduler),
426 handle: Handle {
427 spawner,
428 io_handle,
429 time_handle,
430 clock,
431 blocking_spawner,
432 },
433 blocking_pool,
434 })
435 }
436 }
437}
438
439cfg_rt_threaded! {
440 impl Builder {
441 /// Use a multi-threaded scheduler for executing tasks.
442 pub fn threaded_scheduler(&mut self) -> &mut Self {
443 self.kind = Kind::ThreadPool;
444 self
445 }
446
447 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
448 use crate::runtime::{Kind, ThreadPool};
449 use crate::runtime::park::Parker;
450
451 assert!(self.core_threads <= self.max_threads, "Core threads number cannot be above max limit");
452
453 let clock = time::create_clock();
454
455 let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
456 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
457 let (scheduler, workers) = ThreadPool::new(self.core_threads, Parker::new(driver));
458 let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
459
460 // Create the blocking pool
461 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
462 let blocking_spawner = blocking_pool.spawner().clone();
463
464 // Create the runtime handle
465 let handle = Handle {
466 spawner,
467 io_handle,
468 time_handle,
469 clock,
470 blocking_spawner,
471 };
472
473 // Spawn the thread pool workers
474 workers.spawn(&handle);
475
476 Ok(Runtime {
477 kind: Kind::ThreadPool(scheduler),
478 handle,
479 blocking_pool,
480 })
481 }
482 }
483}
484
485impl Default for Builder {
486 fn default() -> Self {
487 Self::new()
488 }
489}
490
491impl fmt::Debug for Builder {
492 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
493 fmt.debug_struct("Builder")
494 .field("kind", &self.kind)
495 .field("core_threads", &self.core_threads)
496 .field("max_threads", &self.max_threads)
497 .field("thread_name", &self.thread_name)
498 .field("thread_stack_size", &self.thread_stack_size)
499 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
500 .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
501 .finish()
502 }
503}