tokio_timer/timeout.rs
1//! Allows a future or stream to execute for a maximum amount of time.
2//!
3//! See [`Timeout`] documentation for more details.
4//!
5//! [`Timeout`]: struct.Timeout.html
6
7use clock::now;
8use Delay;
9
10use futures::{Async, Future, Poll, Stream};
11
12use std::error;
13use std::fmt;
14use std::time::{Duration, Instant};
15
16/// Allows a `Future` or `Stream` to execute for a limited amount of time.
17///
18/// If the future or stream completes before the timeout has expired, then
19/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
20/// [`Error`].
21///
22/// # Futures and Streams
23///
24/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
25/// In the case of a `Future`, `Timeout` will require the future to complete by
26/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
27/// to take the entire timeout before returning an error.
28///
29/// In order to set an upper bound on the processing of the *entire* stream,
30/// then a timeout should be set on the future that processes the stream. For
31/// example:
32///
33/// ```rust
34/// # extern crate futures;
35/// # extern crate tokio;
36/// // import the `timeout` function, usually this is done
37/// // with `use tokio::prelude::*`
38/// use tokio::prelude::FutureExt;
39/// use futures::Stream;
40/// use futures::sync::mpsc;
41/// use std::time::Duration;
42///
43/// # fn main() {
44/// let (tx, rx) = mpsc::unbounded();
45/// # tx.unbounded_send(()).unwrap();
46/// # drop(tx);
47///
48/// let process = rx.for_each(|item| {
49/// // do something with `item`
50/// # drop(item);
51/// # Ok(())
52/// });
53///
54/// # tokio::runtime::current_thread::block_on_all(
55/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
56/// process.timeout(Duration::from_millis(10))
57/// # ).unwrap();
58/// # }
59/// ```
60///
61/// # Cancelation
62///
63/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
64/// or other work is required.
65///
66/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
67/// consumes the `Timeout`.
68///
69/// [`Error`]: struct.Error.html
70/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
71#[must_use = "futures do nothing unless polled"]
72#[derive(Debug)]
73pub struct Timeout<T> {
74 value: T,
75 delay: Delay,
76}
77
78/// Error returned by `Timeout`.
79#[derive(Debug)]
80pub struct Error<T>(Kind<T>);
81
82/// Timeout error variants
83#[derive(Debug)]
84enum Kind<T> {
85 /// Inner value returned an error
86 Inner(T),
87
88 /// The timeout elapsed.
89 Elapsed,
90
91 /// Timer returned an error.
92 Timer(::Error),
93}
94
95impl<T> Timeout<T> {
96 /// Create a new `Timeout` that allows `value` to execute for a duration of
97 /// at most `timeout`.
98 ///
99 /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
100 ///
101 /// See [type] level documentation for more details.
102 ///
103 /// [type]: #
104 ///
105 /// # Examples
106 ///
107 /// Create a new `Timeout` set to expire in 10 milliseconds.
108 ///
109 /// ```rust
110 /// # extern crate futures;
111 /// # extern crate tokio;
112 /// use tokio::timer::Timeout;
113 /// use futures::Future;
114 /// use futures::sync::oneshot;
115 /// use std::time::Duration;
116 ///
117 /// # fn main() {
118 /// let (tx, rx) = oneshot::channel();
119 /// # tx.send(()).unwrap();
120 ///
121 /// # tokio::runtime::current_thread::block_on_all(
122 /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
123 /// Timeout::new(rx, Duration::from_millis(10))
124 /// # ).unwrap();
125 /// # }
126 /// ```
127 pub fn new(value: T, timeout: Duration) -> Timeout<T> {
128 let delay = Delay::new_timeout(now() + timeout, timeout);
129 Timeout::new_with_delay(value, delay)
130 }
131
132 pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
133 Timeout { value, delay }
134 }
135
136 /// Gets a reference to the underlying value in this timeout.
137 pub fn get_ref(&self) -> &T {
138 &self.value
139 }
140
141 /// Gets a mutable reference to the underlying value in this timeout.
142 pub fn get_mut(&mut self) -> &mut T {
143 &mut self.value
144 }
145
146 /// Consumes this timeout, returning the underlying value.
147 pub fn into_inner(self) -> T {
148 self.value
149 }
150}
151
152impl<T: Future> Timeout<T> {
153 /// Create a new `Timeout` that completes when `future` completes or when
154 /// `deadline` is reached.
155 ///
156 /// This function differs from `new` in that:
157 ///
158 /// * It only accepts `Future` arguments.
159 /// * It sets an explicit `Instant` at which the timeout expires.
160 pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
161 let delay = Delay::new(deadline);
162
163 Timeout {
164 value: future,
165 delay,
166 }
167 }
168}
169
170impl<T> Future for Timeout<T>
171where
172 T: Future,
173{
174 type Item = T::Item;
175 type Error = Error<T::Error>;
176
177 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
178 // First, try polling the future
179 match self.value.poll() {
180 Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
181 Ok(Async::NotReady) => {}
182 Err(e) => return Err(Error::inner(e)),
183 }
184
185 // Now check the timer
186 match self.delay.poll() {
187 Ok(Async::NotReady) => Ok(Async::NotReady),
188 Ok(Async::Ready(_)) => Err(Error::elapsed()),
189 Err(e) => Err(Error::timer(e)),
190 }
191 }
192}
193
194impl<T> Stream for Timeout<T>
195where
196 T: Stream,
197{
198 type Item = T::Item;
199 type Error = Error<T::Error>;
200
201 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
202 // First, try polling the future
203 match self.value.poll() {
204 Ok(Async::Ready(v)) => {
205 if v.is_some() {
206 self.delay.reset_timeout();
207 }
208 return Ok(Async::Ready(v));
209 }
210 Ok(Async::NotReady) => {}
211 Err(e) => return Err(Error::inner(e)),
212 }
213
214 // Now check the timer
215 match self.delay.poll() {
216 Ok(Async::NotReady) => Ok(Async::NotReady),
217 Ok(Async::Ready(_)) => {
218 self.delay.reset_timeout();
219 Err(Error::elapsed())
220 }
221 Err(e) => Err(Error::timer(e)),
222 }
223 }
224}
225
226// ===== impl Error =====
227
228impl<T> Error<T> {
229 /// Create a new `Error` representing the inner value completing with `Err`.
230 pub fn inner(err: T) -> Error<T> {
231 Error(Kind::Inner(err))
232 }
233
234 /// Returns `true` if the error was caused by the inner value completing
235 /// with `Err`.
236 pub fn is_inner(&self) -> bool {
237 match self.0 {
238 Kind::Inner(_) => true,
239 _ => false,
240 }
241 }
242
243 /// Consumes `self`, returning the inner future error.
244 pub fn into_inner(self) -> Option<T> {
245 match self.0 {
246 Kind::Inner(err) => Some(err),
247 _ => None,
248 }
249 }
250
251 /// Create a new `Error` representing the inner value not completing before
252 /// the deadline is reached.
253 pub fn elapsed() -> Error<T> {
254 Error(Kind::Elapsed)
255 }
256
257 /// Returns `true` if the error was caused by the inner value not completing
258 /// before the deadline is reached.
259 pub fn is_elapsed(&self) -> bool {
260 match self.0 {
261 Kind::Elapsed => true,
262 _ => false,
263 }
264 }
265
266 /// Creates a new `Error` representing an error encountered by the timer
267 /// implementation
268 pub fn timer(err: ::Error) -> Error<T> {
269 Error(Kind::Timer(err))
270 }
271
272 /// Returns `true` if the error was caused by the timer.
273 pub fn is_timer(&self) -> bool {
274 match self.0 {
275 Kind::Timer(_) => true,
276 _ => false,
277 }
278 }
279
280 /// Consumes `self`, returning the error raised by the timer implementation.
281 pub fn into_timer(self) -> Option<::Error> {
282 match self.0 {
283 Kind::Timer(err) => Some(err),
284 _ => None,
285 }
286 }
287}
288
289impl<T: error::Error> error::Error for Error<T> {
290 fn description(&self) -> &str {
291 use self::Kind::*;
292
293 match self.0 {
294 Inner(ref e) => e.description(),
295 Elapsed => "deadline has elapsed",
296 Timer(ref e) => e.description(),
297 }
298 }
299}
300
301impl<T: fmt::Display> fmt::Display for Error<T> {
302 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
303 use self::Kind::*;
304
305 match self.0 {
306 Inner(ref e) => e.fmt(fmt),
307 Elapsed => "deadline has elapsed".fmt(fmt),
308 Timer(ref e) => e.fmt(fmt),
309 }
310 }
311}