tokio_timer/
throttle.rs

1//! Slow down a stream by enforcing a delay between items.
2
3use {clock, Delay, Error};
4
5use futures::future::Either;
6use futures::{Async, Future, Poll, Stream};
7
8use std::{
9    error::Error as StdError,
10    fmt::{Display, Formatter, Result as FmtResult},
11    time::Duration,
12};
13
/// Slow down a stream by enforcing a delay between items.
///
/// Wraps a stream of type `T` together with the `Duration` to wait after each
/// item it yields. Construct one with `Throttle::new`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Throttle<T> {
    /// Timer for the pause currently in effect; `None` when no pause is pending.
    delay: Option<Delay>,
    /// Length of the pause armed after each yielded item.
    duration: Duration,
    /// The wrapped stream that items are pulled from.
    stream: T,
}
22
/// Either the error of the underlying stream, or an error within
/// tokio's timing machinery.
///
/// Use the `is_*`, `get_*` and `into_*` methods to find out which of the two
/// occurred and to access it.
// `Either::A` carries the stream's error, `Either::B` the timer's `Error`.
#[derive(Debug)]
pub struct ThrottleError<T>(Either<T, Error>);
27
28impl<T> Throttle<T> {
29    /// Slow down a stream by enforcing a delay between items.
30    pub fn new(stream: T, duration: Duration) -> Self {
31        Self {
32            delay: None,
33            duration: duration,
34            stream: stream,
35        }
36    }
37
38    /// Acquires a reference to the underlying stream that this combinator is
39    /// pulling from.
40    pub fn get_ref(&self) -> &T {
41        &self.stream
42    }
43
44    /// Acquires a mutable reference to the underlying stream that this combinator
45    /// is pulling from.
46    ///
47    /// Note that care must be taken to avoid tampering with the state of the stream
48    /// which may otherwise confuse this combinator.
49    pub fn get_mut(&mut self) -> &mut T {
50        &mut self.stream
51    }
52
53    /// Consumes this combinator, returning the underlying stream.
54    ///
55    /// Note that this may discard intermediate state of this combinator, so care
56    /// should be taken to avoid losing resources when this is called.
57    pub fn into_inner(self) -> T {
58        self.stream
59    }
60}
61
62impl<T: Stream> Stream for Throttle<T> {
63    type Item = T::Item;
64    type Error = ThrottleError<T::Error>;
65
66    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
67        if let Some(ref mut delay) = self.delay {
68            try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) });
69        }
70
71        self.delay = None;
72        let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) });
73
74        if value.is_some() {
75            self.delay = Some(Delay::new(clock::now() + self.duration));
76        }
77
78        Ok(Async::Ready(value))
79    }
80}
81
82impl<T> ThrottleError<T> {
83    /// Creates a new `ThrottleError` from the given stream error.
84    pub fn from_stream_err(err: T) -> Self {
85        ThrottleError(Either::A(err))
86    }
87
88    /// Creates a new `ThrottleError` from the given tokio timer error.
89    pub fn from_timer_err(err: Error) -> Self {
90        ThrottleError(Either::B(err))
91    }
92
93    /// Attempts to get the underlying stream error, if it is present.
94    pub fn get_stream_error(&self) -> Option<&T> {
95        match self.0 {
96            Either::A(ref x) => Some(x),
97            _ => None,
98        }
99    }
100
101    /// Attempts to get the underlying timer error, if it is present.
102    pub fn get_timer_error(&self) -> Option<&Error> {
103        match self.0 {
104            Either::B(ref x) => Some(x),
105            _ => None,
106        }
107    }
108
109    /// Attempts to extract the underlying stream error, if it is present.
110    pub fn into_stream_error(self) -> Option<T> {
111        match self.0 {
112            Either::A(x) => Some(x),
113            _ => None,
114        }
115    }
116
117    /// Attempts to extract the underlying timer error, if it is present.
118    pub fn into_timer_error(self) -> Option<Error> {
119        match self.0 {
120            Either::B(x) => Some(x),
121            _ => None,
122        }
123    }
124
125    /// Returns whether the throttle error has occured because of an error
126    /// in the underlying stream.
127    pub fn is_stream_error(&self) -> bool {
128        !self.is_timer_error()
129    }
130
131    /// Returns whether the throttle error has occured because of an error
132    /// in tokio's timer system.
133    pub fn is_timer_error(&self) -> bool {
134        match self.0 {
135            Either::A(_) => false,
136            Either::B(_) => true,
137        }
138    }
139}
140
141impl<T: StdError> Display for ThrottleError<T> {
142    fn fmt(&self, f: &mut Formatter) -> FmtResult {
143        match self.0 {
144            Either::A(ref err) => write!(f, "stream error: {}", err),
145            Either::B(ref err) => write!(f, "timer error: {}", err),
146        }
147    }
148}
149
150impl<T: StdError + 'static> StdError for ThrottleError<T> {
151    fn description(&self) -> &str {
152        match self.0 {
153            Either::A(_) => "stream error",
154            Either::B(_) => "timer error",
155        }
156    }
157
158    // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30,
159    // replace this with Error::source.
160    #[allow(deprecated)]
161    fn cause(&self) -> Option<&dyn StdError> {
162        match self.0 {
163            Either::A(ref err) => Some(err),
164            Either::B(ref err) => Some(err),
165        }
166    }
167}