kube_runtime/utils/
reflect.rs1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{Stream, TryStream};
7use pin_project::pin_project;
8
9use crate::{
10 reflector::store::Writer,
11 watcher::{Error, Event},
12};
13use kube_client::Resource;
14
15#[pin_project]
17pub struct Reflect<St, K>
18where
19 K: Resource + Clone + 'static,
20 K::DynamicType: Eq + std::hash::Hash + Clone,
21{
22 #[pin]
23 stream: St,
24 writer: Writer<K>,
25}
26
27impl<St, K> Reflect<St, K>
28where
29 St: TryStream<Ok = Event<K>>,
30 K: Resource + Clone,
31 K::DynamicType: Eq + std::hash::Hash + Clone,
32{
33 pub(super) fn new(stream: St, writer: Writer<K>) -> Reflect<St, K> {
34 Self { stream, writer }
35 }
36}
37
38impl<St, K> Stream for Reflect<St, K>
39where
40 K: Resource + Clone,
41 K::DynamicType: Eq + std::hash::Hash + Clone,
42 St: Stream<Item = Result<Event<K>, Error>>,
43{
44 type Item = Result<Event<K>, Error>;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47 let mut me = self.project();
48 me.stream.as_mut().poll_next(cx).map_ok(move |event| {
49 me.writer.apply_watcher_event(&event);
50 event
51 })
52 }
53}
54
55#[cfg(test)]
56pub(crate) mod test {
57 use std::{pin::pin, task::Poll};
58
59 use super::{Error, Event, Reflect};
60 use crate::reflector;
61 use futures::{poll, stream, StreamExt};
62 use k8s_openapi::api::core::v1::Pod;
63
64 fn testpod(name: &str) -> Pod {
65 let mut pod = Pod::default();
66 pod.metadata.name = Some(name.to_string());
67 pod
68 }
69
70 #[tokio::test]
71 async fn reflect_passes_events_through() {
72 let foo = testpod("foo");
73 let bar = testpod("bar");
74 let st = stream::iter([
75 Ok(Event::Apply(foo.clone())),
76 Err(Error::NoResourceVersion),
77 Ok(Event::Init),
78 Ok(Event::InitApply(foo)),
79 Ok(Event::InitApply(bar)),
80 Ok(Event::InitDone),
81 ]);
82 let (reader, writer) = reflector::store();
83
84 let mut reflect = pin!(Reflect::new(st, writer));
85 assert_eq!(reader.len(), 0);
86
87 assert!(matches!(
88 poll!(reflect.next()),
89 Poll::Ready(Some(Ok(Event::Apply(_))))
90 ));
91 assert_eq!(reader.len(), 1);
92
93 assert!(matches!(
94 poll!(reflect.next()),
95 Poll::Ready(Some(Err(Error::NoResourceVersion)))
96 ));
97 assert_eq!(reader.len(), 1);
98
99 assert!(matches!(
100 poll!(reflect.next()),
101 Poll::Ready(Some(Ok(Event::Init)))
102 ));
103 assert_eq!(reader.len(), 1);
104
105 let restarted = poll!(reflect.next());
106 assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
107 assert_eq!(reader.len(), 1);
108 let restarted = poll!(reflect.next());
109 assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
110 assert_eq!(reader.len(), 1);
111
112 assert!(matches!(
113 poll!(reflect.next()),
114 Poll::Ready(Some(Ok(Event::InitDone)))
115 ));
116 assert_eq!(reader.len(), 2);
117
118 assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
119 assert_eq!(reader.len(), 2);
120 }
121}