broker_tokio/time/
timeout.rs

1//! Allows a future to execute for a maximum amount of time.
2//!
3//! See [`Timeout`] documentation for more details.
4//!
5//! [`Timeout`]: struct.Timeout.html
6
7use crate::time::{delay_until, Delay, Duration, Instant};
8
9use std::fmt;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{self, Poll};
13
14/// Require a `Future` to complete before the specified duration has elapsed.
15///
16/// If the future completes before the duration has elapsed, then the completed
17/// value is returned. Otherwise, an error is returned.
18///
19/// # Cancelation
20///
21/// Cancelling a timeout is done by dropping the future. No additional cleanup
22/// or other work is required.
23///
24/// The original future may be obtained by calling [`Timeout::into_inner`]. This
25/// consumes the `Timeout`.
26///
27/// # Examples
28///
29/// Create a new `Timeout` set to expire in 10 milliseconds.
30///
31/// ```rust
32/// use tokio::time::timeout;
33/// use tokio::sync::oneshot;
34///
35/// use std::time::Duration;
36///
37/// # async fn dox() {
38/// let (tx, rx) = oneshot::channel();
39/// # tx.send(()).unwrap();
40///
41/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
42/// if let Err(_) = timeout(Duration::from_millis(10), rx).await {
43///     println!("did not receive value within 10 ms");
44/// }
45/// # }
46/// ```
47pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
48where
49    T: Future,
50{
51    let delay = Delay::new_timeout(Instant::now() + duration, duration);
52    Timeout::new_with_delay(future, delay)
53}
54
55/// Require a `Future` to complete before the specified instant in time.
56///
57/// If the future completes before the instant is reached, then the completed
58/// value is returned. Otherwise, an error is returned.
59///
60/// # Cancelation
61///
62/// Cancelling a timeout is done by dropping the future. No additional cleanup
63/// or other work is required.
64///
65/// The original future may be obtained by calling [`Timeout::into_inner`]. This
66/// consumes the `Timeout`.
67///
68/// # Examples
69///
70/// Create a new `Timeout` set to expire in 10 milliseconds.
71///
72/// ```rust
73/// use tokio::time::{Instant, timeout_at};
74/// use tokio::sync::oneshot;
75///
76/// use std::time::Duration;
77///
78/// # async fn dox() {
79/// let (tx, rx) = oneshot::channel();
80/// # tx.send(()).unwrap();
81///
82/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the
83/// // future.
84/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await {
85///     println!("did not receive value within 10 ms");
86/// }
87/// # }
88/// ```
89pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T>
90where
91    T: Future,
92{
93    let delay = delay_until(deadline);
94
95    Timeout {
96        value: future,
97        delay,
98    }
99}
100
101/// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
102#[must_use = "futures do nothing unless you `.await` or poll them"]
103#[derive(Debug)]
104pub struct Timeout<T> {
105    value: T,
106    delay: Delay,
107}
108
109/// Error returned by `Timeout`.
110#[derive(Debug)]
111pub struct Elapsed(());
112
113impl<T> Timeout<T> {
114    pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
115        Timeout { value, delay }
116    }
117
118    /// Gets a reference to the underlying value in this timeout.
119    pub fn get_ref(&self) -> &T {
120        &self.value
121    }
122
123    /// Gets a mutable reference to the underlying value in this timeout.
124    pub fn get_mut(&mut self) -> &mut T {
125        &mut self.value
126    }
127
128    /// Consumes this timeout, returning the underlying value.
129    pub fn into_inner(self) -> T {
130        self.value
131    }
132}
133
134impl<T> Future for Timeout<T>
135where
136    T: Future,
137{
138    type Output = Result<T::Output, Elapsed>;
139
140    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
141        // First, try polling the future
142
143        // Safety: we never move `self.value`
144        unsafe {
145            let p = self.as_mut().map_unchecked_mut(|me| &mut me.value);
146            if let Poll::Ready(v) = p.poll(cx) {
147                return Poll::Ready(Ok(v));
148            }
149        }
150
151        // Now check the timer
152        // Safety: X_X!
153        unsafe {
154            match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) {
155                Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))),
156                Poll::Pending => Poll::Pending,
157            }
158        }
159    }
160}
161
162// ===== impl Elapsed =====
163
164impl fmt::Display for Elapsed {
165    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
166        "deadline has elapsed".fmt(fmt)
167    }
168}
169
170impl std::error::Error for Elapsed {}
171
172impl From<Elapsed> for std::io::Error {
173    fn from(_err: Elapsed) -> std::io::Error {
174        std::io::ErrorKind::TimedOut.into()
175    }
176}