use super::ObjectRef;
use crate::watcher;
use ahash::AHashMap;
use derivative::Derivative;
use kube_client::Resource;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
#[derive(Debug, Derivative)]
#[derivative(Default(bound = "K::DynamicType: Default"))]
pub struct Writer<K: 'static + Resource>
where
K::DynamicType: Eq + Hash,
{
store: Cache<K>,
dyntype: K::DynamicType,
}
impl<K: 'static + Resource + Clone> Writer<K>
where
K::DynamicType: Eq + Hash + Clone,
{
pub fn new(dyntype: K::DynamicType) -> Self {
Writer {
store: Default::default(),
dyntype,
}
}
#[must_use]
pub fn as_reader(&self) -> Store<K> {
Store {
store: self.store.clone(),
}
}
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Applied(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
}
watcher::Event::Deleted(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| {
(
ObjectRef::from_obj_with(obj, self.dyntype.clone()),
Arc::new(obj.clone()),
)
})
.collect::<AHashMap<_, _>>();
*self.store.write() = new_objs;
}
}
}
}
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
pub struct Store<K: 'static + Resource>
where
K::DynamicType: Hash + Eq,
{
store: Cache<K>,
}
impl<K: 'static + Clone + Resource> Store<K>
where
K::DynamicType: Eq + Hash + Clone,
{
#[must_use]
pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
let store = self.store.read();
store
.get(key)
.or_else(|| {
store.get(&{
let mut cluster_key = key.clone();
cluster_key.namespace = None;
cluster_key
})
})
.cloned()
}
#[must_use]
pub fn state(&self) -> Vec<Arc<K>> {
let s = self.store.read();
s.values().cloned().collect()
}
#[must_use]
pub fn find<P>(&self, predicate: P) -> Option<Arc<K>>
where
P: Fn(&K) -> bool,
{
self.store
.read()
.iter()
.map(|(_, k)| k)
.find(|k| predicate(k.as_ref()))
.cloned()
}
#[must_use]
pub fn len(&self) -> usize {
self.store.read().len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.store.read().is_empty()
}
}
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::default();
let r = w.as_reader();
(r, w)
}
#[cfg(test)]
mod tests {
use super::{store, Writer};
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::api::ObjectMeta;
#[test]
fn should_allow_getting_namespaced_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[test]
fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}
#[test]
fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let (store, mut writer) = store();
writer.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}
#[test]
fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut nsed_cm = cm.clone();
nsed_cm.metadata.namespace = Some("ns".to_string());
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
}
#[test]
fn find_element_in_store() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
let mut target_cm = cm.clone();
let (reader, mut writer) = store::<ConfigMap>();
assert!(reader.is_empty());
writer.apply_watcher_event(&watcher::Event::Applied(cm));
assert_eq!(reader.len(), 1);
assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());
target_cm.metadata.name = Some("obj1".to_string());
target_cm.metadata.generation = Some(1234);
writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone()));
assert!(!reader.is_empty());
assert_eq!(reader.len(), 2);
let found = reader.find(|k| k.metadata.generation == Some(1234));
assert_eq!(found.as_deref(), Some(&target_cm));
}
}