tokio_util/
context.rs

1//! Tokio context aware futures utilities.
2//!
3//! This module includes utilities around integrating tokio with other runtimes
4//! by allowing the context to be attached to futures. This allows spawning
5//! futures on other executors while still using tokio to drive them. This
6//! can be useful if you need to use a tokio based library in an executor/runtime
7//! that does not provide a tokio context.
8
9use pin_project_lite::pin_project;
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{Context, Poll},
14};
15use tokio::runtime::{Handle, Runtime};
16
17pin_project! {
18    /// `TokioContext` allows running futures that must be inside Tokio's
19    /// context on a non-Tokio runtime.
20    ///
21    /// It contains a [`Handle`] to the runtime. A handle to the runtime can be
22    /// obtain by calling the [`Runtime::handle()`] method.
23    ///
24    /// Note that the `TokioContext` wrapper only works if the `Runtime` it is
25    /// connected to has not yet been destroyed. You must keep the `Runtime`
26    /// alive until the future has finished executing.
27    ///
28    /// **Warning:** If `TokioContext` is used together with a [current thread]
29    /// runtime, that runtime must be inside a call to `block_on` for the
30    /// wrapped future to work. For this reason, it is recommended to use a
31    /// [multi thread] runtime, even if you configure it to only spawn one
32    /// worker thread.
33    ///
34    /// # Examples
35    ///
36    /// This example creates two runtimes, but only [enables time] on one of
37    /// them. It then uses the context of the runtime with the timer enabled to
38    /// execute a [`sleep`] future on the runtime with timing disabled.
39    /// ```
40    /// use tokio::time::{sleep, Duration};
41    /// use tokio_util::context::RuntimeExt;
42    ///
43    /// // This runtime has timers enabled.
44    /// let rt = tokio::runtime::Builder::new_multi_thread()
45    ///     .enable_all()
46    ///     .build()
47    ///     .unwrap();
48    ///
49    /// // This runtime has timers disabled.
50    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
51    ///     .build()
52    ///     .unwrap();
53    ///
54    /// // Wrap the sleep future in the context of rt.
55    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
56    ///
57    /// // Execute the future on rt2.
58    /// rt2.block_on(fut);
59    /// ```
60    ///
61    /// [`Handle`]: struct@tokio::runtime::Handle
62    /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle
63    /// [`RuntimeExt`]: trait@crate::context::RuntimeExt
64    /// [`new_static`]: fn@Self::new_static
65    /// [`sleep`]: fn@tokio::time::sleep
66    /// [current thread]: fn@tokio::runtime::Builder::new_current_thread
67    /// [enables time]: fn@tokio::runtime::Builder::enable_time
68    /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread
69    pub struct TokioContext<F> {
70        #[pin]
71        inner: F,
72        handle: Handle,
73    }
74}
75
76impl<F> TokioContext<F> {
77    /// Associate the provided future with the context of the runtime behind
78    /// the provided `Handle`.
79    ///
80    /// This constructor uses a `'static` lifetime to opt-out of checking that
81    /// the runtime still exists.
82    ///
83    /// # Examples
84    ///
85    /// This is the same as the example above, but uses the `new` constructor
86    /// rather than [`RuntimeExt::wrap`].
87    ///
88    /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap
89    ///
90    /// ```
91    /// use tokio::time::{sleep, Duration};
92    /// use tokio_util::context::TokioContext;
93    ///
94    /// // This runtime has timers enabled.
95    /// let rt = tokio::runtime::Builder::new_multi_thread()
96    ///     .enable_all()
97    ///     .build()
98    ///     .unwrap();
99    ///
100    /// // This runtime has timers disabled.
101    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
102    ///     .build()
103    ///     .unwrap();
104    ///
105    /// let fut = TokioContext::new(
106    ///     async { sleep(Duration::from_millis(2)).await },
107    ///     rt.handle().clone(),
108    /// );
109    ///
110    /// // Execute the future on rt2.
111    /// rt2.block_on(fut);
112    /// ```
113    pub fn new(future: F, handle: Handle) -> TokioContext<F> {
114        TokioContext {
115            inner: future,
116            handle,
117        }
118    }
119
120    /// Obtain a reference to the handle inside this `TokioContext`.
121    pub fn handle(&self) -> &Handle {
122        &self.handle
123    }
124
125    /// Remove the association between the Tokio runtime and the wrapped future.
126    pub fn into_inner(self) -> F {
127        self.inner
128    }
129}
130
131impl<F: Future> Future for TokioContext<F> {
132    type Output = F::Output;
133
134    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
135        let me = self.project();
136        let handle = me.handle;
137        let fut = me.inner;
138
139        let _enter = handle.enter();
140        fut.poll(cx)
141    }
142}
143
144/// Extension trait that simplifies bundling a `Handle` with a `Future`.
145pub trait RuntimeExt {
146    /// Create a [`TokioContext`] that wraps the provided future and runs it in
147    /// this runtime's context.
148    ///
149    /// # Examples
150    ///
151    /// This example creates two runtimes, but only [enables time] on one of
152    /// them. It then uses the context of the runtime with the timer enabled to
153    /// execute a [`sleep`] future on the runtime with timing disabled.
154    ///
155    /// ```
156    /// use tokio::time::{sleep, Duration};
157    /// use tokio_util::context::RuntimeExt;
158    ///
159    /// // This runtime has timers enabled.
160    /// let rt = tokio::runtime::Builder::new_multi_thread()
161    ///     .enable_all()
162    ///     .build()
163    ///     .unwrap();
164    ///
165    /// // This runtime has timers disabled.
166    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
167    ///     .build()
168    ///     .unwrap();
169    ///
170    /// // Wrap the sleep future in the context of rt.
171    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
172    ///
173    /// // Execute the future on rt2.
174    /// rt2.block_on(fut);
175    /// ```
176    ///
177    /// [`TokioContext`]: struct@crate::context::TokioContext
178    /// [`sleep`]: fn@tokio::time::sleep
179    /// [enables time]: fn@tokio::runtime::Builder::enable_time
180    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
181}
182
183impl RuntimeExt for Runtime {
184    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
185        TokioContext {
186            inner: fut,
187            handle: self.handle().clone(),
188        }
189    }
190}