madsim_real_tokio/runtime/builder.rs

1use crate::runtime::handle::Handle;
2use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
3use crate::util::rand::{RngSeed, RngSeedGenerator};
4
5use std::fmt;
6use std::io;
7use std::time::Duration;
8
9/// Builds Tokio Runtime with custom configuration values.
10///
11/// Methods can be chained in order to set the configuration values. The
12/// Runtime is constructed by calling [`build`].
13///
14/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
15/// or [`Builder::new_current_thread`].
16///
17/// See function level documentation for details on the various configuration
18/// settings.
19///
20/// [`build`]: method@Self::build
21/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
22/// [`Builder::new_current_thread`]: method@Self::new_current_thread
23///
24/// # Examples
25///
26/// ```
27/// use tokio::runtime::Builder;
28///
29/// fn main() {
30///     // build runtime
31///     let runtime = Builder::new_multi_thread()
32///         .worker_threads(4)
33///         .thread_name("my-custom-name")
34///         .thread_stack_size(3 * 1024 * 1024)
35///         .build()
36///         .unwrap();
37///
38///     // use runtime ...
39/// }
40/// ```
41pub struct Builder {
42    /// Runtime type
43    kind: Kind,
44
45    /// Whether or not to enable the I/O driver
46    enable_io: bool,
47    nevents: usize,
48
49    /// Whether or not to enable the time driver
50    enable_time: bool,
51
52    /// Whether or not the clock should start paused.
53    start_paused: bool,
54
55    /// The number of worker threads, used by Runtime.
56    ///
57    /// Only used when not using the current-thread executor.
58    worker_threads: Option<usize>,
59
60    /// Cap on thread usage.
61    max_blocking_threads: usize,
62
63    /// Name fn used for threads spawned by the runtime.
64    pub(super) thread_name: ThreadNameFn,
65
66    /// Stack size used for threads spawned by the runtime.
67    pub(super) thread_stack_size: Option<usize>,
68
69    /// Callback to run after each thread starts.
70    pub(super) after_start: Option<Callback>,
71
72    /// To run before each worker thread stops
73    pub(super) before_stop: Option<Callback>,
74
75    /// To run before each worker thread is parked.
76    pub(super) before_park: Option<Callback>,
77
78    /// To run after each thread is unparked.
79    pub(super) after_unpark: Option<Callback>,
80
81    /// Customizable keep alive timeout for `BlockingPool`
82    pub(super) keep_alive: Option<Duration>,
83
84    /// How many ticks before pulling a task from the global/remote queue?
85    ///
86    /// When `None`, the value is unspecified and behavior details are left to
87    /// the scheduler. Each scheduler flavor could choose to either pick its own
88    /// default value or use some other strategy to decide when to poll from the
89    /// global queue. For example, the multi-threaded scheduler uses a
90    /// self-tuning strategy based on mean task poll times.
91    pub(super) global_queue_interval: Option<u32>,
92
93    /// How many ticks before yielding to the driver for timer and I/O events?
94    pub(super) event_interval: u32,
95
96    pub(super) local_queue_capacity: usize,
97
98    /// When true, the multi-threaded scheduler LIFO slot should not be used.
99    ///
100    /// This option should only be exposed as unstable.
101    pub(super) disable_lifo_slot: bool,
102
103    /// Specify a random number generator seed to provide deterministic results
104    pub(super) seed_generator: RngSeedGenerator,
105
106    /// When true, enables task poll count histogram instrumentation.
107    pub(super) metrics_poll_count_histogram_enable: bool,
108
109    /// Configures the task poll count histogram
110    pub(super) metrics_poll_count_histogram: HistogramBuilder,
111
112    #[cfg(tokio_unstable)]
113    pub(super) unhandled_panic: UnhandledPanic,
114}
115
116cfg_unstable! {
117    /// How the runtime should respond to unhandled panics.
118    ///
119    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
120    /// to configure the runtime behavior when a spawned task panics.
121    ///
122    /// See [`Builder::unhandled_panic`] for more details.
123    #[derive(Debug, Clone)]
124    #[non_exhaustive]
125    pub enum UnhandledPanic {
126        /// The runtime should ignore panics on spawned tasks.
127        ///
128        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
129        /// tasks continue running normally.
130        ///
131        /// This is the default behavior.
132        ///
133        /// # Examples
134        ///
135        /// ```
136        /// use tokio::runtime::{self, UnhandledPanic};
137        ///
138        /// # pub fn main() {
139        /// let rt = runtime::Builder::new_current_thread()
140        ///     .unhandled_panic(UnhandledPanic::Ignore)
141        ///     .build()
142        ///     .unwrap();
143        ///
144        /// let task1 = rt.spawn(async { panic!("boom"); });
145        /// let task2 = rt.spawn(async {
146        ///     // This task completes normally
147        ///     "done"
148        /// });
149        ///
150        /// rt.block_on(async {
151        ///     // The panic on the first task is forwarded to the `JoinHandle`
152        ///     assert!(task1.await.is_err());
153        ///
154        ///     // The second task completes normally
155        ///     assert!(task2.await.is_ok());
156        /// })
157        /// # }
158        /// ```
159        ///
160        /// [`JoinHandle`]: struct@crate::task::JoinHandle
161        Ignore,
162
163        /// The runtime should immediately shut down if a spawned task panics.
164        ///
165        /// The runtime will immediately shut down even if the panicked task's
166        /// [`JoinHandle`] is still available. All further spawned tasks will be
167        /// immediately dropped, and calls to [`Runtime::block_on`] will panic.
168        ///
169        /// # Examples
170        ///
171        /// ```should_panic
172        /// use tokio::runtime::{self, UnhandledPanic};
173        ///
174        /// # pub fn main() {
175        /// let rt = runtime::Builder::new_current_thread()
176        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
177        ///     .build()
178        ///     .unwrap();
179        ///
180        /// rt.spawn(async { panic!("boom"); });
181        /// rt.spawn(async {
182        ///     // This task never completes.
183        /// });
184        ///
185        /// rt.block_on(async {
186        ///     // Do some work
187        /// # loop { tokio::task::yield_now().await; }
188        /// })
189        /// # }
190        /// ```
191        ///
192        /// [`JoinHandle`]: struct@crate::task::JoinHandle
193        ShutdownRuntime,
194    }
195}
196
197pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
198
199#[derive(Clone, Copy)]
200pub(crate) enum Kind {
201    CurrentThread,
202    #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
203    MultiThread,
204    #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
205    MultiThreadAlt,
206}
207
208impl Builder {
209    /// Returns a new builder with the current thread scheduler selected.
210    ///
211    /// Configuration methods can be chained on the return value.
212    ///
213    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
214    /// [`LocalSet`].
215    ///
216    /// [`LocalSet`]: crate::task::LocalSet
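    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only) that pairs the current-thread
    /// runtime with a [`LocalSet`] so non-`Send` tasks can be spawned:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    /// use tokio::task::LocalSet;
    ///
    /// // Build a runtime that is driven from `block_on` on this thread.
    /// let rt = Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // A `LocalSet` provides the context required by `spawn_local`.
    /// let local = LocalSet::new();
    /// local.block_on(&rt, async {
    ///     tokio::task::spawn_local(async {
    ///         // This future does not need to be `Send`.
    ///     })
    ///     .await
    ///     .unwrap();
    /// });
    /// ```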
217    pub fn new_current_thread() -> Builder {
218        #[cfg(loom)]
219        const EVENT_INTERVAL: u32 = 4;
220        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
221        #[cfg(not(loom))]
222        const EVENT_INTERVAL: u32 = 61;
223
224        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
225    }
226
227    cfg_not_wasi! {
228        /// Returns a new builder with the multi thread scheduler selected.
229        ///
230        /// Configuration methods can be chained on the return value.
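        ///
        /// # Examples
        ///
        /// A minimal sketch (illustrative only; assumes the `rt-multi-thread`
        /// feature is enabled):
        ///
        /// ```
        /// use tokio::runtime::Builder;
        ///
        /// // Build a work-stealing runtime with the default settings.
        /// let rt = Builder::new_multi_thread()
        ///     .enable_all()
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.block_on(async {
        ///     println!("running on the multi thread scheduler");
        /// });
        /// ```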
231        #[cfg(feature = "rt-multi-thread")]
232        #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
233        pub fn new_multi_thread() -> Builder {
234            // The number `61` is fairly arbitrary. I believe this value was copied from golang.
235            Builder::new(Kind::MultiThread, 61)
236        }
237
238        cfg_unstable! {
239            /// Returns a new builder with the alternate multi thread scheduler
240            /// selected.
241            ///
242            /// The alternate multi threaded scheduler is an in-progress
243            /// candidate to replace the existing multi threaded scheduler. It
244            /// currently does not scale as well to 16+ processors.
245            ///
246            /// This runtime flavor is currently **not considered production
247            /// ready**.
248            ///
249            /// Configuration methods can be chained on the return value.
250            #[cfg(feature = "rt-multi-thread")]
251            #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
252            pub fn new_multi_thread_alt() -> Builder {
253                // The number `61` is fairly arbitrary. I believe this value was copied from golang.
254                Builder::new(Kind::MultiThreadAlt, 61)
255            }
256        }
257    }
258
259    /// Returns a new runtime builder initialized with default configuration
260    /// values.
261    ///
262    /// Configuration methods can be chained on the return value.
263    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
264        Builder {
265            kind,
266
267            // I/O defaults to "off"
268            enable_io: false,
269            nevents: 1024,
270
271            // Time defaults to "off"
272            enable_time: false,
273
274            // The clock starts not-paused
275            start_paused: false,
276
277            // Read from environment variable first in multi-threaded mode.
278            // Default to lazy auto-detection (one thread per CPU core)
279            worker_threads: None,
280
281            max_blocking_threads: 512,
282
283            // Default thread name
284            thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
285
286            // Do not set a stack size by default
287            thread_stack_size: None,
288
289            // No worker thread callbacks
290            after_start: None,
291            before_stop: None,
292            before_park: None,
293            after_unpark: None,
294
295            keep_alive: None,
296
297            // Defaults for these values depend on the scheduler kind, so we get them
298            // as parameters.
299            global_queue_interval: None,
300            event_interval,
301
302            #[cfg(not(loom))]
303            local_queue_capacity: 256,
304
305            #[cfg(loom)]
306            local_queue_capacity: 4,
307
308            seed_generator: RngSeedGenerator::new(RngSeed::new()),
309
310            #[cfg(tokio_unstable)]
311            unhandled_panic: UnhandledPanic::Ignore,
312
313            metrics_poll_count_histogram_enable: false,
314
315            metrics_poll_count_histogram: HistogramBuilder::default(),
316
317            disable_lifo_slot: false,
318        }
319    }
320
321    /// Enables both I/O and time drivers.
322    ///
323    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
324    /// individually. If additional components are added to Tokio in the future,
325    /// `enable_all` will include these future components.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use tokio::runtime;
331    ///
332    /// let rt = runtime::Builder::new_multi_thread()
333    ///     .enable_all()
334    ///     .build()
335    ///     .unwrap();
336    /// ```
337    pub fn enable_all(&mut self) -> &mut Self {
338        #[cfg(any(
339            feature = "net",
340            all(unix, feature = "process"),
341            all(unix, feature = "signal")
342        ))]
343        self.enable_io();
344        #[cfg(feature = "time")]
345        self.enable_time();
346
347        self
348    }
349
350    /// Sets the number of worker threads the `Runtime` will use.
351    ///
352    /// This can be any number above 0, though it is advised to keep this value
353    /// on the smaller side.
354    ///
355    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
356    ///
357    /// # Default
358    ///
359    /// The default value is the number of cores available to the system.
360    ///
361    /// When using the `current_thread` runtime this method has no effect.
362    ///
363    /// # Examples
364    ///
365    /// ## Multi threaded runtime with 4 threads
366    ///
367    /// ```
368    /// use tokio::runtime;
369    ///
370    /// // This will spawn a work-stealing runtime with 4 worker threads.
371    /// let rt = runtime::Builder::new_multi_thread()
372    ///     .worker_threads(4)
373    ///     .build()
374    ///     .unwrap();
375    ///
376    /// rt.spawn(async move {});
377    /// ```
378    ///
379    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
380    ///
381    /// ```
382    /// use tokio::runtime;
383    ///
384    /// // Create a runtime that _must_ be driven from a call
385    /// // to `Runtime::block_on`.
386    /// let rt = runtime::Builder::new_current_thread()
387    ///     .build()
388    ///     .unwrap();
389    ///
390    /// // This will run the runtime and future on the current thread
391    /// rt.block_on(async move {});
392    /// ```
393    ///
394    /// # Panics
395    ///
396    /// This will panic if `val` is not larger than `0`.
397    #[track_caller]
398    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
399        assert!(val > 0, "Worker threads cannot be set to 0");
400        self.worker_threads = Some(val);
401        self
402    }
403
404    /// Specifies the limit for additional threads spawned by the Runtime.
405    ///
406    /// These threads are used for blocking operations like tasks spawned
407    /// through [`spawn_blocking`], this includes but is not limited to:
408    /// - [`fs`] operations
409    /// - dns resolution through [`ToSocketAddrs`]
410    /// - writing to [`Stdout`] or [`Stderr`]
411    /// - reading from [`Stdin`]
412    ///
413    /// Unlike the [`worker_threads`], they are not always active and will exit
414    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
415    ///
416    /// It's recommended not to set this limit too low in order to avoid hanging on operations
417    /// requiring [`spawn_blocking`].
418    ///
419    /// The default value is 512.
420    ///
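    /// # Examples
    ///
    /// A minimal sketch (the values are illustrative, not recommendations):
    ///
    /// ```
    /// use tokio::runtime;
    /// use std::time::Duration;
    ///
    /// let rt = runtime::Builder::new_multi_thread()
    ///     // Allow at most 64 additional blocking threads ...
    ///     .max_blocking_threads(64)
    ///     // ... and let idle blocking threads exit after 10 seconds.
    ///     .thread_keep_alive(Duration::from_secs(10))
    ///     .build()
    ///     .unwrap();
    /// ```
    ///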
421    /// # Panics
422    ///
423    /// This will panic if `val` is not larger than `0`.
424    ///
425    /// # Upgrading from 0.x
426    ///
427    /// In old versions `max_threads` limited both blocking and worker threads, but the
428    /// current `max_blocking_threads` does not include async worker threads in the count.
429    ///
430    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
431    /// [`fs`]: mod@crate::fs
432    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
433    /// [`Stdout`]: struct@crate::io::Stdout
434    /// [`Stdin`]: struct@crate::io::Stdin
435    /// [`Stderr`]: struct@crate::io::Stderr
436    /// [`worker_threads`]: Self::worker_threads
437    /// [`thread_keep_alive`]: Self::thread_keep_alive
438    #[track_caller]
439    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
440    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
441        assert!(val > 0, "Max blocking threads cannot be set to 0");
442        self.max_blocking_threads = val;
443        self
444    }
445
446    /// Sets name of threads spawned by the `Runtime`'s thread pool.
447    ///
448    /// The default name is "tokio-runtime-worker".
449    ///
450    /// # Examples
451    ///
452    /// ```
453    /// # use tokio::runtime;
454    ///
455    /// # pub fn main() {
456    /// let rt = runtime::Builder::new_multi_thread()
457    ///     .thread_name("my-pool")
458    ///     .build();
459    /// # }
460    /// ```
461    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
462        let val = val.into();
463        self.thread_name = std::sync::Arc::new(move || val.clone());
464        self
465    }
466
467    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
468    ///
469    /// The default name fn is `|| "tokio-runtime-worker".into()`.
470    ///
471    /// # Examples
472    ///
473    /// ```
474    /// # use tokio::runtime;
475    /// # use std::sync::atomic::{AtomicUsize, Ordering};
476    /// # pub fn main() {
477    /// let rt = runtime::Builder::new_multi_thread()
478    ///     .thread_name_fn(|| {
479    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
480    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
481    ///        format!("my-pool-{}", id)
482    ///     })
483    ///     .build();
484    /// # }
485    /// ```
486    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
487    where
488        F: Fn() -> String + Send + Sync + 'static,
489    {
490        self.thread_name = std::sync::Arc::new(f);
491        self
492    }
493
494    /// Sets the stack size (in bytes) for worker threads.
495    ///
496    /// The actual stack size may be greater than this value if the platform
497    /// specifies a minimum stack size.
498    ///
499    /// The default stack size for spawned threads is 2 MiB, though this
500    /// particular stack size is subject to change in the future.
501    ///
502    /// # Examples
503    ///
504    /// ```
505    /// # use tokio::runtime;
506    ///
507    /// # pub fn main() {
508    /// let rt = runtime::Builder::new_multi_thread()
509    ///     .thread_stack_size(32 * 1024)
510    ///     .build();
511    /// # }
512    /// ```
513    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
514        self.thread_stack_size = Some(val);
515        self
516    }
517
518    /// Executes function `f` after each thread is started but before it starts
519    /// doing work.
520    ///
521    /// This is intended for bookkeeping and monitoring use cases.
522    ///
523    /// # Examples
524    ///
525    /// ```
526    /// # use tokio::runtime;
527    /// # pub fn main() {
528    /// let runtime = runtime::Builder::new_multi_thread()
529    ///     .on_thread_start(|| {
530    ///         println!("thread started");
531    ///     })
532    ///     .build();
533    /// # }
534    /// ```
535    #[cfg(not(loom))]
536    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
537    where
538        F: Fn() + Send + Sync + 'static,
539    {
540        self.after_start = Some(std::sync::Arc::new(f));
541        self
542    }
543
544    /// Executes function `f` before each thread stops.
545    ///
546    /// This is intended for bookkeeping and monitoring use cases.
547    ///
548    /// # Examples
549    ///
550    /// ```
551    /// # use tokio::runtime;
552    /// # pub fn main() {
553    /// let runtime = runtime::Builder::new_multi_thread()
554    ///     .on_thread_stop(|| {
555    ///         println!("thread stopping");
556    ///     })
557    ///     .build();
558    /// # }
559    /// ```
560    #[cfg(not(loom))]
561    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
562    where
563        F: Fn() + Send + Sync + 'static,
564    {
565        self.before_stop = Some(std::sync::Arc::new(f));
566        self
567    }
568
569    /// Executes function `f` just before a thread is parked (goes idle).
570    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
571    /// can be called, and may result in this thread being unparked immediately.
572    ///
573    /// This can be used to start work only when the executor is idle, or for bookkeeping
574    /// and monitoring purposes.
575    ///
576    /// Note: There can only be one park callback for a runtime; calling this function
577    /// more than once replaces the last callback defined, rather than adding to it.
578    ///
579    /// # Examples
580    ///
581    /// ## Multithreaded executor
582    /// ```
583    /// # use std::sync::Arc;
584    /// # use std::sync::atomic::{AtomicBool, Ordering};
585    /// # use tokio::runtime;
586    /// # use tokio::sync::Barrier;
587    /// # pub fn main() {
588    /// let once = AtomicBool::new(true);
589    /// let barrier = Arc::new(Barrier::new(2));
590    ///
591    /// let runtime = runtime::Builder::new_multi_thread()
592    ///     .worker_threads(1)
593    ///     .on_thread_park({
594    ///         let barrier = barrier.clone();
595    ///         move || {
596    ///             let barrier = barrier.clone();
597    ///             if once.swap(false, Ordering::Relaxed) {
598    ///                 tokio::spawn(async move { barrier.wait().await; });
599    ///             }
600    ///         }
601    ///     })
602    ///     .build()
603    ///     .unwrap();
604    ///
605    /// runtime.block_on(async {
606    ///    barrier.wait().await;
607    /// })
608    /// # }
609    /// ```
610    /// ## Current thread executor
611    /// ```
612    /// # use std::sync::Arc;
613    /// # use std::sync::atomic::{AtomicBool, Ordering};
614    /// # use tokio::runtime;
615    /// # use tokio::sync::Barrier;
616    /// # pub fn main() {
617    /// let once = AtomicBool::new(true);
618    /// let barrier = Arc::new(Barrier::new(2));
619    ///
620    /// let runtime = runtime::Builder::new_current_thread()
621    ///     .on_thread_park({
622    ///         let barrier = barrier.clone();
623    ///         move || {
624    ///             let barrier = barrier.clone();
625    ///             if once.swap(false, Ordering::Relaxed) {
626    ///                 tokio::spawn(async move { barrier.wait().await; });
627    ///             }
628    ///         }
629    ///     })
630    ///     .build()
631    ///     .unwrap();
632    ///
633    /// runtime.block_on(async {
634    ///    barrier.wait().await;
635    /// })
636    /// # }
637    /// ```
638    #[cfg(not(loom))]
639    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
640    where
641        F: Fn() + Send + Sync + 'static,
642    {
643        self.before_park = Some(std::sync::Arc::new(f));
644        self
645    }
646
647    /// Executes function `f` just after a thread unparks (starts executing tasks).
648    ///
649    /// This is intended for bookkeeping and monitoring use cases; note that work
650    /// in this callback will increase latencies when the application has allowed one or
651    /// more runtime threads to go idle.
652    ///
653    /// Note: There can only be one unpark callback for a runtime; calling this function
654    /// more than once replaces the last callback defined, rather than adding to it.
655    ///
656    /// # Examples
657    ///
658    /// ```
659    /// # use tokio::runtime;
660    /// # pub fn main() {
661    /// let runtime = runtime::Builder::new_multi_thread()
662    ///     .on_thread_unpark(|| {
663    ///         println!("thread unparking");
664    ///     })
665    ///     .build();
666    ///
667    /// runtime.unwrap().block_on(async {
668    ///    tokio::task::yield_now().await;
669    ///    println!("Hello from Tokio!");
670    /// })
671    /// # }
672    /// ```
673    #[cfg(not(loom))]
674    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
675    where
676        F: Fn() + Send + Sync + 'static,
677    {
678        self.after_unpark = Some(std::sync::Arc::new(f));
679        self
680    }
681
682    /// Creates the configured `Runtime`.
683    ///
684    /// The returned `Runtime` instance is ready to spawn tasks.
685    ///
686    /// # Examples
687    ///
688    /// ```
689    /// use tokio::runtime::Builder;
690    ///
691    /// let rt = Builder::new_multi_thread().build().unwrap();
692    ///
693    /// rt.block_on(async {
694    ///     println!("Hello from the Tokio runtime");
695    /// });
696    /// ```
697    pub fn build(&mut self) -> io::Result<Runtime> {
698        match &self.kind {
699            Kind::CurrentThread => self.build_current_thread_runtime(),
700            #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
701            Kind::MultiThread => self.build_threaded_runtime(),
702            #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
703            Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
704        }
705    }
706
707    fn get_cfg(&self) -> driver::Cfg {
708        driver::Cfg {
709            enable_pause_time: match self.kind {
710                Kind::CurrentThread => true,
711                #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
712                Kind::MultiThread => false,
713                #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
714                Kind::MultiThreadAlt => false,
715            },
716            enable_io: self.enable_io,
717            enable_time: self.enable_time,
718            start_paused: self.start_paused,
719            nevents: self.nevents,
720        }
721    }
722
723    /// Sets a custom timeout for a thread in the blocking pool.
724    ///
725    /// By default, the timeout for a thread is set to 10 seconds. This can
726    /// be overridden using `.thread_keep_alive()`.
727    ///
728    /// # Example
729    ///
730    /// ```
731    /// # use tokio::runtime;
732    /// # use std::time::Duration;
733    /// # pub fn main() {
734    /// let rt = runtime::Builder::new_multi_thread()
735    ///     .thread_keep_alive(Duration::from_millis(100))
736    ///     .build();
737    /// # }
738    /// ```
739    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
740        self.keep_alive = Some(duration);
741        self
742    }
743
744    /// Sets the number of scheduler ticks after which the scheduler will poll the global
745    /// task queue.
746    ///
747    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
748    ///
749    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
750    /// [the module documentation] for the default behavior of the multi-thread scheduler.
751    ///
752    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
753    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
754    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
755    /// getting started on new work, especially if tasks frequently yield rather than complete
756    /// or await on further I/O. Conversely, a higher value prioritizes existing work, and
757    /// is a good choice when most tasks quickly complete polling.
758    ///
759    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
760    ///
761    /// # Examples
762    ///
763    /// ```
764    /// # use tokio::runtime;
765    /// # pub fn main() {
766    /// let rt = runtime::Builder::new_multi_thread()
767    ///     .global_queue_interval(31)
768    ///     .build();
769    /// # }
770    /// ```
771    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
772        self.global_queue_interval = Some(val);
773        self
774    }
775
776    /// Sets the number of scheduler ticks after which the scheduler will poll for
777    /// external events (timers, I/O, and so on).
778    ///
779    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
780    ///
781    /// By default, the event interval is `61` for all scheduler types.
782    ///
783    /// Setting the event interval determines the effective "priority" of delivering
784    /// these external events (which may wake up additional tasks), compared to
785    /// executing tasks that are currently ready to run. A smaller value is useful
786    /// when tasks frequently spend a long time in polling, or frequently yield,
787    /// which can result in overly long delays picking up I/O events. Conversely,
788    /// picking up new events requires extra synchronization and syscall overhead,
789    /// so if tasks generally complete their polling quickly, a higher event interval
790    /// will minimize that overhead while still keeping the scheduler responsive to
791    /// events.
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// # use tokio::runtime;
797    /// # pub fn main() {
798    /// let rt = runtime::Builder::new_multi_thread()
799    ///     .event_interval(31)
800    ///     .build();
801    /// # }
802    /// ```
803    pub fn event_interval(&mut self, val: u32) -> &mut Self {
804        self.event_interval = val;
805        self
806    }
807
808    cfg_unstable! {
809        /// Configure how the runtime responds to an unhandled panic on a
810        /// spawned task.
811        ///
812        /// By default, an unhandled panic (i.e. a panic not caught by
813        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
814        /// execution. The panic is forwarded to the task's
815        /// [`JoinHandle`] and all other spawned tasks continue running.
816        ///
817        /// The `unhandled_panic` option enables configuring this behavior.
818        ///
819        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
820        ///   spawned tasks have no impact on the runtime's execution.
821        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
822        ///   shut down immediately when a spawned task panics even if that
823        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
824        ///   will immediately terminate and further calls to
825        ///   [`Runtime::block_on`] will panic.
826        ///
827        /// # Panics
828        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
829        /// on a runtime other than the current thread runtime.
830        ///
831        /// # Unstable
832        ///
833        /// This option is currently unstable and its implementation is
834        /// incomplete. The API may change or be removed in the future. See
835        /// tokio-rs/tokio#4516 for more details.
836        ///
837        /// # Examples
838        ///
839        /// The following demonstrates a runtime configured to shut down on
840        /// panic. The first spawned task panics and results in the runtime
841        /// shutting down. The second spawned task never has a chance to
842        /// execute. The call to `block_on` will panic due to the runtime being
843        /// forcibly shut down.
844        ///
845        /// ```should_panic
846        /// use tokio::runtime::{self, UnhandledPanic};
847        ///
848        /// # pub fn main() {
849        /// let rt = runtime::Builder::new_current_thread()
850        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
851        ///     .build()
852        ///     .unwrap();
853        ///
854        /// rt.spawn(async { panic!("boom"); });
855        /// rt.spawn(async {
856        ///     // This task never completes.
857        /// });
858        ///
859        /// rt.block_on(async {
860        ///     // Do some work
861        /// # loop { tokio::task::yield_now().await; }
862        /// })
863        /// # }
864        /// ```
865        ///
866        /// [`JoinHandle`]: struct@crate::task::JoinHandle
867        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
868            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
869                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
870            }
871
872            self.unhandled_panic = behavior;
873            self
874        }
875
876        /// Disables the LIFO task scheduler heuristic.
877        ///
878        /// The multi-threaded scheduler includes a heuristic for optimizing
879        /// message-passing patterns. This heuristic results in the **last**
880        /// scheduled task being polled first.
881        ///
882        /// To implement this heuristic, each worker thread has a slot which
883        /// holds the task that should be polled next. However, this slot cannot
884        /// be stolen by other worker threads, which can result in lower total
885        /// throughput when tasks tend to have longer poll times.
886        ///
887        /// This configuration option will disable this heuristic resulting in
888        /// all scheduled tasks being pushed into the worker-local queue, which
889        /// is stealable.
890        ///
891        /// Consider trying this option when the task "scheduled" time is high
892        /// but the runtime is underutilized. Use tokio-rs/tokio-metrics to
893        /// collect this data.
894        ///
895        /// # Unstable
896        ///
897        /// This configuration option is considered a workaround for the LIFO
898        /// slot not being stealable. When the slot becomes stealable, we will
899        /// revisit whether or not this option is necessary. See
900        /// tokio-rs/tokio#4941.
901        ///
902        /// # Examples
903        ///
904        /// ```
905        /// use tokio::runtime;
906        ///
907        /// let rt = runtime::Builder::new_multi_thread()
908        ///     .disable_lifo_slot()
909        ///     .build()
910        ///     .unwrap();
911        /// ```
912        pub fn disable_lifo_slot(&mut self) -> &mut Self {
913            self.disable_lifo_slot = true;
914            self
915        }
916
917        /// Specifies the random number generation seed to use within all
918        /// threads associated with the runtime being built.
919        ///
920        /// This option is intended to make certain parts of the runtime
921        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
922        /// [`tokio::select!`] it will ensure that the order that branches are
923        /// polled is deterministic.
924        ///
925        /// In addition to the code specifying `rng_seed` and interacting with
926        /// the runtime, the internals of Tokio and the Rust compiler may affect
927        /// the sequences of random numbers. In order to ensure repeatable
928        /// results, the version of Tokio, the versions of all other
929        /// dependencies that interact with Tokio, and the Rust compiler version
930        /// should also all remain constant.
931        ///
932        /// # Examples
933        ///
934        /// ```
935        /// # use tokio::runtime::{self, RngSeed};
936        /// # pub fn main() {
937        /// let seed = RngSeed::from_bytes(b"place your seed here");
938        /// let rt = runtime::Builder::new_current_thread()
939        ///     .rng_seed(seed)
940        ///     .build();
941        /// # }
942        /// ```
943        ///
944        /// [`tokio::select!`]: crate::select
945        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
946            self.seed_generator = RngSeedGenerator::new(seed);
947            self
948        }
949    }
950
951    cfg_metrics! {
952        /// Enables tracking the distribution of task poll times.
953        ///
954        /// Task poll times are not instrumented by default as doing so requires
955        /// calling [`Instant::now()`] twice per task poll, which could add
956        /// measurable overhead. Use [`Handle::metrics()`] to access the
957        /// metrics data.
958        ///
959        /// The histogram uses fixed bucket sizes. In other words, the histogram
960        /// buckets are not dynamic based on input values. Use the
961        /// `metrics_poll_count_histogram_` builder methods to configure the
962        /// histogram details.
963        ///
964        /// # Examples
965        ///
966        /// ```
967        /// use tokio::runtime;
968        ///
969        /// let rt = runtime::Builder::new_multi_thread()
970        ///     .enable_metrics_poll_count_histogram()
971        ///     .build()
972        ///     .unwrap();
973        /// # // Test default values here
974        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
975        /// # let m = rt.handle().metrics();
976        /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10);
977        /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100));
978        /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200));
979        /// ```
980        ///
981        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
982        /// [`Instant::now()`]: std::time::Instant::now
983        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
984            self.metrics_poll_count_histogram_enable = true;
985            self
986        }
987
988        /// Sets the histogram scale for tracking the distribution of task poll
989        /// times.
990        ///
991        /// Tracking the distribution of task poll times can be done using a
992        /// linear or log scale. When using linear scale, each histogram bucket
993        /// will represent the same range of poll times. When using log scale,
994        /// each histogram bucket will cover a range twice as big as the
995        /// previous bucket.
996        ///
997        /// **Default:** linear scale.
998        ///
999        /// # Examples
1000        ///
1001        /// ```
1002        /// use tokio::runtime::{self, HistogramScale};
1003        ///
1004        /// let rt = runtime::Builder::new_multi_thread()
1005        ///     .enable_metrics_poll_count_histogram()
1006        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1007        ///     .build()
1008        ///     .unwrap();
1009        /// ```
1010        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1011            self.metrics_poll_count_histogram.scale = histogram_scale;
1012            self
1013        }
1014
1015        /// Sets the histogram resolution for tracking the distribution of task
1016        /// poll times.
1017        ///
1018        /// The resolution is the histogram's first bucket's range. When using a
1019        /// linear histogram scale, each bucket will cover the same range. When
1020        /// using a log scale, each bucket will cover a range twice as big as
1021        /// the previous bucket. In the log case, the resolution represents the
1022        /// smallest bucket range.
1023        ///
1024        /// Note that, when using log scale, the resolution is rounded up to the
1025        /// nearest power of 2 in nanoseconds.
1026        ///
1027        /// **Default:** 100 microseconds.
1028        ///
1029        /// # Examples
1030        ///
1031        /// ```
1032        /// use tokio::runtime;
1033        /// use std::time::Duration;
1034        ///
1035        /// let rt = runtime::Builder::new_multi_thread()
1036        ///     .enable_metrics_poll_count_histogram()
1037        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1038        ///     .build()
1039        ///     .unwrap();
1040        /// ```
1041        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1042            assert!(resolution > Duration::from_secs(0));
1043            // Sanity check the argument and also make the cast below safe.
1044            assert!(resolution <= Duration::from_secs(1));
1045
1046            let resolution = resolution.as_nanos() as u64;
1047            self.metrics_poll_count_histogram.resolution = resolution;
1048            self
1049        }
1050
1051        /// Sets the number of buckets for the histogram tracking the
1052        /// distribution of task poll times.
1053        ///
1054        /// The last bucket tracks all greater values that fall out of other
1055        /// ranges. So, if the histogram is configured with a linear scale, a
1056        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1057        /// polls that take more than 450ms to complete.
1058        ///
1059        /// **Default:** 10
1060        ///
1061        /// # Examples
1062        ///
1063        /// ```
1064        /// use tokio::runtime;
1065        ///
1066        /// let rt = runtime::Builder::new_multi_thread()
1067        ///     .enable_metrics_poll_count_histogram()
1068        ///     .metrics_poll_count_histogram_buckets(15)
1069        ///     .build()
1070        ///     .unwrap();
1071        /// ```
1072        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1073            self.metrics_poll_count_histogram.num_buckets = buckets;
1074            self
1075        }
1076    }
1077
1078    cfg_loom! {
1079        pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
1080            assert!(value.is_power_of_two());
1081            self.local_queue_capacity = value;
1082            self
1083        }
1084    }
1085
1086    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1087        use crate::runtime::scheduler::{self, CurrentThread};
1088        use crate::runtime::{runtime::Scheduler, Config};
1089
1090        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1091
1092        // Blocking pool
1093        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1094        let blocking_spawner = blocking_pool.spawner().clone();
1095
1096        // Generate a rng seed for this runtime.
1097        let seed_generator_1 = self.seed_generator.next_generator();
1098        let seed_generator_2 = self.seed_generator.next_generator();
1099
1100        // And now put a single-threaded scheduler on top of the timer. When
1101        // there are no futures ready to do something, it'll let the timer or
1102        // the reactor generate some new stimuli for the futures to continue
1103        // their work.
1104        let (scheduler, handle) = CurrentThread::new(
1105            driver,
1106            driver_handle,
1107            blocking_spawner,
1108            seed_generator_2,
1109            Config {
1110                before_park: self.before_park.clone(),
1111                after_unpark: self.after_unpark.clone(),
1112                global_queue_interval: self.global_queue_interval,
1113                event_interval: self.event_interval,
1114                local_queue_capacity: self.local_queue_capacity,
1115                #[cfg(tokio_unstable)]
1116                unhandled_panic: self.unhandled_panic.clone(),
1117                disable_lifo_slot: self.disable_lifo_slot,
1118                seed_generator: seed_generator_1,
1119                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1120            },
1121        );
1122
1123        let handle = Handle {
1124            inner: scheduler::Handle::CurrentThread(handle),
1125        };
1126
1127        Ok(Runtime::from_parts(
1128            Scheduler::CurrentThread(scheduler),
1129            handle,
1130            blocking_pool,
1131        ))
1132    }
1133
1134    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1135        if self.metrics_poll_count_histogram_enable {
1136            Some(self.metrics_poll_count_histogram.clone())
1137        } else {
1138            None
1139        }
1140    }
1141}
1142
1143cfg_io_driver! {
1144    impl Builder {
1145        /// Enables the I/O driver.
1146        ///
1147        /// Doing this enables using net, process, signal, and some I/O types on
1148        /// the runtime.
1149        ///
1150        /// # Examples
1151        ///
1152        /// ```
1153        /// use tokio::runtime;
1154        ///
1155        /// let rt = runtime::Builder::new_multi_thread()
1156        ///     .enable_io()
1157        ///     .build()
1158        ///     .unwrap();
1159        /// ```
1160        pub fn enable_io(&mut self) -> &mut Self {
1161            self.enable_io = true;
1162            self
1163        }
1164
1165        /// Enables the I/O driver and configures the max number of events to be
1166        /// processed per tick.
1167        ///
1168        /// # Examples
1169        ///
1170        /// ```
1171        /// use tokio::runtime;
1172        ///
1173        /// let rt = runtime::Builder::new_current_thread()
1174        ///     .enable_io()
1175        ///     .max_io_events_per_tick(1024)
1176        ///     .build()
1177        ///     .unwrap();
1178        /// ```
1179        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1180            self.nevents = capacity;
1181            self
1182        }
1183    }
1184}
1185
1186cfg_time! {
1187    impl Builder {
1188        /// Enables the time driver.
1189        ///
1190        /// Doing this enables using `tokio::time` on the runtime.
1191        ///
1192        /// # Examples
1193        ///
1194        /// ```
1195        /// use tokio::runtime;
1196        ///
1197        /// let rt = runtime::Builder::new_multi_thread()
1198        ///     .enable_time()
1199        ///     .build()
1200        ///     .unwrap();
1201        /// ```
1202        pub fn enable_time(&mut self) -> &mut Self {
1203            self.enable_time = true;
1204            self
1205        }
1206    }
1207}
1208
1209cfg_test_util! {
1210    impl Builder {
1211        /// Controls whether the runtime's clock starts paused or advancing.
1212        ///
1213        /// Pausing time requires the current-thread runtime; construction of
1214        /// the runtime will panic otherwise.
1215        ///
1216        /// # Examples
1217        ///
1218        /// ```
1219        /// use tokio::runtime;
1220        ///
1221        /// let rt = runtime::Builder::new_current_thread()
1222        ///     .enable_time()
1223        ///     .start_paused(true)
1224        ///     .build()
1225        ///     .unwrap();
1226        /// ```
1227        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1228            self.start_paused = start_paused;
1229            self
1230        }
1231    }
1232}
1233
1234cfg_rt_multi_thread! {
1235    impl Builder {
1236        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1237            use crate::loom::sys::num_cpus;
1238            use crate::runtime::{Config, runtime::Scheduler};
1239            use crate::runtime::scheduler::{self, MultiThread};
1240
1241            let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1242
1243            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1244
1245            // Create the blocking pool
1246            let blocking_pool =
1247                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1248            let blocking_spawner = blocking_pool.spawner().clone();
1249
1250            // Generate a rng seed for this runtime.
1251            let seed_generator_1 = self.seed_generator.next_generator();
1252            let seed_generator_2 = self.seed_generator.next_generator();
1253
1254            let (scheduler, handle, launch) = MultiThread::new(
1255                core_threads,
1256                driver,
1257                driver_handle,
1258                blocking_spawner,
1259                seed_generator_2,
1260                Config {
1261                    before_park: self.before_park.clone(),
1262                    after_unpark: self.after_unpark.clone(),
1263                    global_queue_interval: self.global_queue_interval,
1264                    event_interval: self.event_interval,
1265                    local_queue_capacity: self.local_queue_capacity,
1266                    #[cfg(tokio_unstable)]
1267                    unhandled_panic: self.unhandled_panic.clone(),
1268                    disable_lifo_slot: self.disable_lifo_slot,
1269                    seed_generator: seed_generator_1,
1270                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1271                },
1272            );
1273
1274            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1275
1276            // Spawn the thread pool workers
1277            let _enter = handle.enter();
1278            launch.launch();
1279
1280            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1281        }
1282
1283        cfg_unstable! {
1284            fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
1285                use crate::loom::sys::num_cpus;
1286                use crate::runtime::{Config, runtime::Scheduler};
1287                use crate::runtime::scheduler::MultiThreadAlt;
1288
1289                let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
1290                let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1291
1292                // Create the blocking pool
1293                let blocking_pool =
1294                    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
1295                let blocking_spawner = blocking_pool.spawner().clone();
1296
1297                // Generate a rng seed for this runtime.
1298                let seed_generator_1 = self.seed_generator.next_generator();
1299                let seed_generator_2 = self.seed_generator.next_generator();
1300
1301                let (scheduler, handle) = MultiThreadAlt::new(
1302                    core_threads,
1303                    driver,
1304                    driver_handle,
1305                    blocking_spawner,
1306                    seed_generator_2,
1307                    Config {
1308                        before_park: self.before_park.clone(),
1309                        after_unpark: self.after_unpark.clone(),
1310                        global_queue_interval: self.global_queue_interval,
1311                        event_interval: self.event_interval,
1312                        local_queue_capacity: self.local_queue_capacity,
1313                        #[cfg(tokio_unstable)]
1314                        unhandled_panic: self.unhandled_panic.clone(),
1315                        disable_lifo_slot: self.disable_lifo_slot,
1316                        seed_generator: seed_generator_1,
1317                        metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1318                    },
1319                );
1320
1321                Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
1322            }
1323        }
1324    }
1325}
1326
1327impl fmt::Debug for Builder {
1328    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1329        fmt.debug_struct("Builder")
1330            .field("worker_threads", &self.worker_threads)
1331            .field("max_blocking_threads", &self.max_blocking_threads)
1332            .field(
1333                "thread_name",
1334                &"<dyn Fn() -> String + Send + Sync + 'static>",
1335            )
1336            .field("thread_stack_size", &self.thread_stack_size)
1337            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1338            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1339            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1340            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1341            .finish()
1342    }
1343}