wasmtime_wasi/runtime.rs
//! This module provides an "ambient Tokio runtime" via
//! [`with_ambient_tokio_runtime`]. Embedders may use wasmtime-wasi from
//! synchronous Rust, without using tokio directly. The implementation of
4//! wasmtime-wasi requires a tokio executor in a way that is [deeply tied to
5//! its
6//! design](https://github.com/bytecodealliance/wasmtime/issues/7973#issuecomment-1960513214).
7//! When used from a synchronous wasmtime context, this module provides the
8//! wrapper function [`in_tokio`] used throughout the shim implementations of
9//! synchronous component binding `Host` traits in terms of the async ones.
10//!
//! This module also provides a thin wrapper on tokio's tasks,
//! [`AbortOnDropJoinHandle`], which is exactly like a
//! [`tokio::task::JoinHandle`] except that it aborts the task when the handle
//! is dropped. This whole crate, and any child crates which spawn tasks as
//! part of their implementations, should use this crate's [`spawn`] and
//! [`spawn_blocking`] over tokio's; the type name was chosen to stick out in
//! case someone misses this.
18//!
//! Each of these facilities should be used by dependencies of wasmtime-wasi
//! when implementing component bindings.
21
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::LazyLock;
25use std::task::{Context, Poll};
26
27pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
28 tokio::runtime::Builder::new_multi_thread()
29 .enable_time()
30 .enable_io()
31 .build()
32 .unwrap()
33});
34
35/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when
36/// the handle is dropped.
37///
38/// This behavior makes it easier to tie a worker task to the lifetime of a Resource
39/// by keeping this handle owned by the Resource.
40#[derive(Debug)]
41pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
42impl<T> AbortOnDropJoinHandle<T> {
43 /// Abort the task and wait for it to finish. Optionally returns the result
44 /// of the task if it ran to completion prior to being aborted.
45 pub(crate) async fn cancel(mut self) -> Option<T> {
46 self.0.abort();
47
48 match (&mut self.0).await {
49 Ok(value) => Some(value),
50 Err(err) if err.is_cancelled() => None,
51 Err(err) => std::panic::resume_unwind(err.into_panic()),
52 }
53 }
54}
55impl<T> Drop for AbortOnDropJoinHandle<T> {
56 fn drop(&mut self) {
57 self.0.abort()
58 }
59}
60impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
61 type Target = tokio::task::JoinHandle<T>;
62 fn deref(&self) -> &Self::Target {
63 &self.0
64 }
65}
66impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
67 fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
68 &mut self.0
69 }
70}
71impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
72 fn from(jh: tokio::task::JoinHandle<T>) -> Self {
73 AbortOnDropJoinHandle(jh)
74 }
75}
76impl<T> Future for AbortOnDropJoinHandle<T> {
77 type Output = T;
78 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 match Pin::new(&mut self.as_mut().0).poll(cx) {
80 Poll::Pending => Poll::Pending,
81 Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),
82 }
83 }
84}
85
86pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
87where
88 F: Future + Send + 'static,
89 F::Output: Send + 'static,
90{
91 let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));
92 AbortOnDropJoinHandle(j)
93}
94
95pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
96where
97 F: FnOnce() -> R + Send + 'static,
98 R: Send + 'static,
99{
100 let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));
101 AbortOnDropJoinHandle(j)
102}
103
104pub fn in_tokio<F: Future>(f: F) -> F::Output {
105 match tokio::runtime::Handle::try_current() {
106 Ok(h) => {
107 let _enter = h.enter();
108 h.block_on(f)
109 }
110 // The `yield_now` here is non-obvious and if you're reading this
111 // you're likely curious about why it's here. This is currently required
112 // to get some features of "sync mode" working correctly, such as with
113 // the CLI. To illustrate why this is required, consider a program
114 // organized as:
115 //
116 // * A program has a `pollable` that it's waiting on.
117 // * This `pollable` is always ready .
118 // * Actually making the corresponding operation ready, however,
119 // requires some background work on Tokio's part.
120 // * The program is looping on "wait for readiness" coupled with
121 // performing the operation.
122 //
123 // In this situation this program ends up infinitely looping in waiting
124 // for pollables. The reason appears to be that when we enter the tokio
125 // runtime here it doesn't necessary yield to background work because
126 // the provided future `f` is ready immediately. The future `f` will run
127 // through the list of pollables and determine one of them is ready.
128 //
129 // Historically this happened with UDP sockets. A test send a datagram
130 // from one socket to another and the other socket infinitely didn't
131 // receive the data. This appeared to be because the server socket was
132 // waiting on `READABLE | WRITABLE` (which is itself a bug but ignore
133 // that) and the socket was currently in the "writable" state but never
134 // ended up receiving a notification for the "readable" state. Moving
135 // the socket to "readable" would require Tokio to perform some
136 // background work via epoll/kqueue/handle events but if the future
137 // provided here is always ready, then that never happened.
138 //
139 // Thus the `yield_now()` is an attempt to force Tokio to go do some
140 // background work eventually and look at new interest masks for
141 // example. This is a bit of a kludge but everything's already a bit
142 // wonky in synchronous mode anyway. Note that this is hypothesized to
143 // not be an issue in async mode because async mode typically has the
144 // Tokio runtime in a separate thread or otherwise participating in a
145 // larger application, it's only here in synchronous mode where we
146 // effectively own the runtime that we need some special care.
147 Err(_) => {
148 let _enter = RUNTIME.enter();
149 RUNTIME.block_on(async move {
150 tokio::task::yield_now().await;
151 f.await
152 })
153 }
154 }
155}
156
157/// Executes the closure `f` with an "ambient Tokio runtime" which basically
158/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.
159///
160/// If a `Handle` is already available, e.g. in async contexts, then `f` is run
161/// immediately. Otherwise for synchronous contexts this crate's fallback
162/// runtime is configured and then `f` is executed.
163pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
164 match tokio::runtime::Handle::try_current() {
165 Ok(_) => f(),
166 Err(_) => {
167 let _enter = RUNTIME.enter();
168 f()
169 }
170 }
171}
172
173/// Attempts to get the result of a `future`.
174///
175/// This function does not block and will poll the provided future once. If the
176/// result is here then `Some` is returned, otherwise `None` is returned.
177///
178/// Note that by polling `future` this means that `future` must be re-polled
179/// later if it's to wake up a task.
180pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
181where
182 F: Future,
183{
184 let mut task = Context::from_waker(futures::task::noop_waker_ref());
185 match future.poll(&mut task) {
186 Poll::Ready(result) => Some(result),
187 Poll::Pending => None,
188 }
189}