kube_runtime/utils/
event_modify.rs1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{Stream, TryStream};
7use pin_project::pin_project;
8
9use crate::watcher::{Error, Event};
10
11#[pin_project]
12pub struct EventModify<St, F> {
16 #[pin]
17 stream: St,
18 f: F,
19}
20
21impl<St, F, K> EventModify<St, F>
22where
23 St: TryStream<Ok = Event<K>>,
24 F: FnMut(&mut K),
25{
26 pub(super) fn new(stream: St, f: F) -> EventModify<St, F> {
27 Self { stream, f }
28 }
29}
30
31impl<St, F, K> Stream for EventModify<St, F>
32where
33 St: Stream<Item = Result<Event<K>, Error>>,
34 F: FnMut(&mut K),
35{
36 type Item = Result<Event<K>, Error>;
37
38 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39 let mut me = self.project();
40 me.stream
41 .as_mut()
42 .poll_next(cx)
43 .map_ok(|event| event.modify(me.f))
44 }
45}
46
47#[cfg(test)]
48pub(crate) mod test {
49 use std::{pin::pin, task::Poll};
50
51 use super::{Error, Event, EventModify};
52 use futures::{poll, stream, StreamExt};
53
54 #[tokio::test]
55 async fn eventmodify_modifies_innner_value_of_event() {
56 let st = stream::iter([
57 Ok(Event::Apply(0)),
58 Err(Error::NoResourceVersion),
59 Ok(Event::InitApply(10)),
60 ]);
61 let mut ev_modify = pin!(EventModify::new(st, |x| {
62 *x += 1;
63 }));
64
65 assert!(matches!(
66 poll!(ev_modify.next()),
67 Poll::Ready(Some(Ok(Event::Apply(1))))
68 ));
69
70 assert!(matches!(
71 poll!(ev_modify.next()),
72 Poll::Ready(Some(Err(Error::NoResourceVersion)))
73 ));
74
75 let restarted = poll!(ev_modify.next());
76 assert!(matches!(
77 restarted,
78 Poll::Ready(Some(Ok(Event::InitApply(x)))) if x == 11
79 ));
80
81 assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
82 }
83}