madsim_real_tokio/runtime/
handle.rs

1#[cfg(tokio_unstable)]
2use crate::runtime;
3use crate::runtime::{context, scheduler, RuntimeFlavor};
4
5/// Handle to the runtime.
6///
7/// The handle is internally reference-counted and can be freely cloned. A handle can be
8/// obtained using the [`Runtime::handle`] method.
9///
10/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
11#[derive(Debug, Clone)]
12// When the `rt` feature is *not* enabled, this type is still defined, but not
13// included in the public API.
14pub struct Handle {
15    pub(crate) inner: scheduler::Handle,
16}
17
18use crate::runtime::task::JoinHandle;
19use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
20
21use std::future::Future;
22use std::marker::PhantomData;
23use std::{error, fmt};
24
25/// Runtime context guard.
26///
27/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
28/// the runtime context on drop.
29///
30/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
31#[derive(Debug)]
32#[must_use = "Creating and dropping a guard does nothing"]
33pub struct EnterGuard<'a> {
34    _guard: context::SetCurrentGuard,
35    _handle_lifetime: PhantomData<&'a Handle>,
36}
37
38impl Handle {
39    /// Enters the runtime context. This allows you to construct types that must
40    /// have an executor available on creation such as [`Sleep`] or
41    /// [`TcpStream`]. It will also allow you to call methods such as
42    /// [`tokio::spawn`] and [`Handle::current`] without panicking.
43    ///
44    /// # Panics
45    ///
46    /// When calling `Handle::enter` multiple times, the returned guards
47    /// **must** be dropped in the reverse order that they were acquired.
48    /// Failure to do so will result in a panic and possible memory leaks.
49    ///
50    /// # Examples
51    ///
52    /// ```
53    /// use tokio::runtime::Runtime;
54    ///
55    /// let rt = Runtime::new().unwrap();
56    ///
57    /// let _guard = rt.enter();
58    /// tokio::spawn(async {
59    ///     println!("Hello world!");
60    /// });
61    /// ```
62    ///
63    /// Do **not** do the following, this shows a scenario that will result in a
64    /// panic and possible memory leak.
65    ///
66    /// ```should_panic
67    /// use tokio::runtime::Runtime;
68    ///
69    /// let rt1 = Runtime::new().unwrap();
70    /// let rt2 = Runtime::new().unwrap();
71    ///
72    /// let enter1 = rt1.enter();
73    /// let enter2 = rt2.enter();
74    ///
75    /// drop(enter1);
76    /// drop(enter2);
77    /// ```
78    ///
79    /// [`Sleep`]: struct@crate::time::Sleep
80    /// [`TcpStream`]: struct@crate::net::TcpStream
81    /// [`tokio::spawn`]: fn@crate::spawn
82    pub fn enter(&self) -> EnterGuard<'_> {
83        EnterGuard {
84            _guard: match context::try_set_current(&self.inner) {
85                Some(guard) => guard,
86                None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
87            },
88            _handle_lifetime: PhantomData,
89        }
90    }
91
92    /// Returns a `Handle` view over the currently running `Runtime`.
93    ///
94    /// # Panics
95    ///
96    /// This will panic if called outside the context of a Tokio runtime. That means that you must
97    /// call this on one of the threads **being run by the runtime**, or from a thread with an active
98    /// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
99    /// will cause a panic unless that thread has an active `EnterGuard`.
100    ///
101    /// # Examples
102    ///
103    /// This can be used to obtain the handle of the surrounding runtime from an async
104    /// block or function running on that runtime.
105    ///
106    /// ```
107    /// # use std::thread;
108    /// # use tokio::runtime::Runtime;
109    /// # fn dox() {
110    /// # let rt = Runtime::new().unwrap();
111    /// # rt.spawn(async {
112    /// use tokio::runtime::Handle;
113    ///
114    /// // Inside an async block or function.
115    /// let handle = Handle::current();
116    /// handle.spawn(async {
117    ///     println!("now running in the existing Runtime");
118    /// });
119    ///
120    /// # let handle =
121    /// thread::spawn(move || {
122    ///     // Notice that the handle is created outside of this thread and then moved in
123    ///     handle.spawn(async { /* ... */ });
124    ///     // This next line would cause a panic because we haven't entered the runtime
125    ///     // and created an EnterGuard
126    ///     // let handle2 = Handle::current(); // panic
127    ///     // So we create a guard here with Handle::enter();
128    ///     let _guard = handle.enter();
129    ///     // Now we can call Handle::current();
130    ///     let handle2 = Handle::current();
131    /// });
132    /// # handle.join().unwrap();
133    /// # });
134    /// # }
135    /// ```
136    #[track_caller]
137    pub fn current() -> Self {
138        Handle {
139            inner: scheduler::Handle::current(),
140        }
141    }
142
    /// Returns a `Handle` view over the currently running `Runtime`.
    ///
    /// Returns an error if there is no runtime in the current context, or if
    /// the Tokio context thread-local has already been destroyed.
    ///
    /// Contrary to [`Handle::current`], this never panics.
