broker_tokio/time/throttle.rs
1//! Slow down a stream by enforcing a delay between items.
2
3use crate::stream::Stream;
4use crate::time::{Delay, Duration, Instant};
5
6use std::future::Future;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{self, Poll};
10
11use pin_project_lite::pin_project;
12
13/// Slow down a stream by enforcing a delay between items.
14/// They will be produced not more often than the specified interval.
15///
16/// # Example
17///
18/// Create a throttled stream.
19/// ```rust,norun
20/// use std::time::Duration;
21/// use tokio::stream::StreamExt;
22/// use tokio::time::throttle;
23///
24/// # async fn dox() {
25/// let mut item_stream = throttle(Duration::from_secs(2), futures::stream::repeat("one"));
26///
27/// loop {
28/// // The string will be produced at most every 2 seconds
29/// println!("{:?}", item_stream.next().await);
30/// }
31/// # }
32/// ```
33pub fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
34where
35 T: Stream,
36{
37 let delay = if duration == Duration::from_millis(0) {
38 None
39 } else {
40 Some(Delay::new_timeout(Instant::now() + duration, duration))
41 };
42
43 Throttle {
44 delay,
45 duration,
46 has_delayed: true,
47 stream,
48 }
49}
50
pin_project! {
    /// Stream for the [`throttle`](throttle) function.
    ///
    /// Forwards the items of the wrapped stream, waiting out a timer of
    /// length `duration` after each item before the next one is polled for.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Throttle<T> {
        // Timer enforcing the gap between items. `None` when duration is zero.
        delay: Option<Delay>,
        // Minimum interval between two consecutive items.
        duration: Duration,

        // Set to true when `delay` has returned ready, but `stream` hasn't.
        // Cleared again every time `stream` yields an item.
        has_delayed: bool,

        // The stream to throttle
        #[pin]
        stream: T,
    }
}
68
69// XXX: are these safe if `T: !Unpin`?
70impl<T: Unpin> Throttle<T> {
71 /// Acquires a reference to the underlying stream that this combinator is
72 /// pulling from.
73 pub fn get_ref(&self) -> &T {
74 &self.stream
75 }
76
77 /// Acquires a mutable reference to the underlying stream that this combinator
78 /// is pulling from.
79 ///
80 /// Note that care must be taken to avoid tampering with the state of the stream
81 /// which may otherwise confuse this combinator.
82 pub fn get_mut(&mut self) -> &mut T {
83 &mut self.stream
84 }
85
86 /// Consumes this combinator, returning the underlying stream.
87 ///
88 /// Note that this may discard intermediate state of this combinator, so care
89 /// should be taken to avoid losing resources when this is called.
90 pub fn into_inner(self) -> T {
91 self.stream
92 }
93}
94
95impl<T: Stream> Stream for Throttle<T> {
96 type Item = T::Item;
97
98 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
99 if !self.has_delayed && self.delay.is_some() {
100 ready!(Pin::new(self.as_mut().project().delay.as_mut().unwrap()).poll(cx));
101 *self.as_mut().project().has_delayed = true;
102 }
103
104 let value = ready!(self.as_mut().project().stream.poll_next(cx));
105
106 if value.is_some() {
107 let dur = self.duration;
108 if let Some(ref mut delay) = self.as_mut().project().delay {
109 delay.reset(Instant::now() + dur);
110 }
111
112 *self.as_mut().project().has_delayed = false;
113 }
114
115 Poll::Ready(value)
116 }
117}