1use super::{dispatcher::Dispatcher, Lookup, ObjectRef};
2#[cfg(feature = "unstable-runtime-subscribe")]
3use crate::reflector::ReflectHandle;
4use crate::{
5 utils::delayed_init::{self, DelayedInit},
6 watcher,
7};
8use ahash::AHashMap;
9use educe::Educe;
10use parking_lot::RwLock;
11use std::{fmt::Debug, hash::Hash, sync::Arc};
12use thiserror::Error;
13
14type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
15
16#[derive(Debug)]
21pub struct Writer<K: 'static + Lookup + Clone>
22where
23 K::DynamicType: Eq + Hash + Clone,
24{
25 store: Cache<K>,
26 buffer: AHashMap<ObjectRef<K>, Arc<K>>,
27 dyntype: K::DynamicType,
28 ready_tx: Option<delayed_init::Initializer<()>>,
29 ready_rx: Arc<DelayedInit<()>>,
30 dispatcher: Option<Dispatcher<K>>,
31}
32
33impl<K: 'static + Lookup + Clone> Writer<K>
34where
35 K::DynamicType: Eq + Hash + Clone,
36{
37 pub fn new(dyntype: K::DynamicType) -> Self {
42 let (ready_tx, ready_rx) = DelayedInit::new();
43 Writer {
44 store: Default::default(),
45 buffer: Default::default(),
46 dyntype,
47 ready_tx: Some(ready_tx),
48 ready_rx: Arc::new(ready_rx),
49 dispatcher: None,
50 }
51 }
52
53 #[cfg(feature = "unstable-runtime-subscribe")]
63 pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
64 let (ready_tx, ready_rx) = DelayedInit::new();
65 Writer {
66 store: Default::default(),
67 buffer: Default::default(),
68 dyntype,
69 ready_tx: Some(ready_tx),
70 ready_rx: Arc::new(ready_rx),
71 dispatcher: Some(Dispatcher::new(buf_size)),
72 }
73 }
74
75 #[must_use]
80 pub fn as_reader(&self) -> Store<K> {
81 Store {
82 store: self.store.clone(),
83 ready_rx: self.ready_rx.clone(),
84 }
85 }
86
87 #[cfg(feature = "unstable-runtime-subscribe")]
95 pub fn subscribe(&self) -> Option<ReflectHandle<K>> {
96 self.dispatcher
97 .as_ref()
98 .map(|dispatcher| dispatcher.subscribe(self.as_reader()))
99 }
100
101 pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
103 match event {
104 watcher::Event::Apply(obj) => {
105 let key = obj.to_object_ref(self.dyntype.clone());
106 let obj = Arc::new(obj.clone());
107 self.store.write().insert(key, obj);
108 }
109 watcher::Event::Delete(obj) => {
110 let key = obj.to_object_ref(self.dyntype.clone());
111 self.store.write().remove(&key);
112 }
113 watcher::Event::Init => {
114 self.buffer = AHashMap::new();
115 }
116 watcher::Event::InitApply(obj) => {
117 let key = obj.to_object_ref(self.dyntype.clone());
118 let obj = Arc::new(obj.clone());
119 self.buffer.insert(key, obj);
120 }
121 watcher::Event::InitDone => {
122 let mut store = self.store.write();
123
124 std::mem::swap(&mut *store, &mut self.buffer);
126
127 self.buffer = AHashMap::new();
131
132 if let Some(ready_tx) = self.ready_tx.take() {
134 ready_tx.init(())
135 }
136 }
137 }
138 }
139
140 pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event<K>) {
142 if let Some(ref mut dispatcher) = self.dispatcher {
143 match event {
144 watcher::Event::Apply(obj) => {
145 let obj_ref = obj.to_object_ref(self.dyntype.clone());
146 dispatcher.broadcast(obj_ref).await;
149 }
150
151 watcher::Event::InitDone => {
152 let obj_refs: Vec<_> = {
153 let store = self.store.read();
154 store.keys().cloned().collect()
155 };
156
157 for obj_ref in obj_refs {
158 dispatcher.broadcast(obj_ref).await;
159 }
160 }
161
162 _ => {}
163 }
164 }
165 }
166}
167
168impl<K> Default for Writer<K>
169where
170 K: Lookup + Clone + 'static,
171 K::DynamicType: Default + Eq + Hash + Clone,
172{
173 fn default() -> Self {
174 Self::new(K::DynamicType::default())
175 }
176}
177
178#[derive(Educe)]
185#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
186pub struct Store<K: 'static + Lookup>
187where
188 K::DynamicType: Hash + Eq,
189{
190 store: Cache<K>,
191 ready_rx: Arc<DelayedInit<()>>,
192}
193
194#[derive(Debug, Error)]
195#[error("writer was dropped before store became ready")]
196pub struct WriterDropped(delayed_init::InitDropped);
197
198impl<K: 'static + Clone + Lookup> Store<K>
199where
200 K::DynamicType: Eq + Hash + Clone,
201{
202 pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
210 self.ready_rx.get().await.map_err(WriterDropped)
211 }
212
213 #[must_use]
223 pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
224 let store = self.store.read();
225 store
226 .get(key)
227 .or_else(|| {
229 store.get(&{
230 let mut cluster_key = key.clone();
231 cluster_key.namespace = None;
232 cluster_key
233 })
234 })
235 .cloned()
237 }
238
239 #[must_use]
241 pub fn state(&self) -> Vec<Arc<K>> {
242 let s = self.store.read();
243 s.values().cloned().collect()
244 }
245
246 #[must_use]
248 pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
249 where
250 P: Fn(&K) -> bool,
251 {
252 self.store
253 .read()
254 .iter()
255 .map(|(_, k)| k)
256 .find(|k| predicate(k.as_ref()))
257 .cloned()
258 }
259
260 #[must_use]
262 pub fn len(&self) -> usize {
263 self.store.read().len()
264 }
265
266 #[must_use]
268 pub fn is_empty(&self) -> bool {
269 self.store.read().is_empty()
270 }
271}
272
273#[must_use]
278pub fn store<K>() -> (Store<K>, Writer<K>)
279where
280 K: Lookup + Clone + 'static,
281 K::DynamicType: Eq + Hash + Clone + Default,
282{
283 let w = Writer::<K>::default();
284 let r = w.as_reader();
285 (r, w)
286}
287
/// Creates a connected reader/writer pair whose writer is built with
/// [`Writer::new_shared`], using `buf_size` and the default dynamic type.
#[must_use]
#[allow(clippy::module_name_repetitions)]
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn store_shared<K>(buf_size: usize) -> (Store<K>, Writer<K>)
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + Hash + Clone + Default,
{
    let dyntype = <K::DynamicType as Default>::default();
    let writer: Writer<K> = Writer::new_shared(buf_size, dyntype);
    let reader = writer.as_reader();
    (reader, writer)
}
308
#[cfg(test)]
mod tests {
    use super::{store, Writer};
    use crate::{reflector::ObjectRef, watcher};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::api::ObjectMeta;

