kube_runtime/utils/
predicate.rs

1use crate::{reflector::ObjectRef, watcher::Error};
2use core::{
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6use futures::Stream;
7use kube_client::Resource;
8use pin_project::pin_project;
9use std::{
10    collections::{hash_map::DefaultHasher, HashMap},
11    hash::{Hash, Hasher},
12};
13
/// Feeds `t` into a freshly created [`DefaultHasher`] and returns the resulting 64-bit digest.
fn hash<T>(t: &T) -> u64
where
    T: Hash + ?Sized,
{
    let mut state = DefaultHasher::new();
    Hash::hash(t, &mut state);
    Hasher::finish(&state)
}
19
20/// A predicate is a hasher of Kubernetes objects stream filtering
21pub trait Predicate<K> {
22    /// A predicate only needs to implement optional hashing when keys exist
23    fn hash_property(&self, obj: &K) -> Option<u64>;
24
25    /// Returns a `Predicate` that falls back to an alternate property if the first does not exist
26    ///
27    /// # Usage
28    ///
29    /// ```
30    /// # use k8s_openapi::api::core::v1::Pod;
31    /// use kube::runtime::{predicates, Predicate};
32    /// # fn blah<K>(a: impl Predicate<K>) {}
33    /// let pred = predicates::generation.fallback(predicates::resource_version);
34    /// blah::<Pod>(pred);
35    /// ```
36    fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
37    where
38        Self: Sized,
39    {
40        Fallback(self, f)
41    }
42
43    /// Returns a `Predicate` that combines all available hashes
44    ///
45    /// # Usage
46    ///
47    /// ```
48    /// # use k8s_openapi::api::core::v1::Pod;
49    /// use kube::runtime::{predicates, Predicate};
50    /// # fn blah<K>(a: impl Predicate<K>) {}
51    /// let pred = predicates::labels.combine(predicates::annotations);
52    /// blah::<Pod>(pred);
53    /// ```
54    fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
55    where
56        Self: Sized,
57    {
58        Combine(self, f)
59    }
60}
61
62impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
63    fn hash_property(&self, obj: &K) -> Option<u64> {
64        (self)(obj)
65    }
66}
67
68/// See [`Predicate::fallback`]
69#[derive(Copy, Clone, Debug, PartialEq, Eq)]
70pub struct Fallback<A, B>(pub(super) A, pub(super) B);
71impl<A, B, K> Predicate<K> for Fallback<A, B>
72where
73    A: Predicate<K>,
74    B: Predicate<K>,
75{
76    fn hash_property(&self, obj: &K) -> Option<u64> {
77        self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
78    }
79}
80/// See [`Predicate::combine`]
81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
82pub struct Combine<A, B>(pub(super) A, pub(super) B);
83impl<A, B, K> Predicate<K> for Combine<A, B>
84where
85    A: Predicate<K>,
86    B: Predicate<K>,
87{
88    fn hash_property(&self, obj: &K) -> Option<u64> {
89        match (self.0.hash_property(obj), self.1.hash_property(obj)) {
90            // pass on both missing properties so people can chain .fallback
91            (None, None) => None,
92            // but any other combination of properties are hashed together
93            (a, b) => Some(hash(&(a, b))),
94        }
95    }
96}
97
98#[allow(clippy::pedantic)]
99#[pin_project]
100/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
101#[must_use = "streams do nothing unless polled"]
102pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103    #[pin]
104    stream: St,
105    predicate: P,
106    cache: HashMap<ObjectRef<K>, u64>,
107}
108impl<St, K, P> PredicateFilter<St, K, P>
109where
110    St: Stream<Item = Result<K, Error>>,
111    K: Resource,
112    P: Predicate<K>,
113{
114    pub(super) fn new(stream: St, predicate: P) -> Self {
115        Self {
116            stream,
117            predicate,
118            cache: HashMap::new(),
119        }
120    }
121}
122impl<St, K, P> Stream for PredicateFilter<St, K, P>
123where
124    St: Stream<Item = Result<K, Error>>,
125    K: Resource,
126    K::DynamicType: Default + Eq + Hash,
127    P: Predicate<K>,
128{
129    type Item = Result<K, Error>;
130
131    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
132        let mut me = self.project();
133        Poll::Ready(loop {
134            break match ready!(me.stream.as_mut().poll_next(cx)) {
135                Some(Ok(obj)) => {
136                    if let Some(val) = me.predicate.hash_property(&obj) {
137                        let key = ObjectRef::from_obj(&obj);
138                        let changed = if let Some(old) = me.cache.get(&key) {
139                            *old != val
140                        } else {
141                            true
142                        };
143                        if let Some(old) = me.cache.get_mut(&key) {
144                            *old = val;
145                        } else {
146                            me.cache.insert(key, val);
147                        }
148                        if changed {
149                            Some(Ok(obj))
150                        } else {
151                            continue;
152                        }
153                    } else {
154                        // if we can't evaluate predicate, always emit K
155                        Some(Ok(obj))
156                    }
157                }
158                Some(Err(err)) => Some(Err(err)),
159                None => return Poll::Ready(None),
160            };
161        })
162    }
163}
164
165/// Predicate functions for [`WatchStreamExt::predicate_filter`](crate::WatchStreamExt::predicate_filter)
166///
167/// These functions just return a hash of commonly compared values,
168/// to help decide whether to pass a watch event along or not.
169///
170/// Functional rewrite of the [controller-runtime/predicate module](https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/predicate/predicate.go).
171pub mod predicates {
172    use super::hash;
173    use kube_client::{Resource, ResourceExt};
174
175    /// Hash the generation of a Resource K
176    pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
177        obj.meta().generation.map(|g| hash(&g))
178    }
179
180    /// Hash the resource version of a Resource K
181    pub fn resource_version<K: Resource>(obj: &K) -> Option<u64> {
182        obj.meta().resource_version.as_ref().map(hash)
183    }
184
185    /// Hash the labels of a Resource K
186    pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
187        Some(hash(obj.labels()))
188    }
189
190    /// Hash the annotations of a Resource K
191    pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
192        Some(hash(obj.annotations()))
193    }
194
195    /// Hash the finalizers of a Resource K
196    pub fn finalizers<K: Resource>(obj: &K) -> Option<u64> {
197        Some(hash(obj.finalizers()))
198    }
199}
200
#[cfg(test)]
pub(crate) mod tests {
    use std::{pin::pin, task::Poll};

