wasm_timer/timer/ext.rs
//! Extension traits that add timeout combinators to the `TryFuture` and `TryStream` traits.
2
3use std::io;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use futures::prelude::*;
9use pin_utils::unsafe_pinned;
10
11use crate::{Delay, Instant};
12
13/// An extension trait for futures which provides convenient accessors for
14/// timing out execution and such.
15pub trait TryFutureExt: TryFuture + Sized {
16 /// Creates a new future which will take at most `dur` time to resolve from
17 /// the point at which this method is called.
18 ///
19 /// This combinator creates a new future which wraps the receiving future
20 /// in a timeout. The future returned will resolve in at most `dur` time
21 /// specified (relative to when this function is called).
22 ///
23 /// If the future completes before `dur` elapses then the future will
24 /// resolve with that item. Otherwise the future will resolve to an error
25 /// once `dur` has elapsed.
26 ///
27 /// # Examples
28 ///
29 /// ```no_run
30 /// use std::time::Duration;
31 /// use futures::prelude::*;
32 /// use wasm_timer::TryFutureExt;
33 ///
34 /// # fn long_future() -> impl TryFuture<Ok = (), Error = std::io::Error> {
35 /// # futures::future::ok(())
36 /// # }
37 /// #
38 /// fn main() {
39 /// let future = long_future();
40 /// let timed_out = future.timeout(Duration::from_secs(1));
41 ///
42 /// async_std::task::block_on(async {
43 /// match timed_out.await {
44 /// Ok(item) => println!("got {:?} within enough time!", item),
45 /// Err(_) => println!("took too long to produce the item"),
46 /// }
47 /// })
48 /// }
49 /// ```
50 fn timeout(self, dur: Duration) -> Timeout<Self>
51 where
52 Self::Error: From<io::Error>,
53 {
54 Timeout {
55 timeout: Delay::new(dur),
56 future: self,
57 }
58 }
59
60 /// Creates a new future which will resolve no later than `at` specified.
61 ///
62 /// This method is otherwise equivalent to the `timeout` method except that
63 /// it tweaks the moment at when the timeout elapsed to being specified with
64 /// an absolute value rather than a relative one. For more documentation see
65 /// the `timeout` method.
66 fn timeout_at(self, at: Instant) -> Timeout<Self>
67 where
68 Self::Error: From<io::Error>,
69 {
70 Timeout {
71 timeout: Delay::new_at(at),
72 future: self,
73 }
74 }
75}
76
77impl<F: TryFuture> TryFutureExt for F {}
78
79/// Future returned by the `FutureExt::timeout` method.
80#[derive(Debug)]
81pub struct Timeout<F>
82where
83 F: TryFuture,
84 F::Error: From<io::Error>,
85{
86 future: F,
87 timeout: Delay,
88}
89
90impl<F> Timeout<F>
91where
92 F: TryFuture,
93 F::Error: From<io::Error>,
94{
95 unsafe_pinned!(future: F);
96 unsafe_pinned!(timeout: Delay);
97}
98
99impl<F> Future for Timeout<F>
100where
101 F: TryFuture,
102 F::Error: From<io::Error>,
103{
104 type Output = Result<F::Ok, F::Error>;
105
106 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 match self.as_mut().future().try_poll(cx) {
108 Poll::Pending => {}
109 other => return other,
110 }
111
112 if self.timeout().poll(cx).is_ready() {
113 let err = Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out").into());
114 Poll::Ready(err)
115 } else {
116 Poll::Pending
117 }
118 }
119}
120
121/// An extension trait for streams which provides convenient accessors for
122/// timing out execution and such.
123pub trait TryStreamExt: TryStream + Sized {
124 /// Creates a new stream which will take at most `dur` time to yield each
125 /// item of the stream.
126 ///
127 /// This combinator creates a new stream which wraps the receiving stream
128 /// in a timeout-per-item. The stream returned will resolve in at most
129 /// `dur` time for each item yielded from the stream. The first item's timer
130 /// starts when this method is called.
131 ///
132 /// If a stream's item completes before `dur` elapses then the timer will be
133 /// reset for the next item. If the timeout elapses, however, then an error
134 /// will be yielded on the stream and the timer will be reset.
135 fn timeout(self, dur: Duration) -> TimeoutStream<Self>
136 where
137 Self::Error: From<io::Error>,
138 {
139 TimeoutStream {
140 timeout: Delay::new(dur),
141 dur,
142 stream: self,
143 }
144 }
145}
146
147impl<S: TryStream> TryStreamExt for S {}
148
149/// Stream returned by the `StreamExt::timeout` method.
150#[derive(Debug)]
151pub struct TimeoutStream<S>
152where
153 S: TryStream,
154 S::Error: From<io::Error>,
155{
156 timeout: Delay,
157 dur: Duration,
158 stream: S,
159}
160
161impl<S> TimeoutStream<S>
162where
163 S: TryStream,
164 S::Error: From<io::Error>,
165{
166 unsafe_pinned!(timeout: Delay);
167 unsafe_pinned!(stream: S);
168}
169
170impl<S> Stream for TimeoutStream<S>
171where
172 S: TryStream,
173 S::Error: From<io::Error>,
174{
175 type Item = Result<S::Ok, S::Error>;
176
177 fn poll_next(
178 mut self: Pin<&mut Self>,
179 cx: &mut Context<'_>,
180 ) -> Poll<Option<Self::Item>> {
181 let dur = self.dur;
182
183 let r = self.as_mut().stream().try_poll_next(cx);
184 match r {
185 Poll::Pending => {}
186 other => {
187 self.as_mut().timeout().reset(dur);
188 return other;
189 }
190 }
191
192 if self.as_mut().timeout().poll(cx).is_ready() {
193 self.as_mut().timeout().reset(dur);
194 Poll::Ready(Some(Err(io::Error::new(
195 io::ErrorKind::TimedOut,
196 "stream item timed out",
197 )
198 .into())))
199 } else {
200 Poll::Pending
201 }
202 }
203}