    /// Builds a `ConfigMap` named `obj` with the given namespace; every other
    /// field keeps its default value.
    fn config_map(namespace: Option<&str>) -> ConfigMap {
        ConfigMap {
            metadata: ObjectMeta {
                name: Some("obj".to_string()),
                namespace: namespace.map(String::from),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        }
    }

    #[test]
    fn should_allow_getting_namespaced_object_by_namespaced_ref() {
        let cm = config_map(Some("ns"));

        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = writer.as_reader();

        let lookup_key = ObjectRef::from_obj(&cm);
        assert_eq!(reader.get(&lookup_key).as_deref(), Some(&cm));
    }

    #[test]
    fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
        let cm = config_map(Some("ns"));
        // Same object, but referenced without a namespace.
        let mut cluster_cm = cm.clone();
        cluster_cm.metadata.namespace = None;

        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm));
        let reader = writer.as_reader();

        let lookup_key = ObjectRef::from_obj(&cluster_cm);
        assert_eq!(reader.get(&lookup_key), None);
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
        let cm = config_map(None);

        let (reader, mut writer) = store();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));

        let lookup_key = ObjectRef::from_obj(&cm);
        assert_eq!(reader.get(&lookup_key).as_deref(), Some(&cm));
    }

    #[test]
    fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
        let cm = config_map(None);
        // `cm` is used again below, so this clone is required; the attribute
        // only silences the lint.
        #[allow(clippy::redundant_clone)]
        let mut nsed_cm = cm.clone();
        nsed_cm.metadata.namespace = Some("ns".to_string());

        let mut writer = Writer::default();
        writer.apply_watcher_event(&watcher::Event::Apply(cm.clone()));
        let reader = writer.as_reader();

        // The lookup falls back to the namespace-less key.
        let lookup_key = ObjectRef::from_obj(&nsed_cm);
        assert_eq!(reader.get(&lookup_key).as_deref(), Some(&cm));
    }

    #[test]
    fn find_element_in_store() {
        let cm = config_map(None);
        let mut target_cm = cm.clone();

        let (reader, mut writer) = store::<ConfigMap>();
        assert!(reader.is_empty());
        writer.apply_watcher_event(&watcher::Event::Apply(cm));

        assert_eq!(reader.len(), 1);
        assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());

        // A second object with a different name and a recognizable generation.
        target_cm.metadata.name = Some("obj1".to_string());
        target_cm.metadata.generation = Some(1234);
        writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone()));
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), 2);

        let found = reader.find(|k| k.metadata.generation == Some(1234));
        assert_eq!(found.as_deref(), Some(&target_cm));
    }
}