kube_runtime/utils/
stream_backoff.rs

1use std::{future::Future, pin::Pin, task::Poll};
2
3use futures::{Stream, TryStream};
4use pin_project::pin_project;
5use tokio::time::{sleep, Instant, Sleep};
6
7use crate::utils::Backoff;
8
9/// Applies a [`Backoff`] policy to a [`Stream`]
10///
11/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The
12/// [`Backoff`] is [`reset`](`Backoff::reset`) on any [`Ok`] value.
13///
14/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed.
15#[pin_project]
16pub struct StreamBackoff<S, B> {
17    #[pin]
18    stream: S,
19    backoff: B,
20    #[pin]
21    state: State,
22}
23
24#[pin_project(project = StreamBackoffStateProj)]
25// It's expected to have relatively few but long-lived `StreamBackoff`s in a project, so we would rather have
26// cheaper sleeps than a smaller `StreamBackoff`.
27#[allow(clippy::large_enum_variant)]
28enum State {
29    BackingOff(#[pin] Sleep),
30    GivenUp,
31    Awake,
32}
33
34impl<S: TryStream, B: Backoff> StreamBackoff<S, B> {
35    pub fn new(stream: S, backoff: B) -> Self {
36        Self {
37            stream,
38            backoff,
39            state: State::Awake,
40        }
41    }
42}
43
44impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
45    type Item = Result<S::Ok, S::Error>;
46
47    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
48        let mut this = self.project();
49        match this.state.as_mut().project() {
50            StreamBackoffStateProj::BackingOff(mut backoff_sleep) => match backoff_sleep.as_mut().poll(cx) {
51                Poll::Ready(()) => {
52                    tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up");
53                    this.state.set(State::Awake)
54                }
55                Poll::Pending => {
56                    let deadline = backoff_sleep.deadline();
57                    tracing::trace!(
58                        ?deadline,
59                        remaining_duration = ?deadline.saturating_duration_since(Instant::now()),
60                        "Still waiting for backoff sleep to complete"
61                    );
62                    return Poll::Pending;
63                }
64            },
65            StreamBackoffStateProj::GivenUp => {
66                tracing::debug!("Backoff has given up, stream is closed");
67                return Poll::Ready(None);
68            }
69            StreamBackoffStateProj::Awake => {}
70        }
71
72        let next_item = this.stream.try_poll_next(cx);
73        match &next_item {
74            Poll::Ready(Some(Err(_))) => {
75                if let Some(backoff_duration) = this.backoff.next() {
76                    let backoff_sleep = sleep(backoff_duration);
77                    tracing::debug!(
78                        deadline = ?backoff_sleep.deadline(),
79                        duration = ?backoff_duration,
80                        "Error received, backing off"
81                    );
82                    this.state.set(State::BackingOff(backoff_sleep));
83                } else {
84                    tracing::debug!("Error received, giving up");
85                    this.state.set(State::GivenUp);
86                }
87            }
88            Poll::Ready(_) => {
89                tracing::trace!("Non-error received, resetting backoff");
90                this.backoff.reset();
91            }
92            Poll::Pending => {}
93        }
94        next_item
95    }
96}
97
98#[cfg(test)]
99pub(crate) mod tests {
100    use std::{pin::pin, task::Poll, time::Duration};
101
102    use crate::utils::Backoff;
103
104    use super::StreamBackoff;
105    use backon::BackoffBuilder;
106    use futures::{channel::mpsc, poll, stream, StreamExt};
107
108    pub struct ConstantBackoff {
109        inner: backon::ConstantBackoff,
110        delay: Duration,
111        max_times: usize,
112    }
113
114    impl ConstantBackoff {
115        pub fn new(delay: Duration, max_times: usize) -> Self {
116            Self {
117                inner: backon::ConstantBuilder::default()
118                    .with_delay(delay)
119                    .with_max_times(max_times)
120                    .build(),
121                delay,
122                max_times,
123            }
124        }
125    }
126
127    impl Iterator for ConstantBackoff {
128        type Item = Duration;
129
130        fn next(&mut self) -> Option<Duration> {
131            self.inner.next()
132        }
133    }
134
135    impl Backoff for ConstantBackoff {
136        fn reset(&mut self) {
137            self.inner = backon::ConstantBuilder::default()
138                .with_delay(self.delay)
139                .with_max_times(self.max_times)
140                .build();
141        }
142    }
143
144    #[tokio::test]
145    async fn stream_should_back_off() {
146        tokio::time::pause();
147        let tick = Duration::from_secs(1);
148        let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
149        let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10)));
150        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
151        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
152        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
153        assert_eq!(poll!(rx.next()), Poll::Pending);
154        tokio::time::advance(tick * 2).await;
155        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(3))));
156        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4))));
157        assert_eq!(poll!(rx.next()), Poll::Ready(None));
158    }
159
160    #[tokio::test]
161    async fn backoff_time_should_update() {
162        tokio::time::pause();
163        let (tx, rx) = mpsc::unbounded();
164        // let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]);
165        let mut rx = pin!(StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2))));
166        tx.unbounded_send(Ok(0)).unwrap();
167        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
168        tx.unbounded_send(Ok(1)).unwrap();
169        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
170        tx.unbounded_send(Err(2)).unwrap();
171        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
172        assert_eq!(poll!(rx.next()), Poll::Pending);
173        tokio::time::advance(Duration::from_secs(3)).await;
174        assert_eq!(poll!(rx.next()), Poll::Pending);
175        tx.unbounded_send(Err(3)).unwrap();
176        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(3))));
177        tx.unbounded_send(Ok(4)).unwrap();
178        assert_eq!(poll!(rx.next()), Poll::Pending);
179        tokio::time::advance(Duration::from_secs(3)).await;
180        assert_eq!(poll!(rx.next()), Poll::Pending);
181        tokio::time::advance(Duration::from_secs(2)).await;
182        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4))));
183        assert_eq!(poll!(rx.next()), Poll::Pending);
184        drop(tx);
185        assert_eq!(poll!(rx.next()), Poll::Ready(None));
186    }
187
188    #[tokio::test]
189    async fn backoff_should_close_when_requested() {
190        assert_eq!(
191            StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {})
192                .collect::<Vec<_>>()
193                .await,
194            vec![Ok(0), Ok(1), Err(2)]
195        );
196    }
197
198    struct StoppedBackoff;
199
200    impl Backoff for StoppedBackoff {
201        fn reset(&mut self) {}
202    }
203
204    impl Iterator for StoppedBackoff {
205        type Item = Duration;
206
207        fn next(&mut self) -> Option<Duration> {
208            None
209        }
210    }
211
212    /// Dynamic backoff policy that is still deterministic and testable
213    pub struct LinearBackoff {
214        interval: Duration,
215        current_duration: Duration,
216    }
217
218    impl LinearBackoff {
219        pub fn new(interval: Duration) -> Self {
220            Self {
221                interval,
222                current_duration: Duration::ZERO,
223            }
224        }
225    }
226
227    impl Iterator for LinearBackoff {
228        type Item = Duration;
229
230        fn next(&mut self) -> Option<Duration> {
231            self.current_duration += self.interval;
232            Some(self.current_duration)
233        }
234    }
235
236    impl Backoff for LinearBackoff {
237        fn reset(&mut self) {
238            self.current_duration = Duration::ZERO
239        }
240    }
241}