148    pub fn try_current() -> Result<Self, TryCurrentError> {
149        context::with_current(|inner| Handle {
150            inner: inner.clone(),
151        })
152    }
153
154    /// Spawns a future onto the Tokio runtime.
155    ///
156    /// This spawns the given future onto the runtime's executor, usually a
157    /// thread pool. The thread pool is then responsible for polling the future
158    /// until it completes.
159    ///
160    /// The provided future will start running in the background immediately
161    /// when `spawn` is called, even if you don't await the returned
162    /// `JoinHandle`.
163    ///
164    /// See [module level][mod] documentation for more details.
165    ///
166    /// [mod]: index.html
167    ///
168    /// # Examples
169    ///
170    /// ```
171    /// use tokio::runtime::Runtime;
172    ///
173    /// # fn dox() {
174    /// // Create the runtime
175    /// let rt = Runtime::new().unwrap();
176    /// // Get a handle from this runtime
177    /// let handle = rt.handle();
178    ///
179    /// // Spawn a future onto the runtime using the handle
180    /// handle.spawn(async {
181    ///     println!("now running on a worker thread");
182    /// });
183    /// # }
184    /// ```
185    #[track_caller]
186    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
187    where
188        F: Future + Send + 'static,
189        F::Output: Send + 'static,
190    {
191        self.spawn_named(future, None)
192    }
193
194    /// Runs the provided function on an executor dedicated to blocking
195    /// operations.
196    ///
197    /// # Examples
198    ///
199    /// ```
200    /// use tokio::runtime::Runtime;
201    ///
202    /// # fn dox() {
203    /// // Create the runtime
204    /// let rt = Runtime::new().unwrap();
205    /// // Get a handle from this runtime
206    /// let handle = rt.handle();
207    ///
208    /// // Spawn a blocking function onto the runtime using the handle
209    /// handle.spawn_blocking(|| {
210    ///     println!("now running on a worker thread");
211    /// });
    /// # }
    /// ```
