1use {clock, Delay, Error};
4
5use futures::future::Either;
6use futures::{Async, Future, Poll, Stream};
7
8use std::{
9 error::Error as StdError,
10 fmt::{Display, Formatter, Result as FmtResult},
11 time::Duration,
12};
13
14#[derive(Debug)]
16#[must_use = "streams do nothing unless polled"]
17pub struct Throttle<T> {
18 delay: Option<Delay>,
19 duration: Duration,
20 stream: T,
21}
22
23#[derive(Debug)]
26pub struct ThrottleError<T>(Either<T, Error>);
27
28impl<T> Throttle<T> {
29 pub fn new(stream: T, duration: Duration) -> Self {
31 Self {
32 delay: None,
33 duration: duration,
34 stream: stream,
35 }
36 }
37
38 pub fn get_ref(&self) -> &T {
41 &self.stream
42 }
43
44 pub fn get_mut(&mut self) -> &mut T {
50 &mut self.stream
51 }
52
53 pub fn into_inner(self) -> T {
58 self.stream
59 }
60}
61
62impl<T: Stream> Stream for Throttle<T> {
63 type Item = T::Item;
64 type Error = ThrottleError<T::Error>;
65
66 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
67 if let Some(ref mut delay) = self.delay {
68 try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) });
69 }
70
71 self.delay = None;
72 let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) });
73
74 if value.is_some() {
75 self.delay = Some(Delay::new(clock::now() + self.duration));
76 }
77
78 Ok(Async::Ready(value))
79 }
80}
81
82impl<T> ThrottleError<T> {
83 pub fn from_stream_err(err: T) -> Self {
85 ThrottleError(Either::A(err))
86 }
87
88 pub fn from_timer_err(err: Error) -> Self {
90 ThrottleError(Either::B(err))
91 }
92
93 pub fn get_stream_error(&self) -> Option<&T> {
95 match self.0 {
96 Either::A(ref x) => Some(x),
97 _ => None,
98 }
99 }
100
101 pub fn get_timer_error(&self) -> Option<&Error> {
103 match self.0 {
104 Either::B(ref x) => Some(x),
105 _ => None,
106 }
107 }
108
109 pub fn into_stream_error(self) -> Option<T> {
111 match self.0 {
112 Either::A(x) => Some(x),
113 _ => None,
114 }
115 }
116
117 pub fn into_timer_error(self) -> Option<Error> {
119 match self.0 {
120 Either::B(x) => Some(x),
121 _ => None,
122 }
123 }
124
125 pub fn is_stream_error(&self) -> bool {
128 !self.is_timer_error()
129 }
130
131 pub fn is_timer_error(&self) -> bool {
134 match self.0 {
135 Either::A(_) => false,
136 Either::B(_) => true,
137 }
138 }
139}
140
141impl<T: StdError> Display for ThrottleError<T> {
142 fn fmt(&self, f: &mut Formatter) -> FmtResult {
143 match self.0 {
144 Either::A(ref err) => write!(f, "stream error: {}", err),
145 Either::B(ref err) => write!(f, "timer error: {}", err),
146 }
147 }
148}
149
150impl<T: StdError + 'static> StdError for ThrottleError<T> {
151 fn description(&self) -> &str {
152 match self.0 {
153 Either::A(_) => "stream error",
154 Either::B(_) => "timer error",
155 }
156 }
157
158 #[allow(deprecated)]
161 fn cause(&self) -> Option<&dyn StdError> {
162 match self.0 {
163 Either::A(ref err) => Some(err),
164 Either::B(ref err) => Some(err),
165 }
166 }
167}