futures_time/stream/
stream_ext.rs

1use crate::channel::Parker;
2use crate::future::{IntoFuture, Timer};
3
4use futures_core::Stream;
5
6use super::{Buffer, Debounce, Delay, IntoStream, Park, Sample, Throttle, Timeout};
7
8/// Extend `Stream` with time-based operations.
9pub trait StreamExt: Stream {
10    /// Yield the last item received at the end of each interval.
11    ///
12    /// If no items have been received during an interval, the stream will not
13    /// yield any items. In addition to using a time-based interval, this method can take any
14    /// stream as a source. This enables throttling based on alternative event
15    /// sources, such as variable-rate timers.
16    ///
17    /// See also [`throttle()`] and [`debounce()`].
18    ///
19    /// [`throttle()`]: StreamExt::throttle
20    /// [`debounce()`]: `StreamExt::debounce`
21    ///
22    /// # Data Loss
23    ///
24    /// This method will discard data between intervals. Though the
25    /// discarded items will have their destuctors run, __using this method
26    /// incorrectly may lead to unintended data loss__. This method is best used
27    /// to reduce the number of _duplicate_ items after the first has been
28    /// received, such as repeated mouse clicks or key presses. This method may
29    /// lead to unintended data loss when used to discard _unique_ items, such
30    /// as network request.
31    ///
32    /// # Example
33    ///
34    /// ```
35    /// use futures_lite::prelude::*;
36    /// use futures_time::prelude::*;
37    /// use futures_time::time::{Instant, Duration};
38    /// use futures_time::stream;
39    ///
40    /// fn main() {
41    ///    async_io::block_on(async {
42    ///        let mut counter = 0;
43    ///        stream::interval(Duration::from_millis(100))
44    ///            .take(4)
45    ///            .sample(Duration::from_millis(200))
46    ///            .for_each(|_| counter += 1)
47    ///            .await;
48    ///
49    ///        assert_eq!(counter, 2);
50    ///    })
51    /// }
52    /// ```
53    fn sample<I>(self, interval: I) -> Sample<Self, I::IntoStream>
54    where
55        Self: Sized,
56        I: IntoStream,
57    {
58        Sample::new(self, interval.into_stream())
59    }
60
61    /// Group items into vectors which are yielded at every interval.
62    ///
63    /// In addition to using a time source as a deadline, any stream can be used as a
64    /// deadline too. This enables more interesting buffer strategies to be
65    /// built on top of this primitive.
66    ///
67    /// # Future Improvements
68    ///
69    /// - Lending iterators would allow for internal reusing of the buffer.
70    /// Though different from `Iterator::windows`, it could be more efficient.
71    /// - Contexts/capabilities would enable custom allocators to be used.
72    ///
73    /// # Example
74    ///
75    /// ```
76    /// use futures_lite::prelude::*;
77    /// use futures_time::prelude::*;
78    /// use futures_time::time::{Instant, Duration};
79    /// use futures_time::stream;
80    ///
81    /// fn main() {
82    ///     async_io::block_on(async {
83    ///         let mut counter = 0;
84    ///         stream::interval(Duration::from_millis(5))
85    ///             .take(10)
86    ///             .buffer(Duration::from_millis(20))
87    ///             .for_each(|buf| counter += buf.len())
88    ///             .await;
89    ///
90    ///         assert_eq!(counter, 10);
91    ///     })
92    /// }
93    /// ```
94    fn buffer<I>(self, interval: I) -> Buffer<Self, I::IntoStream>
95    where
96        Self: Sized,
97        I: IntoStream,
98    {
99        Buffer::new(self, interval.into_stream())
100    }
101
102    /// Yield the last item received at the end of a window which resets with
103    /// each item received.
104    ///
105    /// Every time an item is yielded by the underlying stream, the window is
106    /// reset. Once the window expires, the last item seen will be yielded. This
107    /// means that in order to yield an item, no items must be received for the
108    /// entire window, or else the window will reset.
109    ///
110    /// This method is useful to perform actions at the end of bursts of events,
111    /// where performing that same action on _every_ event might not be
112    /// economical.
113    ///
114    /// See also [`sample()`] and [`throttle()`].
115    ///
116    /// [`sample()`]: `StreamExt::sample`
117    /// [`throttle()`]: `StreamExt::throttle`
118    ///
119    /// # Example
120    ///
121    /// ```
122    /// use futures_lite::prelude::*;
123    /// use futures_time::prelude::*;
124    /// use futures_time::time::{Instant, Duration};
125    /// use futures_time::stream;
126    ///
127    /// fn main() {
128    ///     async_io::block_on(async {
129    ///         let mut counter = 0;
130    ///         stream::interval(Duration::from_millis(10))
131    ///             .take(10)
132    ///             .debounce(Duration::from_millis(20)) // the window is greater than the interval
133    ///             .for_each(|_| counter += 1)
134    ///             .await;
135    ///
136    ///         assert_eq!(counter, 1); // so only the last item is received
137    ///     })
138    /// }
139    /// ```
140    fn debounce<D>(self, window: D) -> Debounce<Self, D::IntoFuture>
141    where
142        Self: Sized,
143        D: IntoFuture,
144        D::IntoFuture: Timer,
145    {
146        Debounce::new(self, window.into_future())
147    }
148
149    /// Delay the yielding of items from the stream until the given deadline.
150    ///
151    /// The underlying stream will not be polled until the deadline has expired. In addition
152    /// to using a time source as a deadline, any future can be used as a
153    /// deadline too. When used in combination with a multi-consumer channel,
154    /// this method can be used to synchronize the start of multiple streams and futures.
155    ///
156    /// # Example
157    ///
158    /// ```
159    /// use futures_lite::prelude::*;
160    /// use futures_time::prelude::*;
161    /// use futures_time::time::{Instant, Duration};
162    /// use futures_lite::stream;
163    ///
164    /// fn main() {
165    ///     async_io::block_on(async {
166    ///         let now = Instant::now();
167    ///         let delay = Duration::from_millis(100);
168    ///         let _ = stream::once("meow").delay(delay).next().await;
169    ///         assert!(now.elapsed() >= *delay);
170    ///     });
171    /// }
172    /// ```
173    fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
174    where
175        Self: Sized,
176        D: IntoFuture,
177    {
178        Delay::new(self, deadline.into_future())
179    }
180
181    /// Suspend or resume execution of a stream.
182    ///
183    /// When this method is called the execution of the stream will be put into
184    /// a suspended state until the channel returns `Parker::Unpark` or the
185    /// channel's senders are dropped. The underlying stream will not be polled
186    /// while the it is paused.
187    fn park<I>(self, interval: I) -> Park<Self, I::IntoStream>
188    where
189        Self: Sized,
190        I: IntoStream<Item = Parker>,
191    {
192        Park::new(self, interval.into_stream())
193    }
194
195    /// Yield an item, then ignore subsequent items for a duration.
196    ///
197    /// In addition to using a time-based interval, this method can take any
198    /// stream as a source. This enables throttling based on alternative event
199    /// sources, such as variable-rate timers.
200    ///
201    /// See also [`sample()`] and [`debounce()`].
202    ///
203    /// [`sample()`]: `StreamExt::sample`
204    /// [`debounce()`]: `StreamExt::debounce`
205    ///
206    /// # Data Loss
207    ///
208    /// This method will discard data between intervals. Though the
209    /// discarded items will have their destuctors run, __using this method
210    /// incorrectly may lead to unintended data loss__. This method is best used
211    /// to reduce the number of _duplicate_ items after the first has been
212    /// received, such as repeated mouse clicks or key presses. This method may
213    /// lead to unintended data loss when used to discard _unique_ items, such
214    /// as network request.
215    ///
216    /// # Examples
217    ///
218    /// ```
219    /// use futures_lite::prelude::*;
220    /// use futures_time::prelude::*;
221    /// use futures_time::time::Duration;
222    /// use futures_time::stream;
223    ///
224    /// fn main() {
225    ///     async_io::block_on(async {
226    ///         let mut counter = 0;
227    ///         stream::interval(Duration::from_millis(100))  // Yield an item every 100ms
228    ///             .take(4)                                  // Stop after 4 items
229    ///             .throttle(Duration::from_millis(300))     // Only let an item through every 300ms
230    ///             .for_each(|_| counter += 1)               // Increment a counter for each item
231    ///             .await;
232    ///
233    ///         assert_eq!(counter, 2);
234    ///     })
235    /// }
236    /// ```
237    fn throttle<I>(self, interval: I) -> Throttle<Self, I::IntoStream>
238    where
239        Self: Sized,
240        I: IntoStream,
241    {
242        Throttle::new(self, interval.into_stream())
243    }
244
245    /// Return an error if a stream does not yield an item within a given time
246    /// span.
247    ///
248    /// Typically timeouts are, as the name implies, based on _time_. However
249    /// this method can time out based on any future. This can be useful in
250    /// combination with channels, as it allows (long-lived) streams to be
251    /// cancelled based on some external event.
252    ///
253    /// When a timeout is returned, the stream will be dropped and destructors
254    /// will be run.
255    ///
256    /// # Example
257    ///
258    /// ```
259    /// use futures_lite::prelude::*;
260    /// use futures_time::prelude::*;
261    /// use futures_time::time::{Instant, Duration};
262    /// use futures_lite::stream;
263    /// use std::io;
264    ///
265    /// fn main() {
266    ///     async_io::block_on(async {
267    ///         let res = stream::once("meow")
268    ///             .delay(Duration::from_millis(100))  // longer delay
269    ///             .timeout(Duration::from_millis(50)) // shorter timeout
270    ///             .next()
271    ///             .await;
272    ///         assert_eq!(res.unwrap().unwrap_err().kind(), io::ErrorKind::TimedOut); // error
273    ///
274    ///         let res = stream::once("meow")
275    ///             .delay(Duration::from_millis(50))    // shorter delay
276    ///             .timeout(Duration::from_millis(100)) // longer timeout
277    ///             .next()
278    ///             .await;
279    ///         assert_eq!(res.unwrap().unwrap(), "meow"); // success
280    ///     });
281    /// }
282    /// ```
283    fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
284    where
285        Self: Sized,
286        D: IntoFuture,
287        D::IntoFuture: Timer,
288    {
289        Timeout::new(self, deadline.into_future())
290    }
291}
292
293impl<S> StreamExt for S where S: Stream {}