madsim_real_tokio/runtime/builder.rs
1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
15/// or [`Builder::new_current_thread`].
16///
17/// See function level documentation for details on the various configuration
18/// settings.
19///
20/// [`build`]: method@Self::build
21/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
22/// [`Builder::new_current_thread`]: method@Self::new_current_thread
23///
24/// # Examples
25///
26/// ```
27/// use tokio::runtime::Builder;
28///
29/// fn main() {
30/// // build runtime
31/// let runtime = Builder::new_multi_thread()
32/// .worker_threads(4)
33/// .thread_name("my-custom-name")
34/// .thread_stack_size(3 * 1024 * 1024)
35/// .build()
36/// .unwrap();
37///
38/// // use runtime ...
39/// }
40/// ```
41pub struct Builder {
42 /// Runtime type
43 kind: Kind,
44
45 /// Whether or not to enable the I/O driver
46 enable_io: bool,
47 nevents: usize,
48
49 /// Whether or not to enable the time driver
50 enable_time: bool,
51
52 /// Whether or not the clock should start paused.
53 start_paused: bool,
54
55 /// The number of worker threads, used by Runtime.
56 ///
57 /// Only used when not using the current-thread executor.
58 worker_threads: Option<usize>,
59
60 /// Cap on thread usage.
61 max_blocking_threads: usize,
62
63 /// Name fn used for threads spawned by the runtime.
64 pub(super) thread_name: ThreadNameFn,
65
66 /// Stack size used for threads spawned by the runtime.
67 pub(super) thread_stack_size: Option<usize>,
68
69 /// Callback to run after each thread starts.
70 pub(super) after_start: Option<Callback>,
71
72 /// To run before each worker thread stops
73 pub(super) before_stop: Option<Callback>,
74
75 /// To run before each worker thread is parked.
76 pub(super) before_park: Option<Callback>,
77
78 /// To run after each thread is unparked.
79 pub(super) after_unpark: Option<Callback>,
80
81 /// Customizable keep alive timeout for `BlockingPool`
82 pub(super) keep_alive: Option<Duration>,
83
84 /// How many ticks before pulling a task from the global/remote queue?
85 ///
86 /// When `None`, the value is unspecified and behavior details are left to
87 /// the scheduler. Each scheduler flavor could choose to either pick its own
88 /// default value or use some other strategy to decide when to poll from the
89 /// global queue. For example, the multi-threaded scheduler uses a
90 /// self-tuning strategy based on mean task poll times.
91 pub(super) global_queue_interval: Option<u32>,
92
93 /// How many ticks before yielding to the driver for timer and I/O events?
94 pub(super) event_interval: u32,
95
96 pub(super) local_queue_capacity: usize,
97
98 /// When true, the multi-threade scheduler LIFO slot should not be used.
99 ///
100 /// This option should only be exposed as unstable.
101 pub(super) disable_lifo_slot: bool,
102
103 /// Specify a random number generator seed to provide deterministic results
104 pub(super) seed_generator: RngSeedGenerator,
105
106 /// When true, enables task poll count histogram instrumentation.
107 pub(super) metrics_poll_count_histogram_enable: bool,
108
109 /// Configures the task poll count histogram
110 pub(super) metrics_poll_count_histogram: HistogramBuilder,
111
112 #[cfg(tokio_unstable)]
113 pub(super) unhandled_panic: UnhandledPanic,
114}
115
116cfg_unstable! {
117 /// How the runtime should respond to unhandled panics.
118 ///
119 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
120 /// to configure the runtime behavior when a spawned task panics.
121 ///
122 /// See [`Builder::unhandled_panic`] for more details.
123 #[derive(Debug, Clone)]
124 #[non_exhaustive]
125 pub enum UnhandledPanic {
126 /// The runtime should ignore panics on spawned tasks.
127 ///
128 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
129 /// tasks continue running normally.
130 ///
131 /// This is the default behavior.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::runtime::{self, UnhandledPanic};
137 ///
138 /// # pub fn main() {
139 /// let rt = runtime::Builder::new_current_thread()
140 /// .unhandled_panic(UnhandledPanic::Ignore)
141 /// .build()
142 /// .unwrap();
143 ///
144 /// let task1 = rt.spawn(async { panic!("boom"); });
145 /// let task2 = rt.spawn(async {
146 /// // This task completes normally
147 /// "done"
148 /// });
149 ///
150 /// rt.block_on(async {
151 /// // The panic on the first task is forwarded to the `JoinHandle`
152 /// assert!(task1.await.is_err());
153 ///
154 /// // The second task completes normally
155 /// assert!(task2.await.is_ok());
156 /// })
157 /// # }
158 /// ```
159 ///
160 /// [`JoinHandle`]: struct@crate::task::JoinHandle
161 Ignore,
162
163 /// The runtime should immediately shutdown if a spawned task panics.
164 ///
165 /// The runtime will immediately shutdown even if the panicked task's
166 /// [`JoinHandle`] is still available. All further spawned tasks will be
167 /// immediately dropped and call to [`Runtime::block_on`] will panic.
168 ///
169 /// # Examples
170 ///
171 /// ```should_panic
172 /// use tokio::runtime::{self, UnhandledPanic};
173 ///
174 /// # pub fn main() {
175 /// let rt = runtime::Builder::new_current_thread()
176 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
177 /// .build()
178 /// .unwrap();
179 ///
180 /// rt.spawn(async { panic!("boom"); });
181 /// rt.spawn(async {
182 /// // This task never completes.
183 /// });
184 ///
185 /// rt.block_on(async {
186 /// // Do some work
187 /// # loop { tokio::task::yield_now().await; }
188 /// })
189 /// # }
190 /// ```
191 ///
192 /// [`JoinHandle`]: struct@crate::task::JoinHandle
193 ShutdownRuntime,
194 }
195}
196
197pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
198
199#[derive(Clone, Copy)]
200pub(crate) enum Kind {
201 CurrentThread,
202 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
203 MultiThread,
204 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
205 MultiThreadAlt,
206}
207
208impl Builder {
209 /// Returns a new builder with the current thread scheduler selected.
210 ///
211 /// Configuration methods can be chained on the return value.
212 ///
213 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
214 /// [`LocalSet`].
215 ///
216 /// [`LocalSet`]: crate::task::LocalSet
217 pub fn new_current_thread() -> Builder {
218 #[cfg(loom)]
219 const EVENT_INTERVAL: u32 = 4;
220 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
221 #[cfg(not(loom))]
222 const EVENT_INTERVAL: u32 = 61;
223
224 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
225 }
226
227 cfg_not_wasi! {
228 /// Returns a new builder with the multi thread scheduler selected.
229 ///
230 /// Configuration methods can be chained on the return value.
231 #[cfg(feature = "rt-multi-thread")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
233 pub fn new_multi_thread() -> Builder {
234 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
235 Builder::new(Kind::MultiThread, 61)
236 }
237
238 cfg_unstable! {
239 /// Returns a new builder with the alternate multi thread scheduler
240 /// selected.
241 ///
242 /// The alternate multi threaded scheduler is an in-progress
243 /// candidate to replace the existing multi threaded scheduler. It
244 /// currently does not scale as well to 16+ processors.
245 ///
246 /// This runtime flavor is currently **not considered production
247 /// ready**.
248 ///
249 /// Configuration methods can be chained on the return value.
250 #[cfg(feature = "rt-multi-thread")]
251 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
252 pub fn new_multi_thread_alt() -> Builder {
253 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
254 Builder::new(Kind::MultiThreadAlt, 61)
255 }
256 }
257 }
258
259 /// Returns a new runtime builder initialized with default configuration
260 /// values.
261 ///
262 /// Configuration methods can be chained on the return value.
263 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
264 Builder {
265 kind,
266
267 // I/O defaults to "off"
268 enable_io: false,
269 nevents: 1024,
270
271 // Time defaults to "off"
272 enable_time: false,
273
274 // The clock starts not-paused
275 start_paused: false,
276
277 // Read from environment variable first in multi-threaded mode.
278 // Default to lazy auto-detection (one thread per CPU core)
279 worker_threads: None,
280
281 max_blocking_threads: 512,
282
283 // Default thread name
284 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
285
286 // Do not set a stack size by default
287 thread_stack_size: None,
288
289 // No worker thread callbacks
290 after_start: None,
291 before_stop: None,
292 before_park: None,
293 after_unpark: None,
294
295 keep_alive: None,
296
297 // Defaults for these values depend on the scheduler kind, so we get them
298 // as parameters.
299 global_queue_interval: None,
300 event_interval,
301
302 #[cfg(not(loom))]
303 local_queue_capacity: 256,
304
305 #[cfg(loom)]
306 local_queue_capacity: 4,
307
308 seed_generator: RngSeedGenerator::new(RngSeed::new()),
309
310 #[cfg(tokio_unstable)]
311 unhandled_panic: UnhandledPanic::Ignore,
312
313 metrics_poll_count_histogram_enable: false,
314
315 metrics_poll_count_histogram: HistogramBuilder::default(),
316
317 disable_lifo_slot: false,
318 }
319 }
320
321 /// Enables both I/O and time drivers.
322 ///
323 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
324 /// individually. If additional components are added to Tokio in the future,
325 /// `enable_all` will include these future components.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use tokio::runtime;
331 ///
332 /// let rt = runtime::Builder::new_multi_thread()
333 /// .enable_all()
334 /// .build()
335 /// .unwrap();
336 /// ```
337 pub fn enable_all(&mut self) -> &mut Self {
338 #[cfg(any(
339 feature = "net",
340 all(unix, feature = "process"),
341 all(unix, feature = "signal")
342 ))]
343 self.enable_io();
344 #[cfg(feature = "time")]
345 self.enable_time();
346
347 self
348 }
349
350 /// Sets the number of worker threads the `Runtime` will use.
351 ///
352 /// This can be any number above 0 though it is advised to keep this value
353 /// on the smaller side.
354 ///
355 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
356 ///
357 /// # Default
358 ///
359 /// The default value is the number of cores available to the system.
360 ///
361 /// When using the `current_thread` runtime this method has no effect.
362 ///
363 /// # Examples
364 ///
365 /// ## Multi threaded runtime with 4 threads
366 ///
367 /// ```
368 /// use tokio::runtime;
369 ///
370 /// // This will spawn a work-stealing runtime with 4 worker threads.
371 /// let rt = runtime::Builder::new_multi_thread()
372 /// .worker_threads(4)
373 /// .build()
374 /// .unwrap();
375 ///
376 /// rt.spawn(async move {});
377 /// ```
378 ///
379 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
380 ///
381 /// ```
382 /// use tokio::runtime;
383 ///
384 /// // Create a runtime that _must_ be driven from a call
385 /// // to `Runtime::block_on`.
386 /// let rt = runtime::Builder::new_current_thread()
387 /// .build()
388 /// .unwrap();
389 ///
390 /// // This will run the runtime and future on the current thread
391 /// rt.block_on(async move {});
392 /// ```
393 ///
394 /// # Panics
395 ///
396 /// This will panic if `val` is not larger than `0`.
397 #[track_caller]
398 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
399 assert!(val > 0, "Worker threads cannot be set to 0");
400 self.worker_threads = Some(val);
401 self
402 }
403
404 /// Specifies the limit for additional threads spawned by the Runtime.
405 ///
406 /// These threads are used for blocking operations like tasks spawned
407 /// through [`spawn_blocking`], this includes but is not limited to:
408 /// - [`fs`] operations
409 /// - dns resolution through [`ToSocketAddrs`]
410 /// - writing to [`Stdout`] or [`Stderr`]
411 /// - reading from [`Stdin`]
412 ///
413 /// Unlike the [`worker_threads`], they are not always active and will exit
414 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
415 ///
416 /// It's recommended to not set this limit too low in order to avoid hanging on operations
417 /// requiring [`spawn_blocking`].
418 ///
419 /// The default value is 512.
420 ///
421 /// # Panics
422 ///
423 /// This will panic if `val` is not larger than `0`.
424 ///
425 /// # Upgrading from 0.x
426 ///
427 /// In old versions `max_threads` limited both blocking and worker threads, but the
428 /// current `max_blocking_threads` does not include async worker threads in the count.
429 ///
430 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
431 /// [`fs`]: mod@crate::fs
432 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
433 /// [`Stdout`]: struct@crate::io::Stdout
434 /// [`Stdin`]: struct@crate::io::Stdin
435 /// [`Stderr`]: struct@crate::io::Stderr
436 /// [`worker_threads`]: Self::worker_threads
437 /// [`thread_keep_alive`]: Self::thread_keep_alive
438 #[track_caller]
439 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
440 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
441 assert!(val > 0, "Max blocking threads cannot be set to 0");
442 self.max_blocking_threads = val;
443 self
444 }
445
446 /// Sets name of threads spawned by the `Runtime`'s thread pool.
447 ///
448 /// The default name is "tokio-runtime-worker".
449 ///
450 /// # Examples
451 ///
452 /// ```
453 /// # use tokio::runtime;
454 ///
455 /// # pub fn main() {
456 /// let rt = runtime::Builder::new_multi_thread()
457 /// .thread_name("my-pool")
458 /// .build();
459 /// # }
460 /// ```
461 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
462 let val = val.into();
463 self.thread_name = std::sync::Arc::new(move || val.clone());
464 self
465 }
466
467 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
468 ///
469 /// The default name fn is `|| "tokio-runtime-worker".into()`.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// # use tokio::runtime;
475 /// # use std::sync::atomic::{AtomicUsize, Ordering};
476 /// # pub fn main() {
477 /// let rt = runtime::Builder::new_multi_thread()
478 /// .thread_name_fn(|| {
479 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
480 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
481 /// format!("my-pool-{}", id)
482 /// })
483 /// .build();
484 /// # }
485 /// ```
486 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
487 where
488 F: Fn() -> String + Send + Sync + 'static,
489 {
490 self.thread_name = std::sync::Arc::new(f);
491 self
492 }
493
494 /// Sets the stack size (in bytes) for worker threads.
495 ///
496 /// The actual stack size may be greater than this value if the platform
497 /// specifies minimal stack size.
498 ///
499 /// The default stack size for spawned threads is 2 MiB, though this
500 /// particular stack size is subject to change in the future.
501 ///
502 /// # Examples
503 ///
504 /// ```
505 /// # use tokio::runtime;
506 ///
507 /// # pub fn main() {
508 /// let rt = runtime::Builder::new_multi_thread()
509 /// .thread_stack_size(32 * 1024)
510 /// .build();
511 /// # }
512 /// ```
513 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
514 self.thread_stack_size = Some(val);
515 self
516 }
517
518 /// Executes function `f` after each thread is started but before it starts
519 /// doing work.
520 ///
521 /// This is intended for bookkeeping and monitoring use cases.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// # use tokio::runtime;
527 /// # pub fn main() {
528 /// let runtime = runtime::Builder::new_multi_thread()
529 /// .on_thread_start(|| {
530 /// println!("thread started");
531 /// })
532 /// .build();
533 /// # }
534 /// ```
535 #[cfg(not(loom))]
536 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
537 where
538 F: Fn() + Send + Sync + 'static,
539 {
540 self.after_start = Some(std::sync::Arc::new(f));
541 self
542 }
543
544 /// Executes function `f` before each thread stops.
545 ///
546 /// This is intended for bookkeeping and monitoring use cases.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// # use tokio::runtime;
552 /// # pub fn main() {
553 /// let runtime = runtime::Builder::new_multi_thread()
554 /// .on_thread_stop(|| {
555 /// println!("thread stopping");
556 /// })
557 /// .build();
558 /// # }
559 /// ```
560 #[cfg(not(loom))]
561 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
562 where
563 F: Fn() + Send + Sync + 'static,
564 {
565 self.before_stop = Some(std::sync::Arc::new(f));
566 self
567 }
568
569 /// Executes function `f` just before a thread is parked (goes idle).
570 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
571 /// can be called, and may result in this thread being unparked immediately.
572 ///
573 /// This can be used to start work only when the executor is idle, or for bookkeeping
574 /// and monitoring purposes.
575 ///
576 /// Note: There can only be one park callback for a runtime; calling this function
577 /// more than once replaces the last callback defined, rather than adding to it.
578 ///
579 /// # Examples
580 ///
581 /// ## Multithreaded executor
582 /// ```
583 /// # use std::sync::Arc;
584 /// # use std::sync::atomic::{AtomicBool, Ordering};
585 /// # use tokio::runtime;
586 /// # use tokio::sync::Barrier;
587 /// # pub fn main() {
588 /// let once = AtomicBool::new(true);
589 /// let barrier = Arc::new(Barrier::new(2));
590 ///
591 /// let runtime = runtime::Builder::new_multi_thread()
592 /// .worker_threads(1)
593 /// .on_thread_park({
594 /// let barrier = barrier.clone();
595 /// move || {
596 /// let barrier = barrier.clone();
597 /// if once.swap(false, Ordering::Relaxed) {
598 /// tokio::spawn(async move { barrier.wait().await; });
599 /// }
600 /// }
601 /// })
602 /// .build()
603 /// .unwrap();
604 ///
605 /// runtime.block_on(async {
606 /// barrier.wait().await;
607 /// })
608 /// # }
609 /// ```
610 /// ## Current thread executor
611 /// ```
612 /// # use std::sync::Arc;
613 /// # use std::sync::atomic::{AtomicBool, Ordering};
614 /// # use tokio::runtime;
615 /// # use tokio::sync::Barrier;
616 /// # pub fn main() {
617 /// let once = AtomicBool::new(true);
618 /// let barrier = Arc::new(Barrier::new(2));
619 ///
620 /// let runtime = runtime::Builder::new_current_thread()
621 /// .on_thread_park({
622 /// let barrier = barrier.clone();
623 /// move || {
624 /// let barrier = barrier.clone();
625 /// if once.swap(false, Ordering::Relaxed) {
626 /// tokio::spawn(async move { barrier.wait().await; });
627 /// }
628 /// }
629 /// })
630 /// .build()
631 /// .unwrap();
632 ///
633 /// runtime.block_on(async {
634 /// barrier.wait().await;
635 /// })
636 /// # }
637 /// ```
638 #[cfg(not(loom))]
639 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
640 where
641 F: Fn() + Send + Sync + 'static,
642 {
643 self.before_park = Some(std::sync::Arc::new(f));
644 self
645 }
646
647 /// Executes function `f` just after a thread unparks (starts executing tasks).
648 ///
649 /// This is intended for bookkeeping and monitoring use cases; note that work
650 /// in this callback will increase latencies when the application has allowed one or
651 /// more runtime threads to go idle.
652 ///
653 /// Note: There can only be one unpark callback for a runtime; calling this function
654 /// more than once replaces the last callback defined, rather than adding to it.
655 ///
656 /// # Examples
657 ///
658 /// ```
659 /// # use tokio::runtime;
660 /// # pub fn main() {
661 /// let runtime = runtime::Builder::new_multi_thread()
662 /// .on_thread_unpark(|| {
663 /// println!("thread unparking");
664 /// })
665 /// .build();
666 ///
667 /// runtime.unwrap().block_on(async {
668 /// tokio::task::yield_now().await;
669 /// println!("Hello from Tokio!");
670 /// })
671 /// # }
672 /// ```
673 #[cfg(not(loom))]
674 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
675 where
676 F: Fn() + Send + Sync + 'static,
677 {
678 self.after_unpark = Some(std::sync::Arc::new(f));
679 self
680 }
681
682 /// Creates the configured `Runtime`.
683 ///
684 /// The returned `Runtime` instance is ready to spawn tasks.
685 ///
686 /// # Examples
687 ///
688 /// ```
689 /// use tokio::runtime::Builder;
690 ///
691 /// let rt = Builder::new_multi_thread().build().unwrap();
692 ///
693 /// rt.block_on(async {
694 /// println!("Hello from the Tokio runtime");
695 /// });
696 /// ```
697 pub fn build(&mut self) -> io::Result<Runtime> {
698 match &self.kind {
699 Kind::CurrentThread => self.build_current_thread_runtime(),
700 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
701 Kind::MultiThread => self.build_threaded_runtime(),
702 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
703 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
704 }
705 }
706
707 fn get_cfg(&self) -> driver::Cfg {
708 driver::Cfg {
709 enable_pause_time: match self.kind {
710 Kind::CurrentThread => true,
711 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
712 Kind::MultiThread => false,
713 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
714 Kind::MultiThreadAlt => false,
715 },
716 enable_io: self.enable_io,
717 enable_time: self.enable_time,
718 start_paused: self.start_paused,
719 nevents: self.nevents,
720 }
721 }
722
723 /// Sets a custom timeout for a thread in the blocking pool.
724 ///
725 /// By default, the timeout for a thread is set to 10 seconds. This can
726 /// be overridden using `.thread_keep_alive()`.
727 ///
728 /// # Example
729 ///
730 /// ```
731 /// # use tokio::runtime;
732 /// # use std::time::Duration;
733 /// # pub fn main() {
734 /// let rt = runtime::Builder::new_multi_thread()
735 /// .thread_keep_alive(Duration::from_millis(100))
736 /// .build();
737 /// # }
738 /// ```
739 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
740 self.keep_alive = Some(duration);
741 self
742 }
743
744 /// Sets the number of scheduler ticks after which the scheduler will poll the global
745 /// task queue.
746 ///
747 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
748 ///
749 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
750 /// [the module documentation] for the default behavior of the multi-thread scheduler.
751 ///
752 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
753 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
754 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
755 /// getting started on new work, especially if tasks frequently yield rather than complete
756 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
757 /// is a good choice when most tasks quickly complete polling.
758 ///
759 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
760 ///
761 /// # Examples
762 ///
763 /// ```
764 /// # use tokio::runtime;
765 /// # pub fn main() {
766 /// let rt = runtime::Builder::new_multi_thread()
767 /// .global_queue_interval(31)
768 /// .build();
769 /// # }
770 /// ```
771 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
772 self.global_queue_interval = Some(val);
773 self
774 }
775
776 /// Sets the number of scheduler ticks after which the scheduler will poll for
777 /// external events (timers, I/O, and so on).
778 ///
779 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
780 ///
781 /// By default, the event interval is `61` for all scheduler types.
782 ///
783 /// Setting the event interval determines the effective "priority" of delivering
784 /// these external events (which may wake up additional tasks), compared to
785 /// executing tasks that are currently ready to run. A smaller value is useful
786 /// when tasks frequently spend a long time in polling, or frequently yield,
787 /// which can result in overly long delays picking up I/O events. Conversely,
788 /// picking up new events requires extra synchronization and syscall overhead,
789 /// so if tasks generally complete their polling quickly, a higher event interval
790 /// will minimize that overhead while still keeping the scheduler responsive to
791 /// events.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # use tokio::runtime;
797 /// # pub fn main() {
798 /// let rt = runtime::Builder::new_multi_thread()
799 /// .event_interval(31)
800 /// .build();
801 /// # }
802 /// ```
803 pub fn event_interval(&mut self, val: u32) -> &mut Self {
804 self.event_interval = val;
805 self
806 }
807
808 cfg_unstable! {
809 /// Configure how the runtime responds to an unhandled panic on a
810 /// spawned task.
811 ///
812 /// By default, an unhandled panic (i.e. a panic not caught by
813 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
814 /// execution. The panic is error value is forwarded to the task's
815 /// [`JoinHandle`] and all other spawned tasks continue running.
816 ///
817 /// The `unhandled_panic` option enables configuring this behavior.
818 ///
819 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
820 /// spawned tasks have no impact on the runtime's execution.
821 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
822 /// shutdown immediately when a spawned task panics even if that
823 /// task's `JoinHandle` has not been dropped. All other spawned tasks
824 /// will immediately terminate and further calls to
825 /// [`Runtime::block_on`] will panic.
826 ///
827 /// # Panics
828 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
829 /// on a runtime other than the current thread runtime.
830 ///
831 /// # Unstable
832 ///
833 /// This option is currently unstable and its implementation is
834 /// incomplete. The API may change or be removed in the future. See
835 /// tokio-rs/tokio#4516 for more details.
836 ///
837 /// # Examples
838 ///
839 /// The following demonstrates a runtime configured to shutdown on
840 /// panic. The first spawned task panics and results in the runtime
841 /// shutting down. The second spawned task never has a chance to
842 /// execute. The call to `block_on` will panic due to the runtime being
843 /// forcibly shutdown.
844 ///
845 /// ```should_panic
846 /// use tokio::runtime::{self, UnhandledPanic};
847 ///
848 /// # pub fn main() {
849 /// let rt = runtime::Builder::new_current_thread()
850 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
851 /// .build()
852 /// .unwrap();
853 ///
854 /// rt.spawn(async { panic!("boom"); });
855 /// rt.spawn(async {
856 /// // This task never completes.
857 /// });
858 ///
859 /// rt.block_on(async {
860 /// // Do some work
861 /// # loop { tokio::task::yield_now().await; }
862 /// })
863 /// # }
864 /// ```
865 ///
866 /// [`JoinHandle`]: struct@crate::task::JoinHandle
867 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
868 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
869 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
870 }
871
872 self.unhandled_panic = behavior;
873 self
874 }
875
876 /// Disables the LIFO task scheduler heuristic.
877 ///
878 /// The multi-threaded scheduler includes a heuristic for optimizing
879 /// message-passing patterns. This heuristic results in the **last**
880 /// scheduled task being polled first.
881 ///
882 /// To implement this heuristic, each worker thread has a slot which
883 /// holds the task that should be polled next. However, this slot cannot
884 /// be stolen by other worker threads, which can result in lower total
885 /// throughput when tasks tend to have longer poll times.
886 ///
887 /// This configuration option will disable this heuristic resulting in
888 /// all scheduled tasks being pushed into the worker-local queue, which
889 /// is stealable.
890 ///
891 /// Consider trying this option when the task "scheduled" time is high
892 /// but the runtime is underutilized. Use tokio-rs/tokio-metrics to
893 /// collect this data.
894 ///
895 /// # Unstable
896 ///
897 /// This configuration option is considered a workaround for the LIFO
898 /// slot not being stealable. When the slot becomes stealable, we will
899 /// revisit whether or not this option is necessary. See
900 /// tokio-rs/tokio#4941.
901 ///
902 /// # Examples
903 ///
904 /// ```
905 /// use tokio::runtime;
906 ///
907 /// let rt = runtime::Builder::new_multi_thread()
908 /// .disable_lifo_slot()
909 /// .build()
910 /// .unwrap();
911 /// ```
912 pub fn disable_lifo_slot(&mut self) -> &mut Self {
913 self.disable_lifo_slot = true;
914 self
915 }
916
917 /// Specifies the random number generation seed to use within all
918 /// threads associated with the runtime being built.
919 ///
920 /// This option is intended to make certain parts of the runtime
921 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
922 /// [`tokio::select!`] it will ensure that the order that branches are
923 /// polled is deterministic.
924 ///
925 /// In addition to the code specifying `rng_seed` and interacting with
926 /// the runtime, the internals of Tokio and the Rust compiler may affect
927 /// the sequences of random numbers. In order to ensure repeatable
928 /// results, the version of Tokio, the versions of all other
929 /// dependencies that interact with Tokio, and the Rust compiler version
930 /// should also all remain constant.
931 ///
932 /// # Examples
933 ///
934 /// ```
935 /// # use tokio::runtime::{self, RngSeed};
936 /// # pub fn main() {
937 /// let seed = RngSeed::from_bytes(b"place your seed here");
938 /// let rt = runtime::Builder::new_current_thread()
939 /// .rng_seed(seed)
940 /// .build();
941 /// # }
942 /// ```
943 ///
944 /// [`tokio::select!`]: crate::select
945 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
946 self.seed_generator = RngSeedGenerator::new(seed);
947 self
948 }
949 }
950
951 cfg_metrics! {
952 /// Enables tracking the distribution of task poll times.
953 ///
954 /// Task poll times are not instrumented by default as doing so requires
955 /// calling [`Instant::now()`] twice per task poll, which could add
956 /// measurable overhead. Use the [`Handle::metrics()`] to access the
957 /// metrics data.
958 ///
959 /// The histogram uses fixed bucket sizes. In other words, the histogram
960 /// buckets are not dynamic based on input values. Use the
961 /// `metrics_poll_count_histogram_` builder methods to configure the
962 /// histogram details.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// use tokio::runtime;
968 ///
969 /// let rt = runtime::Builder::new_multi_thread()
970 /// .enable_metrics_poll_count_histogram()
971 /// .build()
972 /// .unwrap();
973 /// # // Test default values here
974 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
975 /// # let m = rt.handle().metrics();
976 /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
977 /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
978 /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
979 /// ```
980 ///
981 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
982 /// [`Instant::now()`]: std::time::Instant::now
983 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
984 self.metrics_poll_count_histogram_enable = true;
985 self
986 }
987
988 /// Sets the histogram scale for tracking the distribution of task poll
989 /// times.
990 ///
991 /// Tracking the distribution of task poll times can be done using a
992 /// linear or log scale. When using linear scale, each histogram bucket
993 /// will represent the same range of poll times. When using log scale,
994 /// each histogram bucket will cover a range twice as big as the
995 /// previous bucket.
996 ///
997 /// **Default:** linear scale.
998 ///
999 /// # Examples
1000 ///
1001 /// ```
1002 /// use tokio::runtime::{self, HistogramScale};
1003 ///
1004 /// let rt = runtime::Builder::new_multi_thread()
1005 /// .enable_metrics_poll_count_histogram()
1006 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1007 /// .build()
1008 /// .unwrap();
1009 /// ```
1010 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1011 self.metrics_poll_count_histogram.scale = histogram_scale;
1012 self
1013 }
1014
1015 /// Sets the histogram resolution for tracking the distribution of task
1016 /// poll times.
1017 ///
1018 /// The resolution is the histogram's first bucket's range. When using a
1019 /// linear histogram scale, each bucket will cover the same range. When
1020 /// using a log scale, each bucket will cover a range twice as big as
1021 /// the previous bucket. In the log case, the resolution represents the
1022 /// smallest bucket range.
1023 ///
1024 /// Note that, when using log scale, the resolution is rounded up to the
1025 /// nearest power of 2 in nanoseconds.
1026 ///
1027 /// **Default:** 100 microseconds.
1028 ///
1029 /// # Examples
1030 ///
1031 /// ```
1032 /// use tokio::runtime;
1033 /// use std::time::Duration;
1034 ///
1035 /// let rt = runtime::Builder::new_multi_thread()
1036 /// .enable_metrics_poll_count_histogram()
1037 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1038 /// .build()
1039 /// .unwrap();
1040 /// ```
1041 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1042 assert!(resolution > Duration::from_secs(0));
1043 // Sanity check the argument and also make the cast below safe.
1044 assert!(resolution <= Duration::from_secs(1));
1045
1046 let resolution = resolution.as_nanos() as u64;
1047 self.metrics_poll_count_histogram.resolution = resolution;
1048 self
1049 }
1050
1051 /// Sets the number of buckets for the histogram tracking the
1052 /// distribution of task poll times.
1053 ///
1054 /// The last bucket tracks all greater values that fall out of other
1055 /// ranges. So, configuring the histogram using a linear scale,
1056 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1057 /// polls that take more than 450ms to complete.
1058 ///
1059 /// **Default:** 10
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use tokio::runtime;
1065 ///
1066 /// let rt = runtime::Builder::new_multi_thread()
1067 /// .enable_metrics_poll_count_histogram()
1068 /// .metrics_poll_count_histogram_buckets(15)
1069 /// .build()
1070 /// .unwrap();
1071 /// ```
1072 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1073 self.metrics_poll_count_histogram.num_buckets = buckets;
1074 self
1075 }
1076 }
1077
1078 cfg_loom! {
1079 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1080 assert!(value.is_power_of_two());
1081 self.local_queue_capacity = value;
1082 self
1083 }
1084 }
1085
1086 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1087 use crate::runtime::scheduler::{self, CurrentThread};
1088 use crate::runtime::{runtime::Scheduler, Config};
1089
1090 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1091
1092 // Blocking pool
1093 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1094 let blocking_spawner = blocking_pool.spawner().clone();
1095
1096 // Generate a rng seed for this runtime.
1097 let seed_generator_1 = self.seed_generator.next_generator();
1098 let seed_generator_2 = self.seed_generator.next_generator();
1099
1100 // And now put a single-threaded scheduler on top of the timer. When
1101 // there are no futures ready to do something, it'll let the timer or
1102 // the reactor to generate some new stimuli for the futures to continue
1103 // in their life.
1104 let (scheduler, handle) = CurrentThread::new(
1105 driver,
1106 driver_handle,
1107 blocking_spawner,
1108 seed_generator_2,
1109 Config {
1110 before_park: self.before_park.clone(),
1111 after_unpark: self.after_unpark.clone(),
1112 global_queue_interval: self.global_queue_interval,
1113 event_interval: self.event_interval,
1114 local_queue_capacity: self.local_queue_capacity,
1115 #[cfg(tokio_unstable)]
1116 unhandled_panic: self.unhandled_panic.clone(),
1117 disable_lifo_slot: self.disable_lifo_slot,
1118 seed_generator: seed_generator_1,
1119 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1120 },
1121 );
1122
1123 let handle = Handle {
1124 inner: scheduler::Handle::CurrentThread(handle),
1125 };
1126
1127 Ok(Runtime::from_parts(
1128 Scheduler::CurrentThread(scheduler),
1129 handle,
1130 blocking_pool,
1131 ))
1132 }
1133
1134 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1135 if self.metrics_poll_count_histogram_enable {
1136 Some(self.metrics_poll_count_histogram.clone())
1137 } else {
1138 None
1139 }
1140 }
1141}
1142
1143cfg_io_driver! {
1144 impl Builder {
1145 /// Enables the I/O driver.
1146 ///
1147 /// Doing this enables using net, process, signal, and some I/O types on
1148 /// the runtime.
1149 ///
1150 /// # Examples
1151 ///
1152 /// ```
1153 /// use tokio::runtime;
1154 ///
1155 /// let rt = runtime::Builder::new_multi_thread()
1156 /// .enable_io()
1157 /// .build()
1158 /// .unwrap();
1159 /// ```
1160 pub fn enable_io(&mut self) -> &mut Self {
1161 self.enable_io = true;
1162 self
1163 }
1164
1165 /// Enables the I/O driver and configures the max number of events to be
1166 /// processed per tick.
1167 ///
1168 /// # Examples
1169 ///
1170 /// ```
1171 /// use tokio::runtime;
1172 ///
1173 /// let rt = runtime::Builder::new_current_thread()
1174 /// .enable_io()
1175 /// .max_io_events_per_tick(1024)
1176 /// .build()
1177 /// .unwrap();
1178 /// ```
1179 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1180 self.nevents = capacity;
1181 self
1182 }
1183 }
1184}
1185
1186cfg_time! {
1187 impl Builder {
1188 /// Enables the time driver.
1189 ///
1190 /// Doing this enables using `tokio::time` on the runtime.
1191 ///
1192 /// # Examples
1193 ///
1194 /// ```
1195 /// use tokio::runtime;
1196 ///
1197 /// let rt = runtime::Builder::new_multi_thread()
1198 /// .enable_time()
1199 /// .build()
1200 /// .unwrap();
1201 /// ```
1202 pub fn enable_time(&mut self) -> &mut Self {
1203 self.enable_time = true;
1204 self
1205 }
1206 }
1207}
1208
1209cfg_test_util! {
1210 impl Builder {
1211 /// Controls if the runtime's clock starts paused or advancing.
1212 ///
1213 /// Pausing time requires the current-thread runtime; construction of
1214 /// the runtime will panic otherwise.
1215 ///
1216 /// # Examples
1217 ///
1218 /// ```
1219 /// use tokio::runtime;
1220 ///
1221 /// let rt = runtime::Builder::new_current_thread()
1222 /// .enable_time()
1223 /// .start_paused(true)
1224 /// .build()
1225 /// .unwrap();
1226 /// ```
1227 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1228 self.start_paused = start_paused;
1229 self
1230 }
1231 }
1232}
1233
1234cfg_rt_multi_thread! {
1235 impl Builder {
1236 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1237 use crate::loom::sys::num_cpus;
1238 use crate::runtime::{Config, runtime::Scheduler};
1239 use crate::runtime::scheduler::{self, MultiThread};
1240
1241 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1242
1243 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1244
1245 // Create the blocking pool
1246 let blocking_pool =
1247 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1248 let blocking_spawner = blocking_pool.spawner().clone();
1249
1250 // Generate a rng seed for this runtime.
1251 let seed_generator_1 = self.seed_generator.next_generator();
1252 let seed_generator_2 = self.seed_generator.next_generator();
1253
1254 let (scheduler, handle, launch) = MultiThread::new(
1255 core_threads,
1256 driver,
1257 driver_handle,
1258 blocking_spawner,
1259 seed_generator_2,
1260 Config {
1261 before_park: self.before_park.clone(),
1262 after_unpark: self.after_unpark.clone(),
1263 global_queue_interval: self.global_queue_interval,
1264 event_interval: self.event_interval,
1265 local_queue_capacity: self.local_queue_capacity,
1266 #[cfg(tokio_unstable)]
1267 unhandled_panic: self.unhandled_panic.clone(),
1268 disable_lifo_slot: self.disable_lifo_slot,
1269 seed_generator: seed_generator_1,
1270 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1271 },
1272 );
1273
1274 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1275
1276 // Spawn the thread pool workers
1277 let _enter = handle.enter();
1278 launch.launch();
1279
1280 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1281 }
1282
1283 cfg_unstable! {
1284 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1285 use crate::loom::sys::num_cpus;
1286 use crate::runtime::{Config, runtime::Scheduler};
1287 use crate::runtime::scheduler::MultiThreadAlt;
1288
1289 let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1290 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1291
1292 // Create the blocking pool
1293 let blocking_pool =
1294 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1295 let blocking_spawner = blocking_pool.spawner().clone();
1296
1297 // Generate a rng seed for this runtime.
1298 let seed_generator_1 = self.seed_generator.next_generator();
1299 let seed_generator_2 = self.seed_generator.next_generator();
1300
1301 let (scheduler, handle) = MultiThreadAlt::new(
1302 core_threads,
1303 driver,
1304 driver_handle,
1305 blocking_spawner,
1306 seed_generator_2,
1307 Config {
1308 before_park: self.before_park.clone(),
1309 after_unpark: self.after_unpark.clone(),
1310 global_queue_interval: self.global_queue_interval,
1311 event_interval: self.event_interval,
1312 local_queue_capacity: self.local_queue_capacity,
1313 #[cfg(tokio_unstable)]
1314 unhandled_panic: self.unhandled_panic.clone(),
1315 disable_lifo_slot: self.disable_lifo_slot,
1316 seed_generator: seed_generator_1,
1317 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1318 },
1319 );
1320
1321 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1322 }
1323 }
1324 }
1325}
1326
1327impl fmt::Debug for Builder {
1328 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1329 fmt.debug_struct("Builder")
1330 .field("worker_threads", &self.worker_threads)
1331 .field("max_blocking_threads", &self.max_blocking_threads)
1332 .field(
1333 "thread_name",
1334 &"<dyn Fn() -> String + Send + Sync + 'static>",
1335 )
1336 .field("thread_stack_size", &self.thread_stack_size)
1337 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1338 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1339 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1340 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1341 .finish()
1342 }
1343}