tokio_timer/
timeout.rs

1//! Allows a future or stream to execute for a maximum amount of time.
2//!
3//! See [`Timeout`] documentation for more details.
4//!
5//! [`Timeout`]: struct.Timeout.html
6
7use clock::now;
8use Delay;
9
10use futures::{Async, Future, Poll, Stream};
11
12use std::error;
13use std::fmt;
14use std::time::{Duration, Instant};
15
16/// Allows a `Future` or `Stream` to execute for a limited amount of time.
17///
18/// If the future or stream completes before the timeout has expired, then
19/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
20/// [`Error`].
21///
22/// # Futures and Streams
23///
24/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
25/// In the case of a `Future`, `Timeout` will require the future to complete by
26/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
27/// to take the entire timeout before returning an error.
28///
29/// In order to set an upper bound on the processing of the *entire* stream,
30/// then a timeout should be set on the future that processes the stream. For
31/// example:
32///
33/// ```rust
34/// # extern crate futures;
35/// # extern crate tokio;
36/// // import the `timeout` function, usually this is done
37/// // with `use tokio::prelude::*`
38/// use tokio::prelude::FutureExt;
39/// use futures::Stream;
40/// use futures::sync::mpsc;
41/// use std::time::Duration;
42///
43/// # fn main() {
44/// let (tx, rx) = mpsc::unbounded();
45/// # tx.unbounded_send(()).unwrap();
46/// # drop(tx);
47///
48/// let process = rx.for_each(|item| {
49///     // do something with `item`
50/// # drop(item);
51/// # Ok(())
52/// });
53///
54/// # tokio::runtime::current_thread::block_on_all(
55/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
56/// process.timeout(Duration::from_millis(10))
57/// # ).unwrap();
58/// # }
59/// ```
60///
61/// # Cancelation
62///
63/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
64/// or other work is required.
65///
66/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
67/// consumes the `Timeout`.
68///
69/// [`Error`]: struct.Error.html
70/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
71#[must_use = "futures do nothing unless polled"]
72#[derive(Debug)]
73pub struct Timeout<T> {
74    value: T,
75    delay: Delay,
76}
77
78/// Error returned by `Timeout`.
79#[derive(Debug)]
80pub struct Error<T>(Kind<T>);
81
82/// Timeout error variants
83#[derive(Debug)]
84enum Kind<T> {
85    /// Inner value returned an error
86    Inner(T),
87
88    /// The timeout elapsed.
89    Elapsed,
90
91    /// Timer returned an error.
92    Timer(::Error),
93}
94
95impl<T> Timeout<T> {
96    /// Create a new `Timeout` that allows `value` to execute for a duration of
97    /// at most `timeout`.
98    ///
99    /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
100    ///
101    /// See [type] level documentation for more details.
102    ///
103    /// [type]: #
104    ///
105    /// # Examples
106    ///
107    /// Create a new `Timeout` set to expire in 10 milliseconds.
108    ///
109    /// ```rust
110    /// # extern crate futures;
111    /// # extern crate tokio;
112    /// use tokio::timer::Timeout;
113    /// use futures::Future;
114    /// use futures::sync::oneshot;
115    /// use std::time::Duration;
116    ///
117    /// # fn main() {
118    /// let (tx, rx) = oneshot::channel();
119    /// # tx.send(()).unwrap();
120    ///
121    /// # tokio::runtime::current_thread::block_on_all(
122    /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
123    /// Timeout::new(rx, Duration::from_millis(10))
124    /// # ).unwrap();
125    /// # }
126    /// ```
127    pub fn new(value: T, timeout: Duration) -> Timeout<T> {
128        let delay = Delay::new_timeout(now() + timeout, timeout);
129        Timeout::new_with_delay(value, delay)
130    }
131
132    pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
133        Timeout { value, delay }
134    }
135
136    /// Gets a reference to the underlying value in this timeout.
137    pub fn get_ref(&self) -> &T {
138        &self.value
139    }
140
141    /// Gets a mutable reference to the underlying value in this timeout.
142    pub fn get_mut(&mut self) -> &mut T {
143        &mut self.value
144    }
145
146    /// Consumes this timeout, returning the underlying value.
147    pub fn into_inner(self) -> T {
148        self.value
149    }
150}
151
152impl<T: Future> Timeout<T> {
153    /// Create a new `Timeout` that completes when `future` completes or when
154    /// `deadline` is reached.
155    ///
156    /// This function differs from `new` in that:
157    ///
158    /// * It only accepts `Future` arguments.
159    /// * It sets an explicit `Instant` at which the timeout expires.
160    pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
161        let delay = Delay::new(deadline);
162
163        Timeout {
164            value: future,
165            delay,
166        }
167    }
168}
169
170impl<T> Future for Timeout<T>
171where
172    T: Future,
173{
174    type Item = T::Item;
175    type Error = Error<T::Error>;
176
177    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
178        // First, try polling the future
179        match self.value.poll() {
180            Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
181            Ok(Async::NotReady) => {}
182            Err(e) => return Err(Error::inner(e)),
183        }
184
185        // Now check the timer
186        match self.delay.poll() {
187            Ok(Async::NotReady) => Ok(Async::NotReady),
188            Ok(Async::Ready(_)) => Err(Error::elapsed()),
189            Err(e) => Err(Error::timer(e)),
190        }
191    }
192}
193
194impl<T> Stream for Timeout<T>
195where
196    T: Stream,
197{
198    type Item = T::Item;
199    type Error = Error<T::Error>;
200
201    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
202        // First, try polling the future
203        match self.value.poll() {
204            Ok(Async::Ready(v)) => {
205                if v.is_some() {
206                    self.delay.reset_timeout();
207                }
208                return Ok(Async::Ready(v));
209            }
210            Ok(Async::NotReady) => {}
211            Err(e) => return Err(Error::inner(e)),
212        }
213
214        // Now check the timer
215        match self.delay.poll() {
216            Ok(Async::NotReady) => Ok(Async::NotReady),
217            Ok(Async::Ready(_)) => {
218                self.delay.reset_timeout();
219                Err(Error::elapsed())
220            }
221            Err(e) => Err(Error::timer(e)),
222        }
223    }
224}
225
226// ===== impl Error =====
227
228impl<T> Error<T> {
229    /// Create a new `Error` representing the inner value completing with `Err`.
230    pub fn inner(err: T) -> Error<T> {
231        Error(Kind::Inner(err))
232    }
233
234    /// Returns `true` if the error was caused by the inner value completing
235    /// with `Err`.
236    pub fn is_inner(&self) -> bool {
237        match self.0 {
238            Kind::Inner(_) => true,
239            _ => false,
240        }
241    }
242
243    /// Consumes `self`, returning the inner future error.
244    pub fn into_inner(self) -> Option<T> {
245        match self.0 {
246            Kind::Inner(err) => Some(err),
247            _ => None,
248        }
249    }
250
251    /// Create a new `Error` representing the inner value not completing before
252    /// the deadline is reached.
253    pub fn elapsed() -> Error<T> {
254        Error(Kind::Elapsed)
255    }
256
257    /// Returns `true` if the error was caused by the inner value not completing
258    /// before the deadline is reached.
259    pub fn is_elapsed(&self) -> bool {
260        match self.0 {
261            Kind::Elapsed => true,
262            _ => false,
263        }
264    }
265
266    /// Creates a new `Error` representing an error encountered by the timer
267    /// implementation
268    pub fn timer(err: ::Error) -> Error<T> {
269        Error(Kind::Timer(err))
270    }
271
272    /// Returns `true` if the error was caused by the timer.
273    pub fn is_timer(&self) -> bool {
274        match self.0 {
275            Kind::Timer(_) => true,
276            _ => false,
277        }
278    }
279
280    /// Consumes `self`, returning the error raised by the timer implementation.
281    pub fn into_timer(self) -> Option<::Error> {
282        match self.0 {
283            Kind::Timer(err) => Some(err),
284            _ => None,
285        }
286    }
287}
288
289impl<T: error::Error> error::Error for Error<T> {
290    fn description(&self) -> &str {
291        use self::Kind::*;
292
293        match self.0 {
294            Inner(ref e) => e.description(),
295            Elapsed => "deadline has elapsed",
296            Timer(ref e) => e.description(),
297        }
298    }
299}
300
301impl<T: fmt::Display> fmt::Display for Error<T> {
302    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
303        use self::Kind::*;
304
305        match self.0 {
306            Inner(ref e) => e.fmt(fmt),
307            Elapsed => "deadline has elapsed".fmt(fmt),
308            Timer(ref e) => e.fmt(fmt),
309        }
310    }
311}