hyper_util/rt/
tokio.rs

1//! [`tokio`] runtime components integration for [`hyper`].
2//!
3//! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to
4//! its underlying asynchronous runtime. This submodule provides glue for
5//! [`tokio`] users to bridge those types to [`hyper`]'s interfaces.
6//!
7//! # IO
8//!
9//! [`hyper`] abstracts over asynchronous readers and writers using [`Read`]
10//! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`]
11//! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors
12//! to bridge these two IO ecosystems together: [`TokioIo<I>`],
13//! [`WithHyperIo<I>`], and [`WithTokioIo<I>`].
14//!
15//! To compare and constrast these IO adaptors and to help explain which
16//! is the proper choice for your needs, here is a table showing which IO
17//! traits these implement, given two types `T` and `H` which implement
18//! Tokio's and Hyper's corresponding IO traits:
19//!
20//! |                    | [`AsyncRead`]    | [`AsyncWrite`]    | [`Read`]     | [`Write`]    |
21//! |--------------------|------------------|-------------------|--------------|--------------|
22//! | `T`                | ✅ **true**      | ✅ **true**       | ❌ **false** | ❌ **false** |
23//! | `H`                | ❌ **false**     | ❌ **false**      | ✅ **true**  | ✅ **true**  |
24//! | [`TokioIo<T>`]     | ❌ **false**     | ❌ **false**      | ✅ **true**  | ✅ **true**  |
25//! | [`TokioIo<H>`]     | ✅ **true**      | ✅ **true**       | ❌ **false** | ❌ **false** |
26//! | [`WithHyperIo<T>`] | ✅ **true**      | ✅ **true**       | ✅ **true**  | ✅ **true**  |
27//! | [`WithHyperIo<H>`] | ❌ **false**     | ❌ **false**      | ❌ **false** | ❌ **false** |
28//! | [`WithTokioIo<T>`] | ❌ **false**     | ❌ **false**      | ❌ **false** | ❌ **false** |
29//! | [`WithTokioIo<H>`] | ✅ **true**      | ✅ **true**       | ✅ **true**  | ✅ **true**  |
30//!
31//! For most situations, [`TokioIo<I>`] is the proper choice. This should be
32//! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the
33//! call-site of a function like [`hyper::client::conn::http1::handshake`].
34//!
35//! [`TokioIo<I>`] switches across these ecosystems, but notably does not
36//! preserve the existing IO trait implementations of its underlying IO. If
37//! one wishes to _extend_ IO with additional implementations,
38//! [`WithHyperIo<I>`] and [`WithTokioIo<I>`] are the correct choice.
39//!
40//! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo<I>`].
41//! That will implement _both_ sets of IO traits. Conversely,
42//! [`WithTokioIo<I>`] will implement both sets of IO traits given a
43//! reader/writer that implements Hyper's [`Read`] and [`Write`].
44//!
45//! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more
46//! information.
47//!
48//! [`AsyncRead`]: tokio::io::AsyncRead
49//! [`AsyncWrite`]: tokio::io::AsyncWrite
50//! [`Read`]: hyper::rt::Read
51//! [`Write`]: hyper::rt::Write
52//! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io
53
54use std::{
55    future::Future,
56    pin::Pin,
57    task::{Context, Poll},
58    time::{Duration, Instant},
59};
60
61use hyper::rt::{Executor, Sleep, Timer};
62use pin_project_lite::pin_project;
63
64#[cfg(feature = "tracing")]
65use tracing::instrument::Instrument;
66
67pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo};
68
69mod with_hyper_io;
70mod with_tokio_io;
71
72/// Future executor that utilises `tokio` threads.
73#[non_exhaustive]
74#[derive(Default, Debug, Clone)]
75pub struct TokioExecutor {}
76
77pin_project! {
78    /// A wrapper that implements Tokio's IO traits for an inner type that
79    /// implements hyper's IO traits, or vice versa (implements hyper's IO
80    /// traits for a type that implements Tokio's IO traits).
81    #[derive(Debug)]
82    pub struct TokioIo<T> {
83        #[pin]
84        inner: T,
85    }
86}
87
88/// A Timer that uses the tokio runtime.
89#[non_exhaustive]
90#[derive(Default, Clone, Debug)]
91pub struct TokioTimer;
92
93// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
94// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
95pin_project! {
96    #[derive(Debug)]
97    struct TokioSleep {
98        #[pin]
99        inner: tokio::time::Sleep,
100    }
101}
102
103// ===== impl TokioExecutor =====
104
105impl<Fut> Executor<Fut> for TokioExecutor
106where
107    Fut: Future + Send + 'static,
108    Fut::Output: Send + 'static,
109{
110    fn execute(&self, fut: Fut) {
111        #[cfg(feature = "tracing")]
112        tokio::spawn(fut.in_current_span());
113
114        #[cfg(not(feature = "tracing"))]
115        tokio::spawn(fut);
116    }
117}
118
119impl TokioExecutor {
120    /// Create new executor that relies on [`tokio::spawn`] to execute futures.
121    pub fn new() -> Self {
122        Self {}
123    }
124}
125
126// ==== impl TokioIo =====
127
128impl<T> TokioIo<T> {
129    /// Wrap a type implementing Tokio's or hyper's IO traits.
130    pub fn new(inner: T) -> Self {
131        Self { inner }
132    }
133
134    /// Borrow the inner type.
135    pub fn inner(&self) -> &T {
136        &self.inner
137    }
138
139    /// Mut borrow the inner type.
140    pub fn inner_mut(&mut self) -> &mut T {
141        &mut self.inner
142    }
143
144    /// Consume this wrapper and get the inner type.
145    pub fn into_inner(self) -> T {
146        self.inner
147    }
148}
149
150impl<T> hyper::rt::Read for TokioIo<T>
151where
152    T: tokio::io::AsyncRead,
153{
154    fn poll_read(
155        self: Pin<&mut Self>,
156        cx: &mut Context<'_>,
157        mut buf: hyper::rt::ReadBufCursor<'_>,
158    ) -> Poll<Result<(), std::io::Error>> {
159        let n = unsafe {
160            let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
161            match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
162                Poll::Ready(Ok(())) => tbuf.filled().len(),
163                other => return other,
164            }
165        };
166
167        unsafe {
168            buf.advance(n);
169        }
170        Poll::Ready(Ok(()))
171    }
172}
173
174impl<T> hyper::rt::Write for TokioIo<T>
175where
176    T: tokio::io::AsyncWrite,
177{
178    fn poll_write(
179        self: Pin<&mut Self>,
180        cx: &mut Context<'_>,
181        buf: &[u8],
182    ) -> Poll<Result<usize, std::io::Error>> {
183        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
184    }
185
186    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
187        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
188    }
189
190    fn poll_shutdown(
191        self: Pin<&mut Self>,
192        cx: &mut Context<'_>,
193    ) -> Poll<Result<(), std::io::Error>> {
194        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
195    }
196
197    fn is_write_vectored(&self) -> bool {
198        tokio::io::AsyncWrite::is_write_vectored(&self.inner)
199    }
200
201    fn poll_write_vectored(
202        self: Pin<&mut Self>,
203        cx: &mut Context<'_>,
204        bufs: &[std::io::IoSlice<'_>],
205    ) -> Poll<Result<usize, std::io::Error>> {
206        tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
207    }
208}
209
210impl<T> tokio::io::AsyncRead for TokioIo<T>
211where
212    T: hyper::rt::Read,
213{
214    fn poll_read(
215        self: Pin<&mut Self>,
216        cx: &mut Context<'_>,
217        tbuf: &mut tokio::io::ReadBuf<'_>,
218    ) -> Poll<Result<(), std::io::Error>> {
219        //let init = tbuf.initialized().len();
220        let filled = tbuf.filled().len();
221        let sub_filled = unsafe {
222            let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
223
224            match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
225                Poll::Ready(Ok(())) => buf.filled().len(),
226                other => return other,
227            }
228        };
229
230        let n_filled = filled + sub_filled;
231        // At least sub_filled bytes had to have been initialized.
232        let n_init = sub_filled;
233        unsafe {
234            tbuf.assume_init(n_init);
235            tbuf.set_filled(n_filled);
236        }
237
238        Poll::Ready(Ok(()))
239    }
240}
241
242impl<T> tokio::io::AsyncWrite for TokioIo<T>
243where
244    T: hyper::rt::Write,
245{
246    fn poll_write(
247        self: Pin<&mut Self>,
248        cx: &mut Context<'_>,
249        buf: &[u8],
250    ) -> Poll<Result<usize, std::io::Error>> {
251        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
252    }
253
254    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
255        hyper::rt::Write::poll_flush(self.project().inner, cx)
256    }
257
258    fn poll_shutdown(
259        self: Pin<&mut Self>,
260        cx: &mut Context<'_>,
261    ) -> Poll<Result<(), std::io::Error>> {
262        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
263    }
264
265    fn is_write_vectored(&self) -> bool {
266        hyper::rt::Write::is_write_vectored(&self.inner)
267    }
268
269    fn poll_write_vectored(
270        self: Pin<&mut Self>,
271        cx: &mut Context<'_>,
272        bufs: &[std::io::IoSlice<'_>],
273    ) -> Poll<Result<usize, std::io::Error>> {
274        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
275    }
276}
277
278// ==== impl TokioTimer =====
279
280impl Timer for TokioTimer {
281    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
282        Box::pin(TokioSleep {
283            inner: tokio::time::sleep(duration),
284        })
285    }
286
287    fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
288        Box::pin(TokioSleep {
289            inner: tokio::time::sleep_until(deadline.into()),
290        })
291    }
292
293    fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
294        if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
295            sleep.reset(new_deadline)
296        }
297    }
298}
299
300impl TokioTimer {
301    /// Create a new TokioTimer
302    pub fn new() -> Self {
303        Self {}
304    }
305}
306
307impl Future for TokioSleep {
308    type Output = ();
309
310    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
311        self.project().inner.poll(cx)
312    }
313}
314
315impl Sleep for TokioSleep {}
316
317impl TokioSleep {
318    fn reset(self: Pin<&mut Self>, deadline: Instant) {
319        self.project().inner.as_mut().reset(deadline.into());
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use crate::rt::TokioExecutor;
326    use hyper::rt::Executor;
327    use tokio::sync::oneshot;
328
329    #[cfg(not(miri))]
330    #[tokio::test]
331    async fn simple_execute() -> Result<(), Box<dyn std::error::Error>> {
332        let (tx, rx) = oneshot::channel();
333        let executor = TokioExecutor::new();
334        executor.execute(async move {
335            tx.send(()).unwrap();
336        });
337        rx.await.map_err(Into::into)
338    }
339}