1use crate::{reflector::ObjectRef, watcher::Error};
2use core::{
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6use futures::Stream;
7use kube_client::Resource;
8use pin_project::pin_project;
9use std::{
10 collections::{hash_map::DefaultHasher, HashMap},
11 hash::{Hash, Hasher},
12};
13
/// Feeds `t` into a freshly created [`DefaultHasher`] and returns the 64-bit digest.
///
/// Every call starts from a new hasher, so no state is carried over between calls.
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(t, &mut state);
    Hasher::finish(&state)
}
19
20pub trait Predicate<K> {
22 fn hash_property(&self, obj: &K) -> Option<u64>;
24
25 fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
37 where
38 Self: Sized,
39 {
40 Fallback(self, f)
41 }
42
43 fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
55 where
56 Self: Sized,
57 {
58 Combine(self, f)
59 }
60}
61
62impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
63 fn hash_property(&self, obj: &K) -> Option<u64> {
64 (self)(obj)
65 }
66}
67
68#[derive(Copy, Clone, Debug, PartialEq, Eq)]
70pub struct Fallback<A, B>(pub(super) A, pub(super) B);
71impl<A, B, K> Predicate<K> for Fallback<A, B>
72where
73 A: Predicate<K>,
74 B: Predicate<K>,
75{
76 fn hash_property(&self, obj: &K) -> Option<u64> {
77 self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
78 }
79}
80#[derive(Copy, Clone, Debug, PartialEq, Eq)]
82pub struct Combine<A, B>(pub(super) A, pub(super) B);
83impl<A, B, K> Predicate<K> for Combine<A, B>
84where
85 A: Predicate<K>,
86 B: Predicate<K>,
87{
88 fn hash_property(&self, obj: &K) -> Option<u64> {
89 match (self.0.hash_property(obj), self.1.hash_property(obj)) {
90 (None, None) => None,
92 (a, b) => Some(hash(&(a, b))),
94 }
95 }
96}
97
98#[allow(clippy::pedantic)]
99#[pin_project]
100#[must_use = "streams do nothing unless polled"]
102pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103 #[pin]
104 stream: St,
105 predicate: P,
106 cache: HashMap<ObjectRef<K>, u64>,
107}
108impl<St, K, P> PredicateFilter<St, K, P>
109where
110 St: Stream<Item = Result<K, Error>>,
111 K: Resource,
112 P: Predicate<K>,
113{
114 pub(super) fn new(stream: St, predicate: P) -> Self {
115 Self {
116 stream,
117 predicate,
118 cache: HashMap::new(),
119 }
120 }
121}
122impl<St, K, P> Stream for PredicateFilter<St, K, P>
123where
124 St: Stream<Item = Result<K, Error>>,
125 K: Resource,
126 K::DynamicType: Default + Eq + Hash,
127 P: Predicate<K>,
128{
129 type Item = Result<K, Error>;
130
131 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
132 let mut me = self.project();
133 Poll::Ready(loop {
134 break match ready!(me.stream.as_mut().poll_next(cx)) {
135 Some(Ok(obj)) => {
136 if let Some(val) = me.predicate.hash_property(&obj) {
137 let key = ObjectRef::from_obj(&obj);
138 let changed = if let Some(old) = me.cache.get(&key) {
139 *old != val
140 } else {
141 true
142 };
143 if let Some(old) = me.cache.get_mut(&key) {
144 *old = val;
145 } else {
146 me.cache.insert(key, val);
147 }
148 if changed {
149 Some(Ok(obj))
150 } else {
151 continue;
152 }
153 } else {
154 Some(Ok(obj))
156 }
157 }
158 Some(Err(err)) => Some(Err(err)),
159 None => return Poll::Ready(None),
160 };
161 })
162 }
163}
164
165pub mod predicates {
172 use super::hash;
173 use kube_client::{Resource, ResourceExt};
174
175 pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
177 obj.meta().generation.map(|g| hash(&g))
178 }
179
180 pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
182 obj.meta().resource_version.as_ref().map(hash)
183 }
184
185 pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
187 Some(hash(obj.labels()))
188 }
189
190 pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
192 Some(hash(obj.annotations()))
193 }
194
195 pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
197 Some(hash(obj.finalizers()))
198 }
199}
200
#[cfg(test)]
pub(crate) mod tests {
    use std::{pin::pin, task::Poll};

    use super::{predicates, Error, PredicateFilter};
    use futures::{poll, stream, FutureExt, StreamExt};
    use kube_client::Resource;
    use serde_json::json;

    /// Objects whose `generation` hash is unchanged must be dropped, while errors and
    /// objects with a different generation pass through.
    #[tokio::test]
    async fn predicate_filtering_hides_equal_predicate_values() {
        use k8s_openapi::api::core::v1::Pod;
        // Builds a minimal Pod named "blog" with the given metadata generation.
        let mkobj = |generation: i32| -> Pod {
            serde_json::from_value(json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "blog",
                    "generation": Some(generation),
                },
                "spec": {
                    "containers": [{
                        "name": "blog",
                        "image": "clux/blog:0.1.0"
                    }],
                }
            }))
            .unwrap()
        };
        let data = stream::iter([
            Ok(mkobj(1)),
            Err(Error::NoResourceVersion),
            Ok(mkobj(1)),
            Ok(mkobj(2)),
        ]);
        let mut rx = pin!(PredicateFilter::new(data, predicates::generation));

        // The first sighting of an object is always emitted.
        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(first.meta().generation, Some(1));

        // Errors are passed through untouched.
        assert!(matches!(
            poll!(rx.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));

        // The repeated generation 1 is swallowed, so the next item is generation 2.
        let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(second.meta().generation, Some(2));
        assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
    }
}