futures_time/stream/stream_ext.rs
1use crate::channel::Parker;
2use crate::future::{IntoFuture, Timer};
3
4use futures_core::Stream;
5
6use super::{Buffer, Debounce, Delay, IntoStream, Park, Sample, Throttle, Timeout};
7
8/// Extend `Stream` with time-based operations.
9pub trait StreamExt: Stream {
10 /// Yield the last item received at the end of each interval.
11 ///
12 /// If no items have been received during an interval, the stream will not
13 /// yield any items. In addition to using a time-based interval, this method can take any
14 /// stream as a source. This enables throttling based on alternative event
15 /// sources, such as variable-rate timers.
16 ///
17 /// See also [`throttle()`] and [`debounce()`].
18 ///
19 /// [`throttle()`]: StreamExt::throttle
20 /// [`debounce()`]: `StreamExt::debounce`
21 ///
22 /// # Data Loss
23 ///
24 /// This method will discard data between intervals. Though the
25 /// discarded items will have their destuctors run, __using this method
26 /// incorrectly may lead to unintended data loss__. This method is best used
27 /// to reduce the number of _duplicate_ items after the first has been
28 /// received, such as repeated mouse clicks or key presses. This method may
29 /// lead to unintended data loss when used to discard _unique_ items, such
30 /// as network request.
31 ///
32 /// # Example
33 ///
34 /// ```
35 /// use futures_lite::prelude::*;
36 /// use futures_time::prelude::*;
37 /// use futures_time::time::{Instant, Duration};
38 /// use futures_time::stream;
39 ///
40 /// fn main() {
41 /// async_io::block_on(async {
42 /// let mut counter = 0;
43 /// stream::interval(Duration::from_millis(100))
44 /// .take(4)
45 /// .sample(Duration::from_millis(200))
46 /// .for_each(|_| counter += 1)
47 /// .await;
48 ///
49 /// assert_eq!(counter, 2);
50 /// })
51 /// }
52 /// ```
53 fn sample<I>(self, interval: I) -> Sample<Self, I::IntoStream>
54 where
55 Self: Sized,
56 I: IntoStream,
57 {
58 Sample::new(self, interval.into_stream())
59 }
60
61 /// Group items into vectors which are yielded at every interval.
62 ///
63 /// In addition to using a time source as a deadline, any stream can be used as a
64 /// deadline too. This enables more interesting buffer strategies to be
65 /// built on top of this primitive.
66 ///
67 /// # Future Improvements
68 ///
69 /// - Lending iterators would allow for internal reusing of the buffer.
70 /// Though different from `Iterator::windows`, it could be more efficient.
71 /// - Contexts/capabilities would enable custom allocators to be used.
72 ///
73 /// # Example
74 ///
75 /// ```
76 /// use futures_lite::prelude::*;
77 /// use futures_time::prelude::*;
78 /// use futures_time::time::{Instant, Duration};
79 /// use futures_time::stream;
80 ///
81 /// fn main() {
82 /// async_io::block_on(async {
83 /// let mut counter = 0;
84 /// stream::interval(Duration::from_millis(5))
85 /// .take(10)
86 /// .buffer(Duration::from_millis(20))
87 /// .for_each(|buf| counter += buf.len())
88 /// .await;
89 ///
90 /// assert_eq!(counter, 10);
91 /// })
92 /// }
93 /// ```
94 fn buffer<I>(self, interval: I) -> Buffer<Self, I::IntoStream>
95 where
96 Self: Sized,
97 I: IntoStream,
98 {
99 Buffer::new(self, interval.into_stream())
100 }
101
102 /// Yield the last item received at the end of a window which resets with
103 /// each item received.
104 ///
105 /// Every time an item is yielded by the underlying stream, the window is
106 /// reset. Once the window expires, the last item seen will be yielded. This
107 /// means that in order to yield an item, no items must be received for the
108 /// entire window, or else the window will reset.
109 ///
110 /// This method is useful to perform actions at the end of bursts of events,
111 /// where performing that same action on _every_ event might not be
112 /// economical.
113 ///
114 /// See also [`sample()`] and [`throttle()`].
115 ///
116 /// [`sample()`]: `StreamExt::sample`
117 /// [`throttle()`]: `StreamExt::throttle`
118 ///
119 /// # Example
120 ///
121 /// ```
122 /// use futures_lite::prelude::*;
123 /// use futures_time::prelude::*;
124 /// use futures_time::time::{Instant, Duration};
125 /// use futures_time::stream;
126 ///
127 /// fn main() {
128 /// async_io::block_on(async {
129 /// let mut counter = 0;
130 /// stream::interval(Duration::from_millis(10))
131 /// .take(10)
132 /// .debounce(Duration::from_millis(20)) // the window is greater than the interval
133 /// .for_each(|_| counter += 1)
134 /// .await;
135 ///
136 /// assert_eq!(counter, 1); // so only the last item is received
137 /// })
138 /// }
139 /// ```
140 fn debounce<D>(self, window: D) -> Debounce<Self, D::IntoFuture>
141 where
142 Self: Sized,
143 D: IntoFuture,
144 D::IntoFuture: Timer,
145 {
146 Debounce::new(self, window.into_future())
147 }
148
149 /// Delay the yielding of items from the stream until the given deadline.
150 ///
151 /// The underlying stream will not be polled until the deadline has expired. In addition
152 /// to using a time source as a deadline, any future can be used as a
153 /// deadline too. When used in combination with a multi-consumer channel,
154 /// this method can be used to synchronize the start of multiple streams and futures.
155 ///
156 /// # Example
157 ///
158 /// ```
159 /// use futures_lite::prelude::*;
160 /// use futures_time::prelude::*;
161 /// use futures_time::time::{Instant, Duration};
162 /// use futures_lite::stream;
163 ///
164 /// fn main() {
165 /// async_io::block_on(async {
166 /// let now = Instant::now();
167 /// let delay = Duration::from_millis(100);
168 /// let _ = stream::once("meow").delay(delay).next().await;
169 /// assert!(now.elapsed() >= *delay);
170 /// });
171 /// }
172 /// ```
173 fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
174 where
175 Self: Sized,
176 D: IntoFuture,
177 {
178 Delay::new(self, deadline.into_future())
179 }
180
181 /// Suspend or resume execution of a stream.
182 ///
183 /// When this method is called the execution of the stream will be put into
184 /// a suspended state until the channel returns `Parker::Unpark` or the
185 /// channel's senders are dropped. The underlying stream will not be polled
186 /// while the it is paused.
187 fn park<I>(self, interval: I) -> Park<Self, I::IntoStream>
188 where
189 Self: Sized,
190 I: IntoStream<Item = Parker>,
191 {
192 Park::new(self, interval.into_stream())
193 }
194
195 /// Yield an item, then ignore subsequent items for a duration.
196 ///
197 /// In addition to using a time-based interval, this method can take any
198 /// stream as a source. This enables throttling based on alternative event
199 /// sources, such as variable-rate timers.
200 ///
201 /// See also [`sample()`] and [`debounce()`].
202 ///
203 /// [`sample()`]: `StreamExt::sample`
204 /// [`debounce()`]: `StreamExt::debounce`
205 ///
206 /// # Data Loss
207 ///
208 /// This method will discard data between intervals. Though the
209 /// discarded items will have their destuctors run, __using this method
210 /// incorrectly may lead to unintended data loss__. This method is best used
211 /// to reduce the number of _duplicate_ items after the first has been
212 /// received, such as repeated mouse clicks or key presses. This method may
213 /// lead to unintended data loss when used to discard _unique_ items, such
214 /// as network request.
215 ///
216 /// # Examples
217 ///
218 /// ```
219 /// use futures_lite::prelude::*;
220 /// use futures_time::prelude::*;
221 /// use futures_time::time::Duration;
222 /// use futures_time::stream;
223 ///
224 /// fn main() {
225 /// async_io::block_on(async {
226 /// let mut counter = 0;
227 /// stream::interval(Duration::from_millis(100)) // Yield an item every 100ms
228 /// .take(4) // Stop after 4 items
229 /// .throttle(Duration::from_millis(300)) // Only let an item through every 300ms
230 /// .for_each(|_| counter += 1) // Increment a counter for each item
231 /// .await;
232 ///
233 /// assert_eq!(counter, 2);
234 /// })
235 /// }
236 /// ```
237 fn throttle<I>(self, interval: I) -> Throttle<Self, I::IntoStream>
238 where
239 Self: Sized,
240 I: IntoStream,
241 {
242 Throttle::new(self, interval.into_stream())
243 }
244
245 /// Return an error if a stream does not yield an item within a given time
246 /// span.
247 ///
248 /// Typically timeouts are, as the name implies, based on _time_. However
249 /// this method can time out based on any future. This can be useful in
250 /// combination with channels, as it allows (long-lived) streams to be
251 /// cancelled based on some external event.
252 ///
253 /// When a timeout is returned, the stream will be dropped and destructors
254 /// will be run.
255 ///
256 /// # Example
257 ///
258 /// ```
259 /// use futures_lite::prelude::*;
260 /// use futures_time::prelude::*;
261 /// use futures_time::time::{Instant, Duration};
262 /// use futures_lite::stream;
263 /// use std::io;
264 ///
265 /// fn main() {
266 /// async_io::block_on(async {
267 /// let res = stream::once("meow")
268 /// .delay(Duration::from_millis(100)) // longer delay
269 /// .timeout(Duration::from_millis(50)) // shorter timeout
270 /// .next()
271 /// .await;
272 /// assert_eq!(res.unwrap().unwrap_err().kind(), io::ErrorKind::TimedOut); // error
273 ///
274 /// let res = stream::once("meow")
275 /// .delay(Duration::from_millis(50)) // shorter delay
276 /// .timeout(Duration::from_millis(100)) // longer timeout
277 /// .next()
278 /// .await;
279 /// assert_eq!(res.unwrap().unwrap(), "meow"); // success
280 /// });
281 /// }
282 /// ```
283 fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
284 where
285 Self: Sized,
286 D: IntoFuture,
287 D::IntoFuture: Timer,
288 {
289 Timeout::new(self, deadline.into_future())
290 }
291}
292
293impl<S> StreamExt for S where S: Stream {}