kube_runtime/utils/
stream_backoff.rs1use std::{future::Future, pin::Pin, task::Poll};
2
3use futures::{Stream, TryStream};
4use pin_project::pin_project;
5use tokio::time::{sleep, Instant, Sleep};
6
7use crate::utils::Backoff;
8
9#[pin_project]
16pub struct StreamBackoff<S, B> {
17 #[pin]
18 stream: S,
19 backoff: B,
20 #[pin]
21 state: State,
22}
23
24#[pin_project(project = StreamBackoffStateProj)]
25#[allow(clippy::large_enum_variant)]
28enum State {
29 BackingOff(#[pin] Sleep),
30 GivenUp,
31 Awake,
32}
33
34impl<S: TryStream, B: Backoff> StreamBackoff<S, B> {
35 pub fn new(stream: S, backoff: B) -> Self {
36 Self {
37 stream,
38 backoff,
39 state: State::Awake,
40 }
41 }
42}
43
44impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
45 type Item = Result<S::Ok, S::Error>;
46
47 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
48 let mut this = self.project();
49 match this.state.as_mut().project() {
50 StreamBackoffStateProj::BackingOff(mut backoff_sleep) => match backoff_sleep.as_mut().poll(cx) {
51 Poll::Ready(()) => {
52 tracing::debug!(deadline = ?backoff_sleep.deadline(), "Backoff complete, waking up");
53 this.state.set(State::Awake)
54 }
55 Poll::Pending => {
56 let deadline = backoff_sleep.deadline();
57 tracing::trace!(
58 ?deadline,
59 remaining_duration = ?deadline.saturating_duration_since(Instant::now()),
60 "Still waiting for backoff sleep to complete"
61 );
62 return Poll::Pending;
63 }
64 },
65 StreamBackoffStateProj::GivenUp => {
66 tracing::debug!("Backoff has given up, stream is closed");
67 return Poll::Ready(None);
68 }
69 StreamBackoffStateProj::Awake => {}
70 }
71
72 let next_item = this.stream.try_poll_next(cx);
73 match &next_item {
74 Poll::Ready(Some(Err(_))) => {
75 if let Some(backoff_duration) = this.backoff.next() {
76 let backoff_sleep = sleep(backoff_duration);
77 tracing::debug!(
78 deadline = ?backoff_sleep.deadline(),
79 duration = ?backoff_duration,
80 "Error received, backing off"
81 );
82 this.state.set(State::BackingOff(backoff_sleep));
83 } else {
84 tracing::debug!("Error received, giving up");
85 this.state.set(State::GivenUp);
86 }
87 }
88 Poll::Ready(_) => {
89 tracing::trace!("Non-error received, resetting backoff");
90 this.backoff.reset();
91 }
92 Poll::Pending => {}
93 }
94 next_item
95 }
96}
97
98#[cfg(test)]
99pub(crate) mod tests {
100 use std::{pin::pin, task::Poll, time::Duration};
101
102 use crate::utils::Backoff;
103
104 use super::StreamBackoff;
105 use backon::BackoffBuilder;
106 use futures::{channel::mpsc, poll, stream, StreamExt};
107
108 pub struct ConstantBackoff {
109 inner: backon::ConstantBackoff,
110 delay: Duration,
111 max_times: usize,
112 }
113
114 impl ConstantBackoff {
115 pub fn new(delay: Duration, max_times: usize) -> Self {
116 Self {
117 inner: backon::ConstantBuilder::default()
118 .with_delay(delay)
119 .with_max_times(max_times)
120 .build(),
121 delay,
122 max_times,
123 }
124 }
125 }
126
127 impl Iterator for ConstantBackoff {
128 type Item = Duration;
129
130 fn next(&mut self) -> Option<Duration> {
131 self.inner.next()
132 }
133 }
134
135 impl Backoff for ConstantBackoff {
136 fn reset(&mut self) {
137 self.inner = backon::ConstantBuilder::default()
138 .with_delay(self.delay)
139 .with_max_times(self.max_times)
140 .build();
141 }
142 }
143
144 #[tokio::test]
145 async fn stream_should_back_off() {
146 tokio::time::pause();
147 let tick = Duration::from_secs(1);
148 let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
149 let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10)));
150 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
151 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
152 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
153 assert_eq!(poll!(rx.next()), Poll::Pending);
154 tokio::time::advance(tick * 2).await;
155 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(3))));
156 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4))));
157 assert_eq!(poll!(rx.next()), Poll::Ready(None));
158 }
159
160 #[tokio::test]
161 async fn backoff_time_should_update() {
162 tokio::time::pause();
163 let (tx, rx) = mpsc::unbounded();
164 let mut rx = pin!(StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2))));
166 tx.unbounded_send(Ok(0)).unwrap();
167 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
168 tx.unbounded_send(Ok(1)).unwrap();
169 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
170 tx.unbounded_send(Err(2)).unwrap();
171 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
172 assert_eq!(poll!(rx.next()), Poll::Pending);
173 tokio::time::advance(Duration::from_secs(3)).await;
174 assert_eq!(poll!(rx.next()), Poll::Pending);
175 tx.unbounded_send(Err(3)).unwrap();
176 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(3))));
177 tx.unbounded_send(Ok(4)).unwrap();
178 assert_eq!(poll!(rx.next()), Poll::Pending);
179 tokio::time::advance(Duration::from_secs(3)).await;
180 assert_eq!(poll!(rx.next()), Poll::Pending);
181 tokio::time::advance(Duration::from_secs(2)).await;
182 assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(4))));
183 assert_eq!(poll!(rx.next()), Poll::Pending);
184 drop(tx);
185 assert_eq!(poll!(rx.next()), Poll::Ready(None));
186 }
187
188 #[tokio::test]
189 async fn backoff_should_close_when_requested() {
190 assert_eq!(
191 StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {})
192 .collect::<Vec<_>>()
193 .await,
194 vec![Ok(0), Ok(1), Err(2)]
195 );
196 }
197
198 struct StoppedBackoff;
199
200 impl Backoff for StoppedBackoff {
201 fn reset(&mut self) {}
202 }
203
204 impl Iterator for StoppedBackoff {
205 type Item = Duration;
206
207 fn next(&mut self) -> Option<Duration> {
208 None
209 }
210 }
211
212 pub struct LinearBackoff {
214 interval: Duration,
215 current_duration: Duration,
216 }
217
218 impl LinearBackoff {
219 pub fn new(interval: Duration) -> Self {
220 Self {
221 interval,
222 current_duration: Duration::ZERO,
223 }
224 }
225 }
226
227 impl Iterator for LinearBackoff {
228 type Item = Duration;
229
230 fn next(&mut self) -> Option<Duration> {
231 self.current_duration += self.interval;
232 Some(self.current_duration)
233 }
234 }
235
236 impl Backoff for LinearBackoff {
237 fn reset(&mut self) {
238 self.current_duration = Duration::ZERO
239 }
240 }
241}