kube_runtime/utils/
reflect.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Stream, TryStream};
7use pin_project::pin_project;
8
9use crate::{
10    reflector::store::Writer,
11    watcher::{Error, Event},
12};
13use kube_client::Resource;
14
15/// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method
16#[pin_project]
17pub struct Reflect<St, K>
18where
19    K: Resource + Clone + 'static,
20    K::DynamicType: Eq + std::hash::Hash + Clone,
21{
22    #[pin]
23    stream: St,
24    writer: Writer<K>,
25}
26
27impl<St, K> Reflect<St, K>
28where
29    St: TryStream<Ok = Event<K>>,
30    K: Resource + Clone,
31    K::DynamicType: Eq + std::hash::Hash + Clone,
32{
33    pub(super) fn new(stream: St, writer: Writer<K>) -> Reflect<St, K> {
34        Self { stream, writer }
35    }
36}
37
38impl<St, K> Stream for Reflect<St, K>
39where
40    K: Resource + Clone,
41    K::DynamicType: Eq + std::hash::Hash + Clone,
42    St: Stream<Item = Result<Event<K>, Error>>,
43{
44    type Item = Result<Event<K>, Error>;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47        let mut me = self.project();
48        me.stream.as_mut().poll_next(cx).map_ok(move |event| {
49            me.writer.apply_watcher_event(&event);
50            event
51        })
52    }
53}
54
#[cfg(test)]
pub(crate) mod test {
    use std::{pin::pin, task::Poll};

    use super::{Error, Event, Reflect};
    use crate::reflector;
    use futures::{poll, stream, StreamExt};
    use k8s_openapi::api::core::v1::Pod;

    /// Returns a default `Pod` whose only populated field is its metadata name.
    fn testpod(name: &str) -> Pod {
        let mut named = Pod::default();
        named.metadata.name = Some(name.to_string());
        named
    }

    /// Drives a `Reflect` over a fixed event sequence, checking after every
    /// poll both the item that comes out and the number of objects visible
    /// through the store reader.
    #[tokio::test]
    async fn reflect_passes_events_through() {
        let (foo, bar) = (testpod("foo"), testpod("bar"));
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);
        let (reader, writer) = reflector::store();

        let mut reflect = pin!(Reflect::new(events, writer));
        // Nothing has been polled yet, so the store starts out empty.
        assert_eq!(reader.len(), 0);

        // Apply: the event comes through and the reader now sees one object.
        let applied = poll!(reflect.next());
        assert!(matches!(applied, Poll::Ready(Some(Ok(Event::Apply(_))))));
        assert_eq!(reader.len(), 1);

        // Error: forwarded as-is, reader count unchanged.
        let failed = poll!(reflect.next());
        assert!(matches!(
            failed,
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert_eq!(reader.len(), 1);

        // Init: forwarded, reader count still unchanged.
        let init = poll!(reflect.next());
        assert!(matches!(init, Poll::Ready(Some(Ok(Event::Init)))));
        assert_eq!(reader.len(), 1);

        // Both InitApply events are forwarded; the reader count is expected
        // to remain at one after each of them.
        for _ in 0..2 {
            let restarted = poll!(reflect.next());
            assert!(matches!(
                restarted,
                Poll::Ready(Some(Ok(Event::InitApply(_))))
            ));
            assert_eq!(reader.len(), 1);
        }

        // InitDone: forwarded, and only now does the reader report two objects.
        let done = poll!(reflect.next());
        assert!(matches!(done, Poll::Ready(Some(Ok(Event::InitDone)))));
        assert_eq!(reader.len(), 2);

        // The source is exhausted; the reader count stays where it was.
        let finished = poll!(reflect.next());
        assert!(matches!(finished, Poll::Ready(None)));
        assert_eq!(reader.len(), 2);
    }
}