213    #[track_caller]
214    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
215    where
216        F: FnOnce() -> R + Send + 'static,
217        R: Send + 'static,
218    {
219        self.inner.blocking_spawner().spawn_blocking(self, func)
220    }
221
222    /// Runs a future to completion on this `Handle`'s associated `Runtime`.
223    ///
224    /// This runs the given future on the current thread, blocking until it is
225    /// complete, and yielding its resolved result. Any tasks or timers which
226    /// the future spawns internally will be executed on the runtime.
227    ///
228    /// When this is used on a `current_thread` runtime, only the
229    /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
230    /// `Handle::block_on` method cannot drive them. This means that, when using
231    /// this method on a `current_thread` runtime, anything that relies on IO or
232    /// timers will not work unless there is another thread currently calling
233    /// [`Runtime::block_on`] on the same runtime.
234    ///
235    /// # If the runtime has been shut down
236    ///
237    /// If the `Handle`'s associated `Runtime` has been shut down (through
238    /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
239    /// dropping it) and `Handle::block_on` is used it might return an error or
240    /// panic. Specifically IO resources will return an error and timers will
241    /// panic. Runtime independent futures will run as normal.
242    ///
243    /// # Panics
244    ///
245    /// This function panics if the provided future panics, if called within an
246    /// asynchronous execution context, or if a timer future is executed on a
247    /// runtime that has been shut down.
248    ///
249    /// # Examples
250    ///
251    /// ```
252    /// use tokio::runtime::Runtime;
253    ///
254    /// // Create the runtime
255    /// let rt  = Runtime::new().unwrap();
256    ///
257    /// // Get a handle from this runtime
258    /// let handle = rt.handle();
259    ///
260    /// // Execute the future, blocking the current thread until completion
261    /// handle.block_on(async {
262    ///     println!("hello");
263    /// });
264    /// ```
265    ///
266    /// Or using `Handle::current`:
267    ///
268    /// ```
269    /// use tokio::runtime::Handle;
270    ///
271    /// #[tokio::main]
272    /// async fn main () {
273    ///     let handle = Handle::current();
274    ///     std::thread::spawn(move || {
275    ///         // Using Handle::block_on to run async code in the new thread.
276    ///         handle.block_on(async {
277    ///             println!("hello");
278    ///         });
279    ///     });
280    /// }
281    /// ```
282    ///
283    /// [`JoinError`]: struct@crate::task::JoinError
284    /// [`JoinHandle`]: struct@crate::task::JoinHandle
285    /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
286    /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
287    /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
288    /// [`spawn_blocking`]: crate::task::spawn_blocking
289    /// [`tokio::fs`]: crate::fs
290    /// [`tokio::net`]: crate::net
291    /// [`tokio::time`]: crate::time
292    #[track_caller]
293    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
294        #[cfg(all(
295            tokio_unstable,
296            tokio_taskdump,
297            feature = "rt",
298            target_os = "linux",
299            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
300        ))]
301        let future = super::task::trace::Trace::root(future);
302
303        #[cfg(all(tokio_unstable, feature = "tracing"))]
304        let future =
305            crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
306
307        // Enter the runtime context. This sets the current driver handles and
308        // prevents blocking an existing runtime.
309        context::enter_runtime(&self.inner, true, |blocking| {
310            blocking.block_on(future).expect("failed to park thread")
311        })
312    }
313
314    #[track_caller]
315    pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
316    where
317        F: Future + Send + 'static,
318        F::Output: Send + 'static,
319    {
320        let id = crate::runtime::task::Id::next();
321        #[cfg(all(
322            tokio_unstable,
323            tokio_taskdump,
324            feature = "rt",
325            target_os = "linux",
326            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
327        ))]
328        let future = super::task::trace::Trace::root(future);
329        #[cfg(all(tokio_unstable, feature = "tracing"))]
330        let future = crate::util::trace::task(future, "task", _name, id.as_u64());
331        self.inner.spawn(future, id)
332    }
333
334    /// Returns the flavor of the current `Runtime`.
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// use tokio::runtime::{Handle, RuntimeFlavor};
340    ///
341    /// #[tokio::main(flavor = "current_thread")]
342    /// async fn main() {
343    ///   assert_eq!(RuntimeFlavor::CurrentThread, Handle::current().runtime_flavor());
344    /// }
345    /// ```
346    ///
347    /// ```
348    /// use tokio::runtime::{Handle, RuntimeFlavor};
349    ///
350    /// #[tokio::main(flavor = "multi_thread", worker_threads = 4)]
351    /// async fn main() {
352    ///   assert_eq!(RuntimeFlavor::MultiThread, Handle::current().runtime_flavor());
353    /// }
354    /// ```
355    pub fn runtime_flavor(&self) -> RuntimeFlavor {
356        match self.inner {
357            scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
358            #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
359            scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
360            #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
361            scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
362        }
363    }
364
365    cfg_unstable! {
366        /// Returns the [`Id`] of the current `Runtime`.
367        ///
368        /// # Examples
369        ///
370        /// ```
371        /// use tokio::runtime::Handle;
372        ///
373        /// #[tokio::main(flavor = "current_thread")]
374        /// async fn main() {
375        ///   println!("Current runtime id: {}", Handle::current().id());
376        /// }
377        /// ```
378        ///
379        /// **Note**: This is an [unstable API][unstable]. The public API of this type
380        /// may break in 1.x releases. See [the documentation on unstable
381        /// features][unstable] for details.
382        ///
383        /// [unstable]: crate#unstable-features
384        /// [`Id`]: struct@crate::runtime::Id
385        pub fn id(&self) -> runtime::Id {
386            let owned_id = match &self.inner {
387                scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
388                #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
389                scheduler::Handle::MultiThread(handle) => handle.owned_id(),
390                #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
391                scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
392            };
393            owned_id.into()
394        }
395    }
396}
397
398cfg_metrics! {
399    use crate::runtime::RuntimeMetrics;
400
401    impl Handle {
402        /// Returns a view that lets you get information about how the runtime
403        /// is performing.
404        pub fn metrics(&self) -> RuntimeMetrics {
405            RuntimeMetrics::new(self.clone())
406        }
407    }
408}
409
410cfg_taskdump! {
411    impl Handle {
412        /// Captures a snapshot of the runtime's state.
413        ///
414        /// This functionality is experimental, and comes with a number of
415        /// requirements and limitations.
416        ///
417        /// # Examples
418        ///
419        /// This can be used to get call traces of each task in the runtime.
420        /// Calls to `Handle::dump` should usually be enclosed in a
421        /// [timeout][crate::time::timeout], so that dumping does not escalate a
422        /// single blocked runtime thread into an entirely blocked runtime.
423        ///
424        /// ```
425        /// # use tokio::runtime::Runtime;
426        /// # fn dox() {
427        /// # let rt = Runtime::new().unwrap();
428        /// # rt.spawn(async {
429        /// use tokio::runtime::Handle;
430        /// use tokio::time::{timeout, Duration};
431        ///
432        /// // Inside an async block or function.
433        /// let handle = Handle::current();
434        /// if let Ok(dump) = timeout(Duration::from_secs(2), handle.dump()).await {
435        ///     for (i, task) in dump.tasks().iter().enumerate() {
436        ///         let trace = task.trace();
437        ///         println!("TASK {i}:");
438        ///         println!("{trace}\n");
439        ///     }
440        /// }
441        /// # });
442        /// # }
443        /// ```
444        ///
445        /// This produces highly detailed traces of tasks; e.g.:
446        ///
447        /// ```plain
448        /// TASK 0:
449        /// ╼ dump::main::{{closure}}::a::{{closure}} at /tokio/examples/dump.rs:18:20
450        /// └╼ dump::main::{{closure}}::b::{{closure}} at /tokio/examples/dump.rs:23:20
451        ///    └╼ dump::main::{{closure}}::c::{{closure}} at /tokio/examples/dump.rs:28:24
452        ///       └╼ tokio::sync::barrier::Barrier::wait::{{closure}} at /tokio/tokio/src/sync/barrier.rs:129:10
453        ///          └╼ <tokio::util::trace::InstrumentedAsyncOp<F> as core::future::future::Future>::poll at /tokio/tokio/src/util/trace.rs:77:46
454        ///             └╼ tokio::sync::barrier::Barrier::wait_internal::{{closure}} at /tokio/tokio/src/sync/barrier.rs:183:36
455        ///                └╼ tokio::sync::watch::Receiver<T>::changed::{{closure}} at /tokio/tokio/src/sync/watch.rs:604:55
456        ///                   └╼ tokio::sync::watch::changed_impl::{{closure}} at /tokio/tokio/src/sync/watch.rs:755:18
457        ///                      └╼ <tokio::sync::notify::Notified as core::future::future::Future>::poll at /tokio/tokio/src/sync/notify.rs:1103:9
458        ///                         └╼ tokio::sync::notify::Notified::poll_notified at /tokio/tokio/src/sync/notify.rs:996:32
459        /// ```
460        ///
461        /// # Requirements
462        ///
463        /// ## Debug Info Must Be Available
464        ///
465        /// To produce task traces, the application must **not** be compiled
466        /// with `split debuginfo`. On Linux, including `debuginfo` within the
467        /// application binary is the (correct) default. You can further ensure
468        /// this behavior with the following directive in your `Cargo.toml`:
469        ///
470        /// ```toml
471        /// [profile.*]
472        /// split-debuginfo = "off"
473        /// ```
474        ///
475        /// ## Unstable Features
476        ///
477        /// This functionality is **unstable**, and requires both the
478        /// `tokio_unstable` and `tokio_taskdump` `cfg` flags to be set.
479        ///
480        /// You can do this by setting the `RUSTFLAGS` environment variable
481        /// before invoking `cargo`; e.g.:
482        /// ```bash
483        /// RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump" cargo run --example dump
484        /// ```
485        ///
486        /// Or by [configuring][cargo-config] `rustflags` in
487        /// `.cargo/config.toml`:
488        /// ```text
489        /// [build]
490        /// rustflags = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
491        /// ```
492        ///
493        /// [cargo-config]:
494        ///     https://doc.rust-lang.org/cargo/reference/config.html
495        ///
496        /// ## Platform Requirements
497        ///
498        /// Task dumps are supported on Linux atop `aarch64`, `x86` and `x86_64`.
499        ///
500        /// ## Current Thread Runtime Requirements
501        ///
502        /// On the `current_thread` runtime, task dumps may only be requested
503        /// from *within* the context of the runtime being dumped. Do not, for
504        /// example, await `Handle::dump()` on a different runtime.
505        ///
506        /// # Limitations
507        ///
508        /// ## Performance
509        ///
510        /// Although enabling the `tokio_taskdump` feature imposes virtually no
511        /// additional runtime overhead, actually calling `Handle::dump` is
512        /// expensive. The runtime must synchronize and pause its workers, then
513        /// re-poll every task in a special tracing mode. Avoid requesting dumps
514        /// often.
515        ///
516        /// ## Local Executors
517        ///
518        /// Tasks managed by local executors (e.g., `FuturesUnordered` and
519        /// [`LocalSet`][crate::task::LocalSet]) may not appear in task dumps.
520        ///
521        /// ## Non-Termination When Workers Are Blocked
522        ///
523        /// The future produced by `Handle::dump` may never produce `Ready` if
524        /// another runtime worker is blocked for more than 250ms. This may
525        /// occur if a dump is requested during shutdown, or if another runtime
526        /// worker is infinite looping or synchronously deadlocked. For these
527        /// reasons, task dumping should usually be paired with an explicit
528        /// [timeout][crate::time::timeout].
529        pub async fn dump(&self) -> crate::runtime::Dump {
530            match &self.inner {
531                scheduler::Handle::CurrentThread(handle) => handle.dump(),
532                #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
533                scheduler::Handle::MultiThread(handle) => {
534                    // perform the trace in a separate thread so that the
535                    // trace itself does not appear in the taskdump.
536                    let handle = handle.clone();
537                    spawn_thread(async {
538                        let handle = handle;
539                        handle.dump().await
540                    }).await
541                },
542                #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
543                scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
544            }
545        }
546
547        /// Produces `true` if the current task is being traced for a dump;
548        /// otherwise false. This function is only public for integration
549        /// testing purposes. Do not rely on it.
550        #[doc(hidden)]
551        pub fn is_tracing() -> bool {
552            super::task::trace::Context::is_tracing()
553        }
554    }
555
556    cfg_rt_multi_thread! {
557        /// Spawn a new thread and asynchronously await on its result.
558        async fn spawn_thread<F>(f: F) -> <F as Future>::Output
559        where
560            F: Future + Send + 'static,
561            <F as Future>::Output: Send + 'static
562        {
563            let (tx, rx) = crate::sync::oneshot::channel();
564            crate::loom::thread::spawn(|| {
565                let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
566                rt.block_on(async {
567                    let _ = tx.send(f.await);
568                });
569            });
570            rx.await.unwrap()
571        }
572    }
573}
574
575/// Error returned by `try_current` when no Runtime has been started
576#[derive(Debug)]
577pub struct TryCurrentError {
578    kind: TryCurrentErrorKind,
579}
580
581impl TryCurrentError {
582    pub(crate) fn new_no_context() -> Self {
583        Self {
584            kind: TryCurrentErrorKind::NoContext,
585        }
586    }
587
588    pub(crate) fn new_thread_local_destroyed() -> Self {
589        Self {
590            kind: TryCurrentErrorKind::ThreadLocalDestroyed,
591        }
592    }
593
594    /// Returns true if the call failed because there is currently no runtime in
595    /// the Tokio context.
596    pub fn is_missing_context(&self) -> bool {
597        matches!(self.kind, TryCurrentErrorKind::NoContext)
598    }
599
600    /// Returns true if the call failed because the Tokio context thread-local
601    /// had been destroyed. This can usually only happen if in the destructor of
602    /// other thread-locals.
603    pub fn is_thread_local_destroyed(&self) -> bool {
604        matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
605    }
606}
607
/// Reason why a handle to the current runtime could not be obtained.
///
/// `Debug` is derived: for these field-less variants it prints exactly the
/// variant name (`NoContext` / `ThreadLocalDestroyed`), and it stays in sync
/// automatically if a variant is ever added.
#[derive(Debug)]
enum TryCurrentErrorKind {
    /// There is currently no runtime in the Tokio context.
    NoContext,
    /// The Tokio context thread-local has been destroyed.
    ThreadLocalDestroyed,
}
621
622impl fmt::Display for TryCurrentError {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        use TryCurrentErrorKind as E;
625        match self.kind {
626            E::NoContext => f.write_str(CONTEXT_MISSING_ERROR),
627            E::ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
628        }
629    }
630}
631
632impl error::Error for TryCurrentError {}