wasm_timer/timer/
ext.rs

1//! Extension traits for the standard `Stream` and `Future` traits.
2
3use std::io;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use futures::prelude::*;
9use pin_utils::unsafe_pinned;
10
11use crate::{Delay, Instant};
12
13/// An extension trait for futures which provides convenient accessors for
14/// timing out execution and such.
15pub trait TryFutureExt: TryFuture + Sized {
16    /// Creates a new future which will take at most `dur` time to resolve from
17    /// the point at which this method is called.
18    ///
19    /// This combinator creates a new future which wraps the receiving future
20    /// in a timeout. The future returned will resolve in at most `dur` time
21    /// specified (relative to when this function is called).
22    ///
23    /// If the future completes before `dur` elapses then the future will
24    /// resolve with that item. Otherwise the future will resolve to an error
25    /// once `dur` has elapsed.
26    ///
27    /// # Examples
28    ///
29    /// ```no_run
30    /// use std::time::Duration;
31    /// use futures::prelude::*;
32    /// use wasm_timer::TryFutureExt;
33    ///
34    /// # fn long_future() -> impl TryFuture<Ok = (), Error = std::io::Error> {
35    /// #     futures::future::ok(())
36    /// # }
37    /// #
38    /// fn main() {
39    ///     let future = long_future();
40    ///     let timed_out = future.timeout(Duration::from_secs(1));
41    ///
42    ///     async_std::task::block_on(async {
43    ///         match timed_out.await {
44    ///             Ok(item) => println!("got {:?} within enough time!", item),
45    ///             Err(_) => println!("took too long to produce the item"),
46    ///         }
47    ///     })
48    /// }
49    /// ```
50    fn timeout(self, dur: Duration) -> Timeout<Self>
51    where
52        Self::Error: From<io::Error>,
53    {
54        Timeout {
55            timeout: Delay::new(dur),
56            future: self,
57        }
58    }
59
60    /// Creates a new future which will resolve no later than `at` specified.
61    ///
62    /// This method is otherwise equivalent to the `timeout` method except that
63    /// it tweaks the moment at when the timeout elapsed to being specified with
64    /// an absolute value rather than a relative one. For more documentation see
65    /// the `timeout` method.
66    fn timeout_at(self, at: Instant) -> Timeout<Self>
67    where
68        Self::Error: From<io::Error>,
69    {
70        Timeout {
71            timeout: Delay::new_at(at),
72            future: self,
73        }
74    }
75}
76
77impl<F: TryFuture> TryFutureExt for F {}
78
79/// Future returned by the `FutureExt::timeout` method.
80#[derive(Debug)]
81pub struct Timeout<F>
82where
83    F: TryFuture,
84    F::Error: From<io::Error>,
85{
86    future: F,
87    timeout: Delay,
88}
89
90impl<F> Timeout<F>
91where
92    F: TryFuture,
93    F::Error: From<io::Error>,
94{
95    unsafe_pinned!(future: F);
96    unsafe_pinned!(timeout: Delay);
97}
98
99impl<F> Future for Timeout<F>
100where
101    F: TryFuture,
102    F::Error: From<io::Error>,
103{
104    type Output = Result<F::Ok, F::Error>;
105
106    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        match self.as_mut().future().try_poll(cx) {
108            Poll::Pending => {}
109            other => return other,
110        }
111
112        if self.timeout().poll(cx).is_ready() {
113            let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into());
114            Poll::Ready(err)
115        } else {
116            Poll::Pending
117        }
118    }
119}
120
121/// An extension trait for streams which provides convenient accessors for
122/// timing out execution and such.
123pub trait TryStreamExt: TryStream + Sized {
124    /// Creates a new stream which will take at most `dur` time to yield each
125    /// item of the stream.
126    ///
127    /// This combinator creates a new stream which wraps the receiving stream
128    /// in a timeout-per-item. The stream returned will resolve in at most
129    /// `dur` time for each item yielded from the stream. The first item's timer
130    /// starts when this method is called.
131    ///
132    /// If a stream's item completes before `dur` elapses then the timer will be
133    /// reset for the next item. If the timeout elapses, however, then an error
134    /// will be yielded on the stream and the timer will be reset.
135    fn timeout(self, dur: Duration) -> TimeoutStream<Self>
136    where
137        Self::Error: From<io::Error>,
138    {
139        TimeoutStream {
140            timeout: Delay::new(dur),
141            dur,
142            stream: self,
143        }
144    }
145}
146
147impl<S: TryStream> TryStreamExt for S {}
148
149/// Stream returned by the `StreamExt::timeout` method.
150#[derive(Debug)]
151pub struct TimeoutStream<S>
152where
153    S: TryStream,
154    S::Error: From<io::Error>,
155{
156    timeout: Delay,
157    dur: Duration,
158    stream: S,
159}
160
161impl<S> TimeoutStream<S>
162where
163    S: TryStream,
164    S::Error: From<io::Error>,
165{
166    unsafe_pinned!(timeout: Delay);
167    unsafe_pinned!(stream: S);
168}
169
170impl<S> Stream for TimeoutStream<S>
171where
172    S: TryStream,
173    S::Error: From<io::Error>,
174{
175    type Item = Result<S::Ok, S::Error>;
176
177    fn poll_next(
178        mut self: Pin<&mut Self>,
179        cx: &mut Context<'_>,
180    ) -> Poll<Option<Self::Item>> {
181        let dur = self.dur;
182
183        let r = self.as_mut().stream().try_poll_next(cx);
184        match r {
185            Poll::Pending => {}
186            other => {
187                self.as_mut().timeout().reset(dur);
188                return other;
189            }
190        }
191
192        if self.as_mut().timeout().poll(cx).is_ready() {
193            self.as_mut().timeout().reset(dur);
194            Poll::Ready(Some(Err(io::Error::new(
195                io::ErrorKind::TimedOut,
196                "stream item timed out",
197            )
198            .into())))
199        } else {
200            Poll::Pending
201        }
202    }
203}