kube_runtime/utils/
event_modify.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Stream, TryStream};
7use pin_project::pin_project;
8
9use crate::watcher::{Error, Event};
10
11#[pin_project]
12/// Stream returned by the [`modify`](super::WatchStreamExt::modify) method.
13/// Modifies the [`Event`] item returned by the inner stream by calling
14/// [`modify`](Event::modify()) on it.
15pub struct EventModify<St, F> {
16    #[pin]
17    stream: St,
18    f: F,
19}
20
21impl<St, F, K> EventModify<St, F>
22where
23    St: TryStream<Ok = Event<K>>,
24    F: FnMut(&mut K),
25{
26    pub(super) fn new(stream: St, f: F) -> EventModify<St, F> {
27        Self { stream, f }
28    }
29}
30
31impl<St, F, K> Stream for EventModify<St, F>
32where
33    St: Stream<Item = Result<Event<K>, Error>>,
34    F: FnMut(&mut K),
35{
36    type Item = Result<Event<K>, Error>;
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        let mut me = self.project();
40        me.stream
41            .as_mut()
42            .poll_next(cx)
43            .map_ok(|event| event.modify(me.f))
44    }
45}
46
47#[cfg(test)]
48pub(crate) mod test {
49    use std::{pin::pin, task::Poll};
50
51    use super::{Error, Event, EventModify};
52    use futures::{poll, stream, StreamExt};
53
54    #[tokio::test]
55    async fn eventmodify_modifies_innner_value_of_event() {
56        let st = stream::iter([
57            Ok(Event::Apply(0)),
58            Err(Error::NoResourceVersion),
59            Ok(Event::InitApply(10)),
60        ]);
61        let mut ev_modify = pin!(EventModify::new(st, |x| {
62            *x += 1;
63        }));
64
65        assert!(matches!(
66            poll!(ev_modify.next()),
67            Poll::Ready(Some(Ok(Event::Apply(1))))
68        ));
69
70        assert!(matches!(
71            poll!(ev_modify.next()),
72            Poll::Ready(Some(Err(Error::NoResourceVersion)))
73        ));
74
75        let restarted = poll!(ev_modify.next());
76        assert!(matches!(
77            restarted,
78            Poll::Ready(Some(Ok(Event::InitApply(x)))) if x == 11
79        ));
80
81        assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
82    }
83}