broker_tokio/time/
throttle.rs

1//! Slow down a stream by enforcing a delay between items.
2
3use crate::stream::Stream;
4use crate::time::{Delay, Duration, Instant};
5
6use std::future::Future;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{self, Poll};
10
11use pin_project_lite::pin_project;
12
13/// Slow down a stream by enforcing a delay between items.
14/// They will be produced not more often than the specified interval.
15///
16/// # Example
17///
18/// Create a throttled stream.
19/// ```rust,norun
20/// use std::time::Duration;
21/// use tokio::stream::StreamExt;
22/// use tokio::time::throttle;
23///
24/// # async fn dox() {
25/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
26///
27/// loop {
28///     // The string will be produced at most every 2 seconds
29///     println!("{:?}", item_stream.next().await);
30/// }
31/// # }
32/// ```
33pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
34where
35    T: Stream,
36{
37    let delay = if duration == Duration::from_millis(0) {
38        None
39    } else {
40        Some(Delay::new_timeout(Instant::now() + duration, duration))
41    };
42
43    Throttle {
44        delay,
45        duration,
46        has_delayed: true,
47        stream,
48    }
49}
50
51pin_project! {
52    /// Stream for the [`throttle`](throttle) function.
53    #[derive(Debug)]
54    #[must_use = "streams do nothing unless polled"]
55    pub struct Throttle<T> {
56        // `None` when duration is zero.
57        delay: Option<Delay>,
58        duration: Duration,
59
60        // Set to true when `delay` has returned ready, but `stream` hasn't.
61        has_delayed: bool,
62
63        // The stream to throttle
64        #[pin]
65        stream: T,
66    }
67}
68
69// XXX: are these safe if `T: !Unpin`?
70impl<T: Unpin> Throttle<T> {
71    /// Acquires a reference to the underlying stream that this combinator is
72    /// pulling from.
73    pub fn get_ref(&self) -> &T {
74        &self.stream
75    }
76
77    /// Acquires a mutable reference to the underlying stream that this combinator
78    /// is pulling from.
79    ///
80    /// Note that care must be taken to avoid tampering with the state of the stream
81    /// which may otherwise confuse this combinator.
82    pub fn get_mut(&mut self) -> &mut T {
83        &mut self.stream
84    }
85
86    /// Consumes this combinator, returning the underlying stream.
87    ///
88    /// Note that this may discard intermediate state of this combinator, so care
89    /// should be taken to avoid losing resources when this is called.
90    pub fn into_inner(self) -> T {
91        self.stream
92    }
93}
94
95impl<T: Stream> Stream for Throttle<T> {
96    type Item = T::Item;
97
98    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
99        if !self.has_delayed && self.delay.is_some() {
100            ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx));
101            *self.as_mut().project().has_delayed = true;
102        }
103
104        let value = ready!(self.as_mut().project().stream.poll_next(cx));
105
106        if value.is_some() {
107            let dur = self.duration;
108            if let Some(ref mut delay) = self.as_mut().project().delay {
109                delay.reset(Instant::now() + dur);
110            }
111
112            *self.as_mut().project().has_delayed = false;
113        }
114
115        Poll::Ready(value)
116    }
117}