wasmtime_wasi/
runtime.rs

//! This module provides an "ambient Tokio runtime" via
//! [`with_ambient_tokio_runtime`]. Embedders may use wasmtime-wasi from
//! synchronous Rust without using tokio directly. The implementation of
4//! wasmtime-wasi requires a tokio executor in a way that is [deeply tied to
5//! its
6//! design](https://github.com/bytecodealliance/wasmtime/issues/7973#issuecomment-1960513214).
7//! When used from a synchronous wasmtime context, this module provides the
8//! wrapper function [`in_tokio`] used throughout the shim implementations of
9//! synchronous component binding `Host` traits in terms of the async ones.
10//!
//! This module also provides a thin wrapper on tokio's tasks,
//! [`AbortOnDropJoinHandle`], which is exactly like a
//! [`tokio::task::JoinHandle`] except for the obvious behavioral change. This
//! whole crate, and any child crates which spawn tasks as part of their
//! implementations, should please use this crate's [`spawn`] and
//! [`spawn_blocking`] over tokio's, so we wanted the type name to stick out
//! if someone misses it.
18//!
//! Each of these facilities should be used by dependencies of wasmtime-wasi
//! when implementing component bindings.
21
22use std::future::Future;
23use std::pin::Pin;
24use std::sync::LazyLock;
25use std::task::{Context, Poll};
26
27pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
28    tokio::runtime::Builder::new_multi_thread()
29        .enable_time()
30        .enable_io()
31        .build()
32        .unwrap()
33});
34
35/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when
36/// the handle is dropped.
37///
38/// This behavior makes it easier to tie a worker task to the lifetime of a Resource
39/// by keeping this handle owned by the Resource.
40#[derive(Debug)]
41pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
42impl<T> AbortOnDropJoinHandle<T> {
43    /// Abort the task and wait for it to finish. Optionally returns the result
44    /// of the task if it ran to completion prior to being aborted.
45    pub(crate) async fn cancel(mut self) -> Option<T> {
46        self.0.abort();
47
48        match (&mut self.0).await {
49            Ok(value) => Some(value),
50            Err(err) if err.is_cancelled() => None,
51            Err(err) => std::panic::resume_unwind(err.into_panic()),
52        }
53    }
54}
55impl<T> Drop for AbortOnDropJoinHandle<T> {
56    fn drop(&mut self) {
57        self.0.abort()
58    }
59}
60impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
61    type Target = tokio::task::JoinHandle<T>;
62    fn deref(&self) -> &Self::Target {
63        &self.0
64    }
65}
66impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
67    fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
68        &mut self.0
69    }
70}
71impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
72    fn from(jh: tokio::task::JoinHandle<T>) -> Self {
73        AbortOnDropJoinHandle(jh)
74    }
75}
76impl<T> Future for AbortOnDropJoinHandle<T> {
77    type Output = T;
78    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        match Pin::new(&mut self.as_mut().0).poll(cx) {
80            Poll::Pending => Poll::Pending,
81            Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),
82        }
83    }
84}
85
86pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
87where
88    F: Future + Send + 'static,
89    F::Output: Send + 'static,
90{
91    let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));
92    AbortOnDropJoinHandle(j)
93}
94
95pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
96where
97    F: FnOnce() -> R + Send + 'static,
98    R: Send + 'static,
99{
100    let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));
101    AbortOnDropJoinHandle(j)
102}
103
104pub fn in_tokio<F: Future>(f: F) -> F::Output {
105    match tokio::runtime::Handle::try_current() {
106        Ok(h) => {
107            let _enter = h.enter();
108            h.block_on(f)
109        }
110        // The `yield_now` here is non-obvious and if you're reading this
111        // you're likely curious about why it's here. This is currently required
112        // to get some features of "sync mode" working correctly, such as with
113        // the CLI. To illustrate why this is required, consider a program
114        // organized as:
115        //
116        // * A program has a `pollable` that it's waiting on.
117        // * This `pollable` is always ready .
118        // * Actually making the corresponding operation ready, however,
119        //   requires some background work on Tokio's part.
120        // * The program is looping on "wait for readiness" coupled with
121        //   performing the operation.
122        //
123        // In this situation this program ends up infinitely looping in waiting
124        // for pollables. The reason appears to be that when we enter the tokio
125        // runtime here it doesn't necessary yield to background work because
126        // the provided future `f` is ready immediately. The future `f` will run
127        // through the list of pollables and determine one of them is ready.
128        //
129        // Historically this happened with UDP sockets. A test send a datagram
130        // from one socket to another and the other socket infinitely didn't
131        // receive the data. This appeared to be because the server socket was
132        // waiting on `READABLE | WRITABLE` (which is itself a bug but ignore
133        // that) and the socket was currently in the "writable" state but never
134        // ended up receiving a notification for the "readable" state. Moving
135        // the socket to "readable" would require Tokio to perform some
136        // background work via epoll/kqueue/handle events but if the future
137        // provided here is always ready, then that never happened.
138        //
139        // Thus the `yield_now()` is an attempt to force Tokio to go do some
140        // background work eventually and look at new interest masks for
141        // example. This is a bit of a kludge but everything's already a bit
142        // wonky in synchronous mode anyway. Note that this is hypothesized to
143        // not be an issue in async mode because async mode typically has the
144        // Tokio runtime in a separate thread or otherwise participating in a
145        // larger application, it's only here in synchronous mode where we
146        // effectively own the runtime that we need some special care.
147        Err(_) => {
148            let _enter = RUNTIME.enter();
149            RUNTIME.block_on(async move {
150                tokio::task::yield_now().await;
151                f.await
152            })
153        }
154    }
155}
156
157/// Executes the closure `f` with an "ambient Tokio runtime" which basically
158/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.
159///
160/// If a `Handle` is already available, e.g. in async contexts, then `f` is run
161/// immediately. Otherwise for synchronous contexts this crate's fallback
162/// runtime is configured and then `f` is executed.
163pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
164    match tokio::runtime::Handle::try_current() {
165        Ok(_) => f(),
166        Err(_) => {
167            let _enter = RUNTIME.enter();
168            f()
169        }
170    }
171}
172
173/// Attempts to get the result of a `future`.
174///
175/// This function does not block and will poll the provided future once. If the
176/// result is here then `Some` is returned, otherwise `None` is returned.
177///
178/// Note that by polling `future` this means that `future` must be re-polled
179/// later if it's to wake up a task.
180pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
181where
182    F: Future,
183{
184    let mut task = Context::from_waker(futures::task::noop_waker_ref());
185    match future.poll(&mut task) {
186        Poll::Ready(result) => Some(result),
187        Poll::Pending => None,
188    }
189}