broker_tokio/time/timeout.rs
1//! Allows a future to execute for a maximum amount of time.
2//!
3//! See [`Timeout`] documentation for more details.
4//!
5//! [`Timeout`]: struct.Timeout.html
6
7use crate::time::{delay_until, Delay, Duration, Instant};
8
9use std::fmt;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{self, Poll};
13
14/// Require a `Future` to complete before the specified duration has elapsed.
15///
16/// If the future completes before the duration has elapsed, then the completed
17/// value is returned. Otherwise, an error is returned.
18///
19/// # Cancelation
20///
21/// Cancelling a timeout is done by dropping the future. No additional cleanup
22/// or other work is required.
23///
24/// The original future may be obtained by calling [`Timeout::into_inner`]. This
25/// consumes the `Timeout`.
26///
27/// # Examples
28///
29/// Create a new `Timeout` set to expire in 10 milliseconds.
30///
31/// ```rust
32/// use tokio::time::timeout;
33/// use tokio::sync::oneshot;
34///
35/// use std::time::Duration;
36///
37/// # async fn dox() {
38/// let (tx, rx) = oneshot::channel();
39/// # tx.send(()).unwrap();
40///
41/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
42/// if let Err(_) = timeout(Duration::from_millis(10), rx).await {
43/// println!("did not receive value within 10 ms");
44/// }
45/// # }
46/// ```
47pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
48where
49 T: Future,
50{
51 let delay = Delay::new_timeout(Instant::now() + duration, duration);
52 Timeout::new_with_delay(future, delay)
53}
54
55/// Require a `Future` to complete before the specified instant in time.
56///
57/// If the future completes before the instant is reached, then the completed
58/// value is returned. Otherwise, an error is returned.
59///
60/// # Cancelation
61///
62/// Cancelling a timeout is done by dropping the future. No additional cleanup
63/// or other work is required.
64///
65/// The original future may be obtained by calling [`Timeout::into_inner`]. This
66/// consumes the `Timeout`.
67///
68/// # Examples
69///
70/// Create a new `Timeout` set to expire in 10 milliseconds.
71///
72/// ```rust
73/// use tokio::time::{Instant, timeout_at};
74/// use tokio::sync::oneshot;
75///
76/// use std::time::Duration;
77///
78/// # async fn dox() {
79/// let (tx, rx) = oneshot::channel();
80/// # tx.send(()).unwrap();
81///
82/// // Wrap the future with a `Timeout` set to expire 10 milliseconds into the
83/// // future.
84/// if let Err(_) = timeout_at(Instant::now() + Duration::from_millis(10), rx).await {
85/// println!("did not receive value within 10 ms");
86/// }
87/// # }
88/// ```
89pub fn timeout_at<T>(deadline: Instant, future: T) -> Timeout<T>
90where
91 T: Future,
92{
93 let delay = delay_until(deadline);
94
95 Timeout {
96 value: future,
97 delay,
98 }
99}
100
101/// Future returned by [`timeout`](timeout) and [`timeout_at`](timeout_at).
102#[must_use = "futures do nothing unless you `.await` or poll them"]
103#[derive(Debug)]
104pub struct Timeout<T> {
105 value: T,
106 delay: Delay,
107}
108
109/// Error returned by `Timeout`.
110#[derive(Debug)]
111pub struct Elapsed(());
112
113impl<T> Timeout<T> {
114 pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
115 Timeout { value, delay }
116 }
117
118 /// Gets a reference to the underlying value in this timeout.
119 pub fn get_ref(&self) -> &T {
120 &self.value
121 }
122
123 /// Gets a mutable reference to the underlying value in this timeout.
124 pub fn get_mut(&mut self) -> &mut T {
125 &mut self.value
126 }
127
128 /// Consumes this timeout, returning the underlying value.
129 pub fn into_inner(self) -> T {
130 self.value
131 }
132}
133
134impl<T> Future for Timeout<T>
135where
136 T: Future,
137{
138 type Output = Result<T::Output, Elapsed>;
139
140 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
141 // First, try polling the future
142
143 // Safety: we never move `self.value`
144 unsafe {
145 let p = self.as_mut().map_unchecked_mut(|me| &mut me.value);
146 if let Poll::Ready(v) = p.poll(cx) {
147 return Poll::Ready(Ok(v));
148 }
149 }
150
151 // Now check the timer
152 // Safety: X_X!
153 unsafe {
154 match self.map_unchecked_mut(|me| &mut me.delay).poll(cx) {
155 Poll::Ready(()) => Poll::Ready(Err(Elapsed(()))),
156 Poll::Pending => Poll::Pending,
157 }
158 }
159 }
160}
161
162// ===== impl Elapsed =====
163
164impl fmt::Display for Elapsed {
165 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
166 "deadline has elapsed".fmt(fmt)
167 }
168}
169
170impl std::error::Error for Elapsed {}
171
172impl From<Elapsed> for std::io::Error {
173 fn from(_err: Elapsed) -> std::io::Error {
174 std::io::ErrorKind::TimedOut.into()
175 }
176}