kube_runtime/reflector/
store.rs

use super::{dispatcher::Dispatcher, Lookup, ObjectRef};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::reflector::ReflectHandle;
use crate::{
    utils::delayed_init::{self, DelayedInit},
    watcher,
};
use ahash::AHashMap;
use educe::Educe;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use thiserror::Error;

type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;

/// A writable Store handle
///
/// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
/// In particular, a watch restart (`Init`/`InitDone`) will clobber the state of other connected reflectors.
#[derive(Debug)]
pub struct Writer<K: 'static + Lookup + Clone>
where
    K::DynamicType: Eq + Hash + Clone,
{
    store: Cache<K>,
    buffer: AHashMap<ObjectRef<K>, Arc<K>>,
    dyntype: K::DynamicType,
    ready_tx: Option<delayed_init::Initializer<()>>,
    ready_rx: Arc<DelayedInit<()>>,
    dispatcher: Option<Dispatcher<K>>,
}

impl<K: 'static + Lookup + Clone> Writer<K>
where
    K::DynamicType: Eq + Hash + Clone,
{
    /// Creates a new Writer with the specified dynamic type.
    ///
    /// If the dynamic type is default-able (for example, when the writer is used with
    /// `k8s_openapi` types) you can use `Default` instead.
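    ///
    /// A minimal usage sketch (the `ConfigMap` type and the `kube` import path here are assumptions, mirroring the tests below):
    ///
    /// ```no_run
    /// use kube::runtime::reflector::store::Writer;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    ///
    /// // ConfigMap is a static resource type, so its dynamic type is `()` and
    /// // `Writer::<ConfigMap>::default()` would work just as well.
    /// let writer = Writer::<ConfigMap>::new(());
    /// let reader = writer.as_reader();
    /// ```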
    pub fn new(dyntype: K::DynamicType) -> Self {
        let (ready_tx, ready_rx) = DelayedInit::new();
        Writer {
            store: Default::default(),
            buffer: Default::default(),
            dyntype,
            ready_tx: Some(ready_tx),
            ready_rx: Arc::new(ready_rx),
            dispatcher: None,
        }
    }

    /// Creates a new Writer with the specified dynamic type and buffer size.
    ///
    /// When the Writer is created through `new_shared`, it can be subscribed to.
    /// Stored objects will be propagated to all subscribers. The buffer size is
    /// used for the underlying channel, and an object is cleared from the buffer
    /// only once all subscribers have seen it.
    ///
    /// If the dynamic type is default-able (for example, when the writer is used with
    /// `k8s_openapi` types) you can use `Default` instead.
    #[cfg(feature = "unstable-runtime-subscribe")]
    pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
        let (ready_tx, ready_rx) = DelayedInit::new();
        Writer {
            store: Default::default(),
            buffer: Default::default(),
            dyntype,
            ready_tx: Some(ready_tx),
            ready_rx: Arc::new(ready_rx),
            dispatcher: Some(Dispatcher::new(buf_size)),
        }
    }

    /// Return a read handle to the store
    ///
    /// Multiple read handles may be obtained by calling `as_reader` multiple times,
    /// or by calling `Store::clone()` afterwards.
    #[must_use]
    pub fn as_reader(&self) -> Store<K> {
        Store {
            store: self.store.clone(),
            ready_rx: self.ready_rx.clone(),
        }
    }

    /// Return a handle to a subscriber
    ///
    /// Multiple subscriber handles may be obtained by calling `subscribe` multiple
    /// times, or by calling `clone()` on an existing handle.
    ///
    /// This function returns `Some` when the [`Writer`] is constructed through
    /// [`Writer::new_shared`] or [`store_shared`], and `None` otherwise.
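    ///
    /// A hedged sketch (requires the `unstable-runtime-subscribe` feature; the `ConfigMap` type and import path are assumptions):
    ///
    /// ```ignore
    /// use kube::runtime::reflector::store::Writer;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    ///
    /// // A writer created via `new_shared` carries a dispatcher, so `subscribe` returns `Some`.
    /// let shared = Writer::<ConfigMap>::new_shared(128, ());
    /// assert!(shared.subscribe().is_some());
    ///
    /// // A plain writer has no dispatcher, so `subscribe` returns `None`.
    /// let plain = Writer::<ConfigMap>::new(());
    /// assert!(plain.subscribe().is_none());
    /// ```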
    #[cfg(feature = "unstable-runtime-subscribe")]
    pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
        self.dispatcher
            .as_ref()
            .map(|dispatcher| dispatcher.subscribe(self.as_reader()))
    }

    /// Applies a single watcher event to the store
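    ///
    /// A minimal sketch of the event lifecycle (the `ConfigMap` type and `kube` import paths are assumptions):
    ///
    /// ```no_run
    /// use kube::runtime::{reflector::store::Writer, watcher};
    /// use k8s_openapi::api::core::v1::ConfigMap;
    ///
    /// let mut writer = Writer::<ConfigMap>::default();
    /// let cm = ConfigMap::default();
    ///
    /// // `Apply` and `Delete` mutate the live store directly:
    /// writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
    ///
    /// // `Init`/`InitApply`/`InitDone` buffer a full relist and swap it in atomically:
    /// writer.apply_watcher_event(&watcher::Event::Init);
    /// writer.apply_watcher_event(&watcher::Event::InitApply(cm));
    /// writer.apply_watcher_event(&watcher::Event::InitDone);
    /// ```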
    pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
        match event {
            watcher::Event::Apply(obj) => {
                let key = obj.to_object_ref(self.dyntype.clone());
                let obj = Arc::new(obj.clone());
                self.store.write().insert(key, obj);
            }
            watcher::Event::Delete(obj) => {
                let key = obj.to_object_ref(self.dyntype.clone());
                self.store.write().remove(&key);
            }
            watcher::Event::Init => {
                self.buffer = AHashMap::new();
            }
            watcher::Event::InitApply(obj) => {
                let key = obj.to_object_ref(self.dyntype.clone());
                let obj = Arc::new(obj.clone());
                self.buffer.insert(key, obj);
            }
            watcher::Event::InitDone => {
                let mut store = self.store.write();

                // Swap the buffer into the store
                std::mem::swap(&mut *store, &mut self.buffer);

                // Clear the buffer
                // This is preferred over self.buffer.clear(), as clear() will keep the allocated memory for reuse.
                // This way, the old buffer is dropped.
                self.buffer = AHashMap::new();

                // Mark as ready after the initial sync (InitDone), "releasing" any calls to Store::wait_until_ready()
                if let Some(ready_tx) = self.ready_tx.take() {
                    ready_tx.init(())
                }
            }
        }
    }

    /// Broadcast an event to any downstream listeners subscribed on the store
    pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
        if let Some(ref mut dispatcher) = self.dispatcher {
            match event {
                watcher::Event::Apply(obj) => {
                    let obj_ref = obj.to_object_ref(self.dyntype.clone());
                    // TODO (matei): should this take a timeout to log when backpressure has
                    // been applied for too long, e.g. 10s
                    dispatcher.broadcast(obj_ref).await;
                }

                watcher::Event::InitDone => {
                    let obj_refs: Vec<_> = {
                        let store = self.store.read();
                        store.keys().cloned().collect()
                    };

                    for obj_ref in obj_refs {
                        dispatcher.broadcast(obj_ref).await;
                    }
                }

                _ => {}
            }
        }
    }
}

impl<K> Default for Writer<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Default + Eq + Hash + Clone,
{
    fn default() -> Self {
        Self::new(K::DynamicType::default())
    }
}

/// A readable cache of Kubernetes objects of kind `K`
///
/// Cloning will produce a new reference to the same backing store.
///
/// Cannot be constructed directly since one writer handle is required;
/// use `Writer::as_reader()` instead.
#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
pub struct Store<K: 'static + Lookup>
where
    K::DynamicType: Hash + Eq,
{
    store: Cache<K>,
    ready_rx: Arc<DelayedInit<()>>,
}

#[derive(Debug, Error)]
#[error("writer was dropped before store became ready")]
pub struct WriterDropped(delayed_init::InitDropped);

impl<K: 'static + Clone + Lookup> Store<K>
where
    K::DynamicType: Eq + Hash + Clone,
{
    /// Wait for the store to be populated by Kubernetes.
    ///
    /// Note that polling this will _not_ await the source of the stream that populates the [`Writer`].
    /// The [`reflector`](crate::reflector()) stream must be awaited separately.
    ///
    /// # Errors
    /// Returns an error if the [`Writer`] was dropped before any value was written.
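    ///
    /// A hedged sketch of the intended wiring (client setup, runtime, and import paths are assumptions):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{
    ///     runtime::{reflector, reflector::store, watcher, WatchStreamExt},
    ///     Api, Client,
    /// };
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let client = Client::try_default().await?;
    /// let api: Api<ConfigMap> = Api::default_namespaced(client);
    /// let (reader, writer) = store::store();
    ///
    /// // The reflector stream must be polled, or the store will never become ready.
    /// let rf = reflector(writer, watcher(api, watcher::Config::default()));
    /// tokio::spawn(rf.applied_objects().for_each(|_| std::future::ready(())));
    ///
    /// reader.wait_until_ready().await?;
    /// # Ok(())
    /// # }
    /// ```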
    pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
        self.ready_rx.get().await.map_err(WriterDropped)
    }

    /// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache.
    ///
    /// `key.namespace` is ignored for cluster-scoped resources.
    ///
    /// Note that this is a cache and may be stale. Deleted objects may still exist in the cache
    /// despite having been deleted in the cluster, and new objects may not yet exist in the cache.
    /// If any of these are a problem for you then you should abort your reconciler and retry later.
    /// If you use `kube_runtime::controller` then you can do this by returning an error and specifying a
    /// reasonable `error_policy`.
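    ///
    /// A minimal sketch (mirrors the tests below; the `ConfigMap` type and `kube` import paths are assumptions):
    ///
    /// ```no_run
    /// use kube::runtime::{reflector::store::Writer, reflector::ObjectRef, watcher};
    /// use k8s_openapi::api::core::v1::ConfigMap;
    ///
    /// let mut writer = Writer::<ConfigMap>::default();
    /// let reader = writer.as_reader();
    ///
    /// let cm = ConfigMap::default();
    /// writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
    ///
    /// // Look the object up again via a reference derived from it:
    /// assert_eq!(reader.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    /// ```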
    #[must_use]
    pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
        let store = self.store.read();
        store
            .get(key)
            // Try to erase the namespace and try again, in case the object is cluster-scoped
            .or_else(|| {
                store.get(&{
                    let mut cluster_key = key.clone();
                    cluster_key.namespace = None;
                    cluster_key
                })
            })
            // Clone to let go of the entry lock ASAP
            .cloned()
    }

    /// Return a full snapshot of the current values
    #[must_use]
    pub fn state(&self) -> Vec<Arc<K>> {
        let s = self.store.read();
        s.values().cloned().collect()
    }

    /// Retrieve a `clone()` of the entry found by the given predicate
    #[must_use]
    pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
    where
        P: Fn(&K) -> bool,
    {
        self.store
            .read()
            .iter()
            .map(|(_, k)| k)
            .find(|k| predicate(k.as_ref()))
            .cloned()
    }

    /// Return the number of elements in the store
    #[must_use]
    pub fn len(&self) -> usize {
        self.store.read().len()
    }

    /// Return whether the store is empty
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.store.read().is_empty()
    }
}

/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K`
///
/// The `Writer` should be passed to a [`reflector`](crate::reflector()),
/// and the [`Store`] is a read-only handle.
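///
/// A minimal sketch (the `ConfigMap` type and `kube` import path are assumptions):
///
/// ```no_run
/// use kube::runtime::reflector::store;
/// use k8s_openapi::api::core::v1::ConfigMap;
///
/// // The writer half (ignored here) would normally be handed to `reflector()`;
/// // the reader half can be cloned freely.
/// let (reader, _writer) = store::store::<ConfigMap>();
/// assert!(reader.is_empty());
/// ```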
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + Hash + Clone + Default,
{
    let w = Writer::<K>::default();
    let r = w.as_reader();
    (r, w)
}

/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K`
///
/// The resulting `Writer` can be subscribed to in order to fan out events from
/// a watcher. The `Writer` should be passed to a [`reflector`](crate::reflector()),
/// and the [`Store`] is a read-only handle.
///
/// A buffer size is used for the underlying message channel. When the buffer is
/// full, backpressure will be applied by waiting for capacity.
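///
/// A hedged sketch (requires the `unstable-runtime-subscribe` feature; the `ConfigMap` type and import path are assumptions):
///
/// ```ignore
/// use kube::runtime::reflector::store;
/// use k8s_openapi::api::core::v1::ConfigMap;
///
/// // Buffer up to 1024 object refs for subscribers before backpressure kicks in.
/// let (reader, writer) = store::store_shared::<ConfigMap>(1024);
/// let subscriber = writer.subscribe().expect("shared writers always have a dispatcher");
/// ```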
#[must_use]
#[allow(clippy::module_name_repetitions)]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + Hash + Clone + Default,
{
    let w = Writer::<K>::new_shared(buf_size, Default::default());
    let r = w.as_reader();
    (r, w)
}

#[cfg(test)]
mod tests {
    use super::{store, Writer};
    use crate::{reflector::ObjectRef, watcher};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::api::ObjectMeta;

    #[test]
    fn should_allow_getting_namespaced_object_by_namespaced_ref() {
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: Some("ns".to_string()),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        };
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let store = store_w.as_reader();
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: Some("ns".to_string()),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        };
        let mut cluster_cm = cm.clone();
        cluster_cm.metadata.namespace = None;
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm));
        let store = store_w.as_reader();
        assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: None,
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        };
        let (store, mut writer) = store();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: None,
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        };
        #[allow(clippy::redundant_clone)] // false positive
        let mut nsed_cm = cm.clone();
        nsed_cm.metadata.namespace = Some("ns".to_string());
        let mut store_w = Writer::default();
        store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let store = store_w.as_reader();
        assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
    }

    #[test]
    fn find_element_in_store() {
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: None,
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        };
        let mut target_cm = cm.clone();

        let (reader, mut writer) = store::<ConfigMap>();
        assert!(reader.is_empty());
        writer.apply_watcher_event(&watcher::Event::Apply(cm));

        assert_eq!(reader.len(), 1);
        assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());

        target_cm.metadata.name = Some("obj1".to_string());
        target_cm.metadata.generation = Some(1234);
        writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone()));
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), 2);
        let found = reader.find(|k| k.metadata.generation == Some(1234));
        assert_eq!(found.as_deref(), Some(&target_cm));
    }
}