futures_time/stream/stream_ext.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
use crate::channel::Parker;
use crate::future::{IntoFuture, Timer};
use futures_core::Stream;
use super::{Buffer, Debounce, Delay, IntoStream, Park, Sample, Throttle, Timeout};
/// Extend `Stream` with time-based operations.
pub trait StreamExt: Stream {
/// Yield the last item received at the end of each interval.
///
/// If no items have been received during an interval, the stream will not
/// yield any items. In addition to using a time-based interval, this method can take any
/// stream as a source. This enables throttling based on alternative event
/// sources, such as variable-rate timers.
///
/// See also [`throttle()`] and [`debounce()`].
///
/// [`throttle()`]: StreamExt::throttle
/// [`debounce()`]: `StreamExt::debounce`
///
/// # Data Loss
///
/// This method will discard data between intervals. Though the
/// discarded items will have their destuctors run, __using this method
/// incorrectly may lead to unintended data loss__. This method is best used
/// to reduce the number of _duplicate_ items after the first has been
/// received, such as repeated mouse clicks or key presses. This method may
/// lead to unintended data loss when used to discard _unique_ items, such
/// as network request.
///
/// # Example
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::{Instant, Duration};
/// use futures_time::stream;
///
/// fn main() {
/// async_io::block_on(async {
/// let mut counter = 0;
/// stream::interval(Duration::from_millis(100))
/// .take(4)
/// .sample(Duration::from_millis(200))
/// .for_each(|_| counter += 1)
/// .await;
///
/// assert_eq!(counter, 2);
/// })
/// }
/// ```
fn sample<I>(self, interval: I) -> Sample<Self, I::IntoStream>
where
Self: Sized,
I: IntoStream,
{
Sample::new(self, interval.into_stream())
}
/// Group items into vectors which are yielded at every interval.
///
/// In addition to using a time source as a deadline, any stream can be used as a
/// deadline too. This enables more interesting buffer strategies to be
/// built on top of this primitive.
///
/// # Future Improvements
///
/// - Lending iterators would allow for internal reusing of the buffer.
/// Though different from `Iterator::windows`, it could be more efficient.
/// - Contexts/capabilities would enable custom allocators to be used.
///
/// # Example
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::{Instant, Duration};
/// use futures_time::stream;
///
/// fn main() {
/// async_io::block_on(async {
/// let mut counter = 0;
/// stream::interval(Duration::from_millis(5))
/// .take(10)
/// .buffer(Duration::from_millis(20))
/// .for_each(|buf| counter += buf.len())
/// .await;
///
/// assert_eq!(counter, 10);
/// })
/// }
/// ```
fn buffer<I>(self, interval: I) -> Buffer<Self, I::IntoStream>
where
Self: Sized,
I: IntoStream,
{
Buffer::new(self, interval.into_stream())
}
/// Yield the last item received at the end of a window which resets with
/// each item received.
///
/// Every time an item is yielded by the underlying stream, the window is
/// reset. Once the window expires, the last item seen will be yielded. This
/// means that in order to yield an item, no items must be received for the
/// entire window, or else the window will reset.
///
/// This method is useful to perform actions at the end of bursts of events,
/// where performing that same action on _every_ event might not be
/// economical.
///
/// See also [`sample()`] and [`throttle()`].
///
/// [`sample()`]: `StreamExt::sample`
/// [`throttle()`]: `StreamExt::throttle`
///
/// # Example
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::{Instant, Duration};
/// use futures_time::stream;
///
/// fn main() {
/// async_io::block_on(async {
/// let mut counter = 0;
/// stream::interval(Duration::from_millis(10))
/// .take(10)
/// .debounce(Duration::from_millis(20)) // the window is greater than the interval
/// .for_each(|_| counter += 1)
/// .await;
///
/// assert_eq!(counter, 1); // so only the last item is received
/// })
/// }
/// ```
fn debounce<D>(self, window: D) -> Debounce<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
D::IntoFuture: Timer,
{
Debounce::new(self, window.into_future())
}
/// Delay the yielding of items from the stream until the given deadline.
///
/// The underlying stream will not be polled until the deadline has expired. In addition
/// to using a time source as a deadline, any future can be used as a
/// deadline too. When used in combination with a multi-consumer channel,
/// this method can be used to synchronize the start of multiple streams and futures.
///
/// # Example
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::{Instant, Duration};
/// use futures_lite::stream;
///
/// fn main() {
/// async_io::block_on(async {
/// let now = Instant::now();
/// let delay = Duration::from_millis(100);
/// let _ = stream::once("meow").delay(delay).next().await;
/// assert!(now.elapsed() >= *delay);
/// });
/// }
/// ```
fn delay<D>(self, deadline: D) -> Delay<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
{
Delay::new(self, deadline.into_future())
}
/// Suspend or resume execution of a stream.
///
/// When this method is called the execution of the stream will be put into
/// a suspended state until the channel returns `Parker::Unpark` or the
/// channel's senders are dropped. The underlying stream will not be polled
/// while the it is paused.
fn park<I>(self, interval: I) -> Park<Self, I::IntoStream>
where
Self: Sized,
I: IntoStream<Item = Parker>,
{
Park::new(self, interval.into_stream())
}
/// Yield an item, then ignore subsequent items for a duration.
///
/// In addition to using a time-based interval, this method can take any
/// stream as a source. This enables throttling based on alternative event
/// sources, such as variable-rate timers.
///
/// See also [`sample()`] and [`debounce()`].
///
/// [`sample()`]: `StreamExt::sample`
/// [`debounce()`]: `StreamExt::debounce`
///
/// # Data Loss
///
/// This method will discard data between intervals. Though the
/// discarded items will have their destuctors run, __using this method
/// incorrectly may lead to unintended data loss__. This method is best used
/// to reduce the number of _duplicate_ items after the first has been
/// received, such as repeated mouse clicks or key presses. This method may
/// lead to unintended data loss when used to discard _unique_ items, such
/// as network request.
///
/// # Examples
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::Duration;
/// use futures_time::stream;
///
/// fn main() {
/// async_io::block_on(async {
/// let mut counter = 0;
/// stream::interval(Duration::from_millis(100)) // Yield an item every 100ms
/// .take(4) // Stop after 4 items
/// .throttle(Duration::from_millis(300)) // Only let an item through every 300ms
/// .for_each(|_| counter += 1) // Increment a counter for each item
/// .await;
///
/// assert_eq!(counter, 2);
/// })
/// }
/// ```
fn throttle<I>(self, interval: I) -> Throttle<Self, I::IntoStream>
where
Self: Sized,
I: IntoStream,
{
Throttle::new(self, interval.into_stream())
}
/// Return an error if a stream does not yield an item within a given time
/// span.
///
/// Typically timeouts are, as the name implies, based on _time_. However
/// this method can time out based on any future. This can be useful in
/// combination with channels, as it allows (long-lived) streams to be
/// cancelled based on some external event.
///
/// When a timeout is returned, the stream will be dropped and destructors
/// will be run.
///
/// # Example
///
/// ```
/// use futures_lite::prelude::*;
/// use futures_time::prelude::*;
/// use futures_time::time::{Instant, Duration};
/// use futures_lite::stream;
/// use std::io;
///
/// fn main() {
/// async_io::block_on(async {
/// let res = stream::once("meow")
/// .delay(Duration::from_millis(100)) // longer delay
/// .timeout(Duration::from_millis(50)) // shorter timeout
/// .next()
/// .await;
/// assert_eq!(res.unwrap().unwrap_err().kind(), io::ErrorKind::TimedOut); // error
///
/// let res = stream::once("meow")
/// .delay(Duration::from_millis(50)) // shorter delay
/// .timeout(Duration::from_millis(100)) // longer timeout
/// .next()
/// .await;
/// assert_eq!(res.unwrap().unwrap(), "meow"); // success
/// });
/// }
/// ```
fn timeout<D>(self, deadline: D) -> Timeout<Self, D::IntoFuture>
where
Self: Sized,
D: IntoFuture,
D::IntoFuture: Timer,
{
Timeout::new(self, deadline.into_future())
}
}
impl<S> StreamExt for S where S: Stream {}