    use super::{predicates, Error, PredicateFilter};
    use futures::{poll, stream, FutureExt, StreamExt};
    use kube_client::Resource;
    use serde_json::json;

    #[tokio::test]
    async fn predicate_filtering_hides_equal_predicate_values() {
        use k8s_openapi::api::core::v1::Pod;

        // Builds a Pod named "blog" whose metadata carries the given generation.
        let pod_with_generation = |generation: i32| -> Pod {
            serde_json::from_value(json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "blog",
                    "generation": Some(generation),
                },
                "spec": {
                    "containers": [{
                      "name": "blog",
                      "image": "clux/blog:0.1.0"
                    }],
                }
            }))
            .unwrap()
        };

        // Input: generation 1, an error, generation 1 again (duplicate), generation 2.
        let events = [
            Ok(pod_with_generation(1)),
            Err(Error::NoResourceVersion),
            Ok(pod_with_generation(1)),
            Ok(pod_with_generation(2)),
        ];
        let source = stream::iter(events);
        let mut rx = pin!(PredicateFilter::new(source, predicates::generation));

        // the first generation-1 pod is passed through
        let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(first.meta().generation, Some(1));

        // the error is passed through
        let errored = poll!(rx.next());
        assert!(matches!(
            errored,
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));

        // the repeated generation-1 pod is hidden, so the generation-2 pod comes next
        let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
        assert_eq!(second.meta().generation, Some(2));

        // and then the stream is exhausted
        let finished = poll!(rx.next());
        assert!(matches!(finished, Poll::Ready(None)));
    }
}
254}