kube_runtime/reflector/
mod.rs

1//! Caches objects in memory
2
3mod dispatcher;
4mod object_ref;
5pub mod store;
6
7pub use self::{
8    dispatcher::ReflectHandle,
9    object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef},
10};
11use crate::watcher;
12use async_stream::stream;
13use futures::{Stream, StreamExt};
14use std::hash::Hash;
15#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
16pub use store::{store, Store};
17
18/// Cache objects from a [`watcher()`] stream into a local [`Store`]
19///
20/// Observes the raw `Stream` of [`watcher::Event`] objects, and modifies the cache.
21/// It passes the raw [`watcher()`] stream through unmodified.
22///
23/// ## Usage
24/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not-clonable,
25/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
26/// that you can send to other parts of your program as state.
27///
28/// The cache contains the last-seen state of objects,
29/// which may lag slightly behind the actual state.
30///
31/// ## Example
32///
33/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
34///
35/// The `reader` part being passed around to a webserver is omitted.
36/// For examples see [version-rs](https://github.com/kube-rs/version-rs) for integration with [axum](https://github.com/tokio-rs/axum),
37/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
38///
39/// ```no_run
40/// use std::future::ready;
41/// use k8s_openapi::api::core::v1::Node;
42/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
43/// use futures::StreamExt;
44/// # use kube::api::Api;
45/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
46/// # let client: kube::Client = todo!();
47///
48/// let nodes: Api<Node> = Api::all(client);
49/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
50/// let (reader, writer) = reflector::store();
51///
52/// // Create the infinite reflector stream
53/// let rf = reflector(writer, watcher(nodes, node_filter));
54///
55/// // !!! pass reader to your webserver/manager as state !!!
56///
57/// // Poll the stream (needed to keep the store up-to-date)
58/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
59/// infinite_watch.await;
60/// # Ok(())
61/// # }
62/// ```
63///
64///
65/// ## Memory Usage
66///
67/// A reflector often constitutes one of the biggest components of a controller's memory use.
68/// Given a ~2000 pods cluster, a reflector saving everything (including injected sidecars, managed fields)
69/// can quickly consume a couple of hundred megabytes or more, depending on how much of this you are storing.
70///
71/// While generally acceptable, there are techniques you can leverage to reduce the memory usage
72/// depending on your use case.
73///
74/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
75///
76/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
77/// and this can drop your memory usage by more than a factor of two,
78/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
79///
80/// 2. Use `modify` the raw [`watcher::Event`] object stream to clear unneeded properties
81///
82/// For instance, managed fields typically constitutes around half the size of `ObjectMeta` and can often be dropped:
83///
84/// ```no_run
85/// # use futures::TryStreamExt;
86/// # use kube::{ResourceExt, Api, runtime::watcher};
87/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
88/// let stream = watcher(api, Default::default()).map_ok(|ev| {
89///     ev.modify(|pod| {
90///         pod.managed_fields_mut().clear();
91///         pod.annotations_mut().clear();
92///         pod.status = None;
93///     })
94/// });
95/// ```
96/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
97/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
98/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
99///
100/// For more information check out: <https://kube.rs/controllers/optimization/> for graphs and techniques.
101///
102/// ## Stream sharing
103///
104/// `reflector()` as an interface may optionally create a stream that can be
105/// shared with other components to help with resource usage.
106///
107/// To share a stream, the `Writer<K>` consumed by `reflector()` must be
108/// created through an interface that allows a store to be subscribed on, such
109/// as [`store_shared()`]. When the store supports being subscribed on, it will
110/// broadcast an event to all active listeners after caching any object
111/// contained in the event.
112///
113/// Creating subscribers requires an
114/// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
115/// feature
116pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
117where
118    K: Lookup + Clone,
119    K::DynamicType: Eq + Hash + Clone,
120    W: Stream<Item = watcher::Result<watcher::Event<K>>>,
121{
122    let mut stream = Box::pin(stream);
123    stream! {
124        while let Some(event) = stream.next().await {
125            match event {
126                Ok(ev) => {
127                    writer.apply_watcher_event(&ev);
128                    writer.dispatch_event(&ev).await;
129                    yield Ok(ev);
130                },
131                Err(ev) => yield Err(ev)
132            }
133        }
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use super::{reflector, store, ObjectRef};
140    use crate::watcher;
141    use futures::{stream, StreamExt, TryStreamExt};
142    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
143    use rand::{
144        distr::{Bernoulli, Uniform},
145        Rng,
146    };
147    use std::collections::{BTreeMap, HashMap};
148
149    #[tokio::test]
150    async fn reflector_applied_should_add_object() {
151        let store_w = store::Writer::default();
152        let store = store_w.as_reader();
153        let cm = ConfigMap {
154            metadata: ObjectMeta {
155                name: Some("a".to_string()),
156                ..ObjectMeta::default()
157            },
158            ..ConfigMap::default()
159        };
160        reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))]))
161            .map(|_| ())
162            .collect::<()>()
163            .await;
164        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
165    }
166
167    #[tokio::test]
168    async fn reflector_applied_should_update_object() {
169        let store_w = store::Writer::default();
170        let store = store_w.as_reader();
171        let cm = ConfigMap {
172            metadata: ObjectMeta {
173                name: Some("a".to_string()),
174                ..ObjectMeta::default()
175            },
176            ..ConfigMap::default()
177        };
178        let updated_cm = ConfigMap {
179            data: Some({
180                let mut data = BTreeMap::new();
181                data.insert("data".to_string(), "present!".to_string());
182                data
183            }),
184            ..cm.clone()
185        };
186        reflector(
187            store_w,
188            stream::iter(vec![
189                Ok(watcher::Event::Apply(cm.clone())),
190                Ok(watcher::Event::Apply(updated_cm.clone())),
191            ]),
192        )
193        .map(|_| ())
194        .collect::<()>()
195        .await;
196        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm));
197    }
198
199    #[tokio::test]
200    async fn reflector_deleted_should_remove_object() {
201        let store_w = store::Writer::default();
202        let store = store_w.as_reader();
203        let cm = ConfigMap {
204            metadata: ObjectMeta {
205                name: Some("a".to_string()),
206                ..ObjectMeta::default()
207            },
208            ..ConfigMap::default()
209        };
210        reflector(
211            store_w,
212            stream::iter(vec![
213                Ok(watcher::Event::Apply(cm.clone())),
214                Ok(watcher::Event::Delete(cm.clone())),
215            ]),
216        )
217        .map(|_| ())
218        .collect::<()>()
219        .await;
220        assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
221    }
222
223    #[tokio::test]
224    async fn reflector_restarted_should_clear_objects() {
225        let store_w = store::Writer::default();
226        let store = store_w.as_reader();
227        let cm_a = ConfigMap {
228            metadata: ObjectMeta {
229                name: Some("a".to_string()),
230                ..ObjectMeta::default()
231            },
232            ..ConfigMap::default()
233        };
234        let cm_b = ConfigMap {
235            metadata: ObjectMeta {
236                name: Some("b".to_string()),
237                ..ObjectMeta::default()
238            },
239            ..ConfigMap::default()
240        };
241        reflector(
242            store_w,
243            stream::iter(vec![
244                Ok(watcher::Event::Apply(cm_a.clone())),
245                Ok(watcher::Event::Init),
246                Ok(watcher::Event::InitApply(cm_b.clone())),
247                Ok(watcher::Event::InitDone),
248            ]),
249        )
250        .map(|_| ())
251        .collect::<()>()
252        .await;
253        assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None);
254        assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b));
255    }
256
257    #[tokio::test]
258    async fn reflector_store_should_not_contain_duplicates() {
259        let mut rng = rand::rng();
260        let item_dist = Uniform::new(0_u8, 100).unwrap();
261        let deleted_dist = Bernoulli::new(0.40).unwrap();
262        let store_w = store::Writer::default();
263        let store = store_w.as_reader();
264        reflector(
265            store_w,
266            stream::iter((0_u32..100_000).map(|gen| {
267                let item = rng.sample(item_dist);
268                let deleted = rng.sample(deleted_dist);
269                let obj = ConfigMap {
270                    metadata: ObjectMeta {
271                        name: Some(item.to_string()),
272                        resource_version: Some(gen.to_string()),
273                        ..ObjectMeta::default()
274                    },
275                    ..ConfigMap::default()
276                };
277                Ok(if deleted {
278                    watcher::Event::Delete(obj)
279                } else {
280                    watcher::Event::Apply(obj)
281                })
282            })),
283        )
284        .map_ok(|_| ())
285        .try_collect::<()>()
286        .await
287        .unwrap();
288
289        let mut seen_objects = HashMap::new();
290        for obj in store.state() {
291            assert_eq!(seen_objects.get(obj.metadata.name.as_ref().unwrap()), None);
292            seen_objects.insert(obj.metadata.name.clone().unwrap(), obj);
293        }
294    }
295}