kube_runtime/reflector/mod.rs
1//! Caches objects in memory
2
3mod dispatcher;
4mod object_ref;
5pub mod store;
6
7pub use self::{
8 dispatcher::ReflectHandle,
9 object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef},
10};
11use crate::watcher;
12use async_stream::stream;
13use futures::{Stream, StreamExt};
14use std::hash::Hash;
15#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
16pub use store::{store, Store};
17
18/// Cache objects from a [`watcher()`] stream into a local [`Store`]
19///
20/// Observes the raw `Stream` of [`watcher::Event`] objects, and modifies the cache.
21/// It passes the raw [`watcher()`] stream through unmodified.
22///
23/// ## Usage
/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not clonable,
25/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
26/// that you can send to other parts of your program as state.
27///
28/// The cache contains the last-seen state of objects,
29/// which may lag slightly behind the actual state.
30///
31/// ## Example
32///
33/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
34///
35/// The `reader` part being passed around to a webserver is omitted.
36/// For examples see [version-rs](https://github.com/kube-rs/version-rs) for integration with [axum](https://github.com/tokio-rs/axum),
37/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
38///
39/// ```no_run
40/// use std::future::ready;
41/// use k8s_openapi::api::core::v1::Node;
42/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
43/// use futures::StreamExt;
44/// # use kube::api::Api;
45/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
46/// # let client: kube::Client = todo!();
47///
48/// let nodes: Api<Node> = Api::all(client);
49/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
50/// let (reader, writer) = reflector::store();
51///
52/// // Create the infinite reflector stream
53/// let rf = reflector(writer, watcher(nodes, node_filter));
54///
55/// // !!! pass reader to your webserver/manager as state !!!
56///
57/// // Poll the stream (needed to keep the store up-to-date)
58/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
59/// infinite_watch.await;
60/// # Ok(())
61/// # }
62/// ```
63///
64///
65/// ## Memory Usage
66///
67/// A reflector often constitutes one of the biggest components of a controller's memory use.
68/// Given a ~2000 pods cluster, a reflector saving everything (including injected sidecars, managed fields)
69/// can quickly consume a couple of hundred megabytes or more, depending on how much of this you are storing.
70///
71/// While generally acceptable, there are techniques you can leverage to reduce the memory usage
72/// depending on your use case.
73///
74/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
75///
76/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
77/// and this can drop your memory usage by more than a factor of two,
78/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
79///
/// 2. Use `modify` on the raw [`watcher::Event`] object stream to clear unneeded properties
81///
/// For instance, managed fields typically constitute around half the size of `ObjectMeta` and can often be dropped:
83///
84/// ```no_run
85/// # use futures::TryStreamExt;
86/// # use kube::{ResourceExt, Api, runtime::watcher};
87/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
88/// let stream = watcher(api, Default::default()).map_ok(|ev| {
89/// ev.modify(|pod| {
90/// pod.managed_fields_mut().clear();
91/// pod.annotations_mut().clear();
92/// pod.status = None;
93/// })
94/// });
95/// ```
96/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
97/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
98/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
99///
100/// For more information check out: <https://kube.rs/controllers/optimization/> for graphs and techniques.
101///
102/// ## Stream sharing
103///
104/// `reflector()` as an interface may optionally create a stream that can be
105/// shared with other components to help with resource usage.
106///
107/// To share a stream, the `Writer<K>` consumed by `reflector()` must be
108/// created through an interface that allows a store to be subscribed on, such
109/// as [`store_shared()`]. When the store supports being subscribed on, it will
110/// broadcast an event to all active listeners after caching any object
111/// contained in the event.
112///
113/// Creating subscribers requires an
114/// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
/// feature.
116pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
117where
118 K: Lookup + Clone,
119 K::DynamicType: Eq + Hash + Clone,
120 W: Stream<Item = watcher::Result<watcher::Event<K>>>,
121{
122 let mut stream = Box::pin(stream);
123 stream! {
124 while let Some(event) = stream.next().await {
125 match event {
126 Ok(ev) => {
127 writer.apply_watcher_event(&ev);
128 writer.dispatch_event(&ev).await;
129 yield Ok(ev);
130 },
131 Err(ev) => yield Err(ev)
132 }
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    //! Unit tests that feed hand-built [`watcher::Event`] sequences through [`reflector`]
    //! and then inspect the reader half of the store.

    use super::{reflector, store, ObjectRef};
    use crate::watcher;
    use futures::{stream, TryStreamExt};
    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
    use rand::{
        distr::{Bernoulli, Uniform},
        Rng,
    };
    use std::collections::{BTreeMap, HashSet};

    /// Builds a [`ConfigMap`] whose only non-default field is `metadata.name`.
    fn named_config_map(name: &str) -> ConfigMap {
        ConfigMap {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        }
    }

    #[tokio::test]
    async fn reflector_applied_should_add_object() {
        let store_w = store::Writer::default();
        let store = store_w.as_reader();
        let cm = named_config_map("a");
        // `try_collect` + `unwrap` makes an unexpected stream error fail the test
        // instead of being silently dropped.
        reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))]))
            .map_ok(|_| ())
            .try_collect::<()>()
            .await
            .unwrap();
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[tokio::test]
    async fn reflector_applied_should_update_object() {
        let store_w = store::Writer::default();
        let store = store_w.as_reader();
        let cm = named_config_map("a");
        // Same name as `cm`, but with a data payload, so it must replace the first entry.
        let updated_cm = ConfigMap {
            data: Some(BTreeMap::from([("data".to_string(), "present!".to_string())])),
            ..cm.clone()
        };
        reflector(
            store_w,
            stream::iter(vec![
                Ok(watcher::Event::Apply(cm.clone())),
                Ok(watcher::Event::Apply(updated_cm.clone())),
            ]),
        )
        .map_ok(|_| ())
        .try_collect::<()>()
        .await
        .unwrap();
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm));
    }

    #[tokio::test]
    async fn reflector_deleted_should_remove_object() {
        let store_w = store::Writer::default();
        let store = store_w.as_reader();
        let cm = named_config_map("a");
        reflector(
            store_w,
            stream::iter(vec![
                Ok(watcher::Event::Apply(cm.clone())),
                Ok(watcher::Event::Delete(cm.clone())),
            ]),
        )
        .map_ok(|_| ())
        .try_collect::<()>()
        .await
        .unwrap();
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)), None);
    }

    #[tokio::test]
    async fn reflector_restarted_should_clear_objects() {
        let store_w = store::Writer::default();
        let store = store_w.as_reader();
        let cm_a = named_config_map("a");
        let cm_b = named_config_map("b");
        // `cm_a` is applied before the re-initialisation sequence and is not part of it,
        // so only `cm_b` is expected to remain afterwards.
        reflector(
            store_w,
            stream::iter(vec![
                Ok(watcher::Event::Apply(cm_a.clone())),
                Ok(watcher::Event::Init),
                Ok(watcher::Event::InitApply(cm_b.clone())),
                Ok(watcher::Event::InitDone),
            ]),
        )
        .map_ok(|_| ())
        .try_collect::<()>()
        .await
        .unwrap();
        assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None);
        assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b));
    }

    #[tokio::test]
    async fn reflector_store_should_not_contain_duplicates() {
        let mut rng = rand::rng();
        let item_dist = Uniform::new(0_u8, 100).unwrap();
        let deleted_dist = Bernoulli::new(0.40).unwrap();
        let store_w = store::Writer::default();
        let store = store_w.as_reader();
        reflector(
            store_w,
            // `revision` rather than `gen`: `gen` is a reserved keyword in the 2024 edition.
            stream::iter((0_u32..100_000).map(|revision| {
                let item = rng.sample(item_dist);
                let deleted = rng.sample(deleted_dist);
                let mut obj = named_config_map(&item.to_string());
                obj.metadata.resource_version = Some(revision.to_string());
                Ok(if deleted {
                    watcher::Event::Delete(obj)
                } else {
                    watcher::Event::Apply(obj)
                })
            })),
        )
        .map_ok(|_| ())
        .try_collect::<()>()
        .await
        .unwrap();

        // Every name may appear at most once in the final state, regardless of how many
        // apply/delete events were generated for it.
        let mut seen_names = HashSet::new();
        for obj in store.state() {
            let name = obj.metadata.name.clone().unwrap();
            assert!(
                seen_names.insert(name.clone()),
                "store holds more than one object named {name:?}"
            );
        }
    }
}
295}