kube_runtime/utils/
mod.rs

1//! Helpers for manipulating built-in streams
2
3mod backoff_reset_timer;
4pub(crate) mod delayed_init;
5mod event_decode;
6mod event_modify;
7mod predicate;
8mod reflect;
9mod stream_backoff;
10mod watch_ext;
11
12pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
13pub use event_decode::EventDecode;
14pub use event_modify::EventModify;
15pub use predicate::{predicates, Predicate, PredicateFilter};
16pub use reflect::Reflect;
17pub use stream_backoff::StreamBackoff;
18pub use watch_ext::WatchStreamExt;
19/// Deprecated type alias for `EventDecode`
20#[deprecated(
21    since = "0.96.0",
22    note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0."
23)]
24pub use EventDecode as EventFlatten;
25
26use futures::{
27    stream::{self, Peekable},
28    Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
29};
30use pin_project::pin_project;
31use std::{
32    fmt::Debug,
33    pin::{pin, Pin},
34    sync::{Arc, Mutex},
35    task::Poll,
36};
37use stream::IntoStream;
38use tokio::{runtime::Handle, task::JoinHandle};
39
40/// Allows splitting a `Stream` into several streams that each emit a disjoint subset of the input stream's items,
41/// like a streaming variant of pattern matching.
42///
43/// NOTE: The cases MUST be reunited into the same final stream (using `futures::stream::select` or similar),
44/// since cases for rejected items will *not* register wakeup correctly, and may otherwise lose items and/or deadlock.
45///
46/// NOTE: The whole set of cases will deadlock if there is ever an item that no live case wants to consume.
47#[pin_project]
48pub(crate) struct SplitCase<S: Stream, Case> {
49    // Future-unaware `Mutex` is OK because it's only taken inside single poll()s
50    inner: Arc<Mutex<Peekable<S>>>,
51    /// Tests whether an item from the stream should be consumed
52    ///
53    /// NOTE: This MUST be total over all `SplitCase`s, otherwise the input stream
54    /// will get stuck deadlocked because no candidate tries to consume the item.
55    should_consume_item: fn(&S::Item) -> bool,
56    /// Narrows the type of the consumed type, using the same precondition as `should_consume_item`.
57    ///
58    /// NOTE: This MUST return `Some` if `should_consume_item` returns `true`, since we can't put
59    /// an item back into the input stream once consumed.
60    try_extract_item_case: fn(S::Item) -> Option<Case>,
61}
62
63impl<S, Case> Stream for SplitCase<S, Case>
64where
65    S: Stream + Unpin,
66    S::Item: Debug,
67{
68    type Item = Case;
69
70    #[allow(clippy::mut_mutex_lock)]
71    fn poll_next(
72        self: std::pin::Pin<&mut Self>,
73        cx: &mut std::task::Context<'_>,
74    ) -> std::task::Poll<Option<Self::Item>> {
75        let this = self.project();
76        // this code triggers false positive in Clippy
77        // https://github.com/rust-lang/rust-clippy/issues/9415
78        // TODO: remove #[allow] once fix reaches nightly.
79        let inner = this.inner.lock().unwrap();
80        let mut inner = Pin::new(inner);
81        let inner_peek = pin!(inner.as_mut().peek());
82        match inner_peek.poll(cx) {
83            Poll::Ready(Some(x_ref)) => {
84                if (this.should_consume_item)(x_ref) {
85                    let item = inner.as_mut().poll_next(cx);
86                    match item {
87                        Poll::Ready(Some(x)) => Poll::Ready(Some((this.try_extract_item_case)(x).expect(
88                            "`try_extract_item_case` returned `None` despite `should_consume_item` returning `true`",
89                        ))),
90                        res => panic!(
91                    "Peekable::poll_next() returned {res:?} when Peekable::peek() returned Ready(Some(_))"
92                ),
93                    }
94                } else {
95                    // Handled by another SplitCase instead
96                    Poll::Pending
97                }
98            }
99            Poll::Ready(None) => Poll::Ready(None),
100            Poll::Pending => Poll::Pending,
101        }
102    }
103}
104
105/// Splits a `TryStream` into separate `Ok` and `Error` streams.
106///
107/// Note: This will deadlock if one branch outlives the other
108#[allow(clippy::type_complexity, clippy::arc_with_non_send_sync)]
109fn trystream_split_result<S>(
110    stream: S,
111) -> (
112    SplitCase<IntoStream<S>, S::Ok>,
113    SplitCase<IntoStream<S>, S::Error>,
114)
115where
116    S: TryStream + Unpin,
117    S::Ok: Debug,
118    S::Error: Debug,
119{
120    let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
121    (
122        SplitCase {
123            inner: stream.clone(),
124            should_consume_item: Result::is_ok,
125            try_extract_item_case: Result::ok,
126        },
127        SplitCase {
128            inner: stream,
129            should_consume_item: Result::is_err,
130            try_extract_item_case: Result::err,
131        },
132    )
133}
134
135/// Forwards Ok elements via a stream built from `make_via_stream`, while passing errors through unmodified
136pub(crate) fn trystream_try_via<S1, S2>(
137    input_stream: S1,
138    make_via_stream: impl FnOnce(SplitCase<IntoStream<S1>, S1::Ok>) -> S2,
139) -> impl Stream<Item = Result<S2::Ok, S1::Error>>
140where
141    S1: TryStream + Unpin,
142    S2: TryStream<Error = S1::Error>,
143    S1::Ok: Debug,
144    S1::Error: Debug,
145{
146    let (oks, errs) = trystream_split_result(input_stream); // the select -> SplitCase
147    let via = make_via_stream(oks); // the map_ok/err function
148    stream::select(via.into_stream(), errs.map(Err)) // recombine
149}
150
151/// A [`JoinHandle`] that cancels the [`Future`] when dropped, rather than detaching it
152pub struct CancelableJoinHandle<T> {
153    inner: JoinHandle<T>,
154}
155
156impl<T> CancelableJoinHandle<T>
157where
158    T: Send + 'static,
159{
160    pub fn spawn(future: impl Future<Output = T> + Send + 'static, runtime: &Handle) -> Self {
161        CancelableJoinHandle {
162            inner: runtime.spawn(future),
163        }
164    }
165}
166
167impl<T> Drop for CancelableJoinHandle<T> {
168    fn drop(&mut self) {
169        self.inner.abort()
170    }
171}
172
173impl<T> Future for CancelableJoinHandle<T> {
174    type Output = T;
175
176    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
177        self.inner.poll_unpin(cx).map(
178            // JoinError => underlying future was either aborted (which should only happen when the handle is dropped), or
179            // panicked (which should be propagated)
180            Result::unwrap,
181        )
182    }
183}
184
185#[pin_project]
186pub(crate) struct OnComplete<S, F> {
187    #[pin]
188    stream: stream::Fuse<S>,
189    #[pin]
190    on_complete: F,
191}
192
193impl<S: Stream, F: Future<Output = ()>> Stream for OnComplete<S, F> {
194    type Item = S::Item;
195
196    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
197        let this = self.project();
198        match this.stream.poll_next(cx) {
199            Poll::Ready(None) => match this.on_complete.poll(cx) {
200                Poll::Pending => Poll::Pending,
201                Poll::Ready(()) => Poll::Ready(None),
202            },
203            x => x,
204        }
205    }
206}
207
208pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
209    /// Runs the [`Future`] `on_complete` once the [`Stream`] finishes (by returning [`None`]).
210    fn on_complete<F: Future<Output = ()>>(self, on_complete: F) -> OnComplete<Self, F> {
211        OnComplete {
212            stream: self.fuse(),
213            on_complete,
214        }
215    }
216}
217
218impl<S: Stream> KubeRuntimeStreamExt for S {}
219
220#[cfg(test)]
221mod tests {
222    use std::convert::Infallible;
223
224    use futures::stream::{self, StreamExt};
225
226    use super::trystream_try_via;
227
228    // Type-level test does not need to be executed
229    #[allow(dead_code)]
230    fn trystream_try_via_should_be_able_to_borrow() {
231        struct WeirdComplexObject {}
232        impl Drop for WeirdComplexObject {
233            fn drop(&mut self) {}
234        }
235
236        let mut x = WeirdComplexObject {};
237        let y = WeirdComplexObject {};
238        drop(trystream_try_via(
239            Box::pin(stream::once(async {
240                let _ = &mut x;
241                Result::<_, Infallible>::Ok(())
242            })),
243            |s| {
244                s.map(|()| {
245                    let _ = &y;
246                    Ok(())
247                })
248            },
249        ));
250    }
251}