1mod backoff_reset_timer;
4pub(crate) mod delayed_init;
5mod event_decode;
6mod event_modify;
7mod predicate;
8mod reflect;
9mod stream_backoff;
10mod watch_ext;
11
12pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
13pub use event_decode::EventDecode;
14pub use event_modify::EventModify;
15pub use predicate::{predicates, Predicate, PredicateFilter};
16pub use reflect::Reflect;
17pub use stream_backoff::StreamBackoff;
18pub use watch_ext::WatchStreamExt;
19#[deprecated(
21 since = "0.96.0",
22 note = "renamed to by `EventDecode`. This alias will be removed in 0.100.0."
23)]
24pub use EventDecode as EventFlatten;
25
26use futures::{
27 stream::{self, Peekable},
28 Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
29};
30use pin_project::pin_project;
31use std::{
32 fmt::Debug,
33 pin::{pin, Pin},
34 sync::{Arc, Mutex},
35 task::Poll,
36};
37use stream::IntoStream;
38use tokio::{runtime::Handle, task::JoinHandle};
39
40#[pin_project]
48pub(crate) struct SplitCase<S: Stream, Case> {
49 inner: Arc<Mutex<Peekable<S>>>,
51 should_consume_item: fn(&S::Item) -> bool,
56 try_extract_item_case: fn(S::Item) -> Option<Case>,
61}
62
63impl<S, Case> Stream for SplitCase<S, Case>
64where
65 S: Stream + Unpin,
66 S::Item: Debug,
67{
68 type Item = Case;
69
70 #[allow(clippy::mut_mutex_lock)]
71 fn poll_next(
72 self: std::pin::Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<Option<Self::Item>> {
75 let this = self.project();
76 let inner = this.inner.lock().unwrap();
80 let mut inner = Pin::new(inner);
81 let inner_peek = pin!(inner.as_mut().peek());
82 match inner_peek.poll(cx) {
83 Poll::Ready(Some(x_ref)) => {
84 if (this.should_consume_item)(x_ref) {
85 let item = inner.as_mut().poll_next(cx);
86 match item {
87 Poll::Ready(Some(x)) => Poll::Ready(Some((this.try_extract_item_case)(x).expect(
88 "`try_extract_item_case` returned `None` despite `should_consume_item` returning `true`",
89 ))),
90 res => panic!(
91 "Peekable::poll_next() returned {res:?} when Peekable::peek() returned Ready(Some(_))"
92 ),
93 }
94 } else {
95 Poll::Pending
97 }
98 }
99 Poll::Ready(None) => Poll::Ready(None),
100 Poll::Pending => Poll::Pending,
101 }
102 }
103}
104
105#[allow(clippy::type_complexity, clippy::arc_with_non_send_sync)]
109fn trystream_split_result<S>(
110 stream: S,
111) -> (
112 SplitCase<IntoStream<S>, S::Ok>,
113 SplitCase<IntoStream<S>, S::Error>,
114)
115where
116 S: TryStream + Unpin,
117 S::Ok: Debug,
118 S::Error: Debug,
119{
120 let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
121 (
122 SplitCase {
123 inner: stream.clone(),
124 should_consume_item: Result::is_ok,
125 try_extract_item_case: Result::ok,
126 },
127 SplitCase {
128 inner: stream,
129 should_consume_item: Result::is_err,
130 try_extract_item_case: Result::err,
131 },
132 )
133}
134
135pub(crate) fn trystream_try_via<S1, S2>(
137 input_stream: S1,
138 make_via_stream: impl FnOnce(SplitCase<IntoStream<S1>, S1::Ok>) -> S2,
139) -> impl Stream<Item = Result<S2::Ok, S1::Error>>
140where
141 S1: TryStream + Unpin,
142 S2: TryStream<Error = S1::Error>,
143 S1::Ok: Debug,
144 S1::Error: Debug,
145{
146 let (oks, errs) = trystream_split_result(input_stream); let via = make_via_stream(oks); stream::select(via.into_stream(), errs.map(Err)) }
150
151pub struct CancelableJoinHandle<T> {
153 inner: JoinHandle<T>,
154}
155
156impl<T> CancelableJoinHandle<T>
157where
158 T: Send + 'static,
159{
160 pub fn spawn(future: impl Future<Output = T> + Send + 'static, runtime: &Handle) -> Self {
161 CancelableJoinHandle {
162 inner: runtime.spawn(future),
163 }
164 }
165}
166
167impl<T> Drop for CancelableJoinHandle<T> {
168 fn drop(&mut self) {
169 self.inner.abort()
170 }
171}
172
173impl<T> Future for CancelableJoinHandle<T> {
174 type Output = T;
175
176 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
177 self.inner.poll_unpin(cx).map(
178 Result::unwrap,
181 )
182 }
183}
184
185#[pin_project]
186pub(crate) struct OnComplete<S, F> {
187 #[pin]
188 stream: stream::Fuse<S>,
189 #[pin]
190 on_complete: F,
191}
192
193impl<S: Stream, F: Future<Output = ()>> Stream for OnComplete<S, F> {
194 type Item = S::Item;
195
196 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
197 let this = self.project();
198 match this.stream.poll_next(cx) {
199 Poll::Ready(None) => match this.on_complete.poll(cx) {
200 Poll::Pending => Poll::Pending,
201 Poll::Ready(()) => Poll::Ready(None),
202 },
203 x => x,
204 }
205 }
206}
207
208pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
209 fn on_complete<F: Future<Output = ()>>(self, on_complete: F) -> OnComplete<Self, F> {
211 OnComplete {
212 stream: self.fuse(),
213 on_complete,
214 }
215 }
216}
217
218impl<S: Stream> KubeRuntimeStreamExt for S {}
219
#[cfg(test)]
mod tests {
    use std::convert::Infallible;

    use futures::stream::{self, StreamExt};

    use super::trystream_try_via;

    // Not marked `#[test]` and never called: presumably a compile-time check that
    // `trystream_try_via` accepts streams and closures that borrow local state.
    #[allow(dead_code)]
    fn trystream_try_via_should_be_able_to_borrow() {
        struct WeirdComplexObject {}
        impl Drop for WeirdComplexObject {
            fn drop(&mut self) {}
        }

        let mut borrowed_mutably = WeirdComplexObject {};
        let borrowed_shared = WeirdComplexObject {};

        // The input stream holds a mutable borrow of a local value.
        let input = Box::pin(stream::once(async {
            let _ = &mut borrowed_mutably;
            Result::<_, Infallible>::Ok(())
        }));
        // The "via" closure holds a shared borrow of another local value.
        let output = trystream_try_via(input, |oks| {
            oks.map(|()| {
                let _ = &borrowed_shared;
                Ok(())
            })
        });
        drop(output);
    }
}