kube_runtime/utils/
event_modify.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use core::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{Stream, TryStream};
use pin_project::pin_project;

use crate::watcher::{Error, Event};

#[pin_project]
/// Stream returned by the [`modify`](super::WatchStreamExt::modify) method.
/// Modifies the [`Event`] item returned by the inner stream by calling
/// [`modify`](Event::modify()) on it.
pub struct EventModify<St, F> {
    #[pin]
    stream: St,
    f: F,
}

impl<St, F, K> EventModify<St, F>
where
    St: TryStream<Ok = Event<K>>,
    F: FnMut(&mut K),
{
    pub(super) fn new(stream: St, f: F) -> EventModify<St, F> {
        Self { stream, f }
    }
}

impl<St, F, K> Stream for EventModify<St, F>
where
    St: Stream<Item = Result<Event<K>, Error>>,
    F: FnMut(&mut K),
{
    type Item = Result<Event<K>, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut me = self.project();
        me.stream
            .as_mut()
            .poll_next(cx)
            .map_ok(|event| event.modify(me.f))
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::{pin::pin, task::Poll};

    use super::{Error, Event, EventModify};
    use futures::{poll, stream, StreamExt};

    #[tokio::test]
    async fn eventmodify_modifies_innner_value_of_event() {
        let st = stream::iter([
            Ok(Event::Apply(0)),
            Err(Error::NoResourceVersion),
            Ok(Event::InitApply(10)),
        ]);
        let mut ev_modify = pin!(EventModify::new(st, |x| {
            *x += 1;
        }));

        assert!(matches!(
            poll!(ev_modify.next()),
            Poll::Ready(Some(Ok(Event::Apply(1))))
        ));

        assert!(matches!(
            poll!(ev_modify.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));

        let restarted = poll!(ev_modify.next());
        assert!(matches!(
            restarted,
            Poll::Ready(Some(Ok(Event::InitApply(x)))) if x == 11
        ));

        assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
    }
}