kube_runtime/
watcher.rs

1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::{Backoff, ResetTimerBackoff};
6
7use async_trait::async_trait;
8use backon::BackoffBuilder;
9use educe::Educe;
10use futures::{stream::BoxStream, Stream, StreamExt};
11use kube_client::{
12    api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
13    core::{metadata::PartialObjectMeta, ObjectList, Selector},
14    error::ErrorResponse,
15    Api, Error as ClientErr,
16};
17use serde::de::DeserializeOwned;
18use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
19use thiserror::Error;
20use tracing::{debug, error, warn};
21
/// Errors that the [`watcher`] stream can yield
#[derive(Debug, Error)]
pub enum Error {
    /// A `list` call issued while paginating the initial object list failed
    #[error("failed to perform initial object list: {0}")]
    InitialListFailed(#[source] kube_client::Error),
    /// The `watch` call could not be started
    #[error("failed to start watching object: {0}")]
    WatchStartFailed(#[source] kube_client::Error),
    /// The watch stream delivered an error event (`WatchEvent::Error`)
    #[error("error returned by apiserver during watch: {0}")]
    WatchError(#[source] ErrorResponse),
    /// Polling the watch stream itself produced a client error
    #[error("watch stream failed: {0}")]
    WatchFailed(#[source] kube_client::Error),
    /// A list response or a watched object carried no (or an empty) `metadata.resourceVersion`
    #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
    NoResourceVersion,
}
/// Result alias whose error type defaults to the watcher [`Error`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
36
/// Watch events returned from the [`watcher`]
#[derive(Debug, Clone)]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the `StreamingList` strategy,
    /// or from pages using the `ListWatch` strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously [`Apply`](Event::Apply)ed but are not listed in any of
    /// the `InitApply` events should be assumed to have been [`Delete`](Event::Delete)d.
    InitDone,
}

impl<K> Event<K> {
    /// Yields the object carried by an `Apply` or `InitApply` event.
    ///
    /// `Delete` events and the `Init` / `InitDone` markers yield nothing.
    #[deprecated(
        since = "0.92.0",
        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
    )]
    pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
        let applied = match self {
            Self::Init | Self::InitDone | Self::Delete(_) => None,
            Self::InitApply(obj) | Self::Apply(obj) => Some(obj),
        };
        applied.into_iter()
    }

    /// Yields the object carried by an `Apply`, `InitApply` or `Delete` event.
    ///
    /// The `Init` / `InitDone` markers yield nothing.
    ///
    /// Note that `Delete` events may be missed when restarting the stream. Use finalizers
    /// or owner references instead if you care about cleaning up external resources after
    /// deleted objects.
    #[deprecated(
        since = "0.92.0",
        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
    )]
    pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
        let touched = match self {
            Self::Init | Self::InitDone => None,
            Self::InitApply(obj) | Self::Delete(obj) | Self::Apply(obj) => Some(obj),
        };
        touched.into_iter()
    }

    /// Map each object in an event through a mutator fn
    ///
    /// This allows for memory optimizations in watch streams.
    /// If you are chaining a watch stream into a reflector as an in memory state store,
    /// you can control the space used by each object by dropping fields.
    ///
    /// The `Init` and `InitDone` markers carry no object and are returned untouched.
    ///
    /// ```no_run
    /// use k8s_openapi::api::core::v1::Pod;
    /// use kube::ResourceExt;
    /// # use kube::runtime::watcher::Event;
    /// # let event: Event<Pod> = todo!();
    /// event.modify(|pod| {
    ///     pod.managed_fields_mut().clear();
    ///     pod.annotations_mut().clear();
    ///     pod.status = None;
    /// });
    /// ```
    #[must_use]
    pub fn modify(self, mut f: impl FnMut(&mut K)) -> Self {
        match self {
            Self::Apply(mut obj) => {
                f(&mut obj);
                Self::Apply(obj)
            }
            Self::Delete(mut obj) => {
                f(&mut obj);
                Self::Delete(obj)
            }
            Self::InitApply(mut obj) => {
                f(&mut obj);
                Self::InitApply(obj)
            }
            // markers, nothing to modify
            marker @ (Self::Init | Self::InitDone) => marker,
        }
    }
}
130
#[derive(Educe, Default)]
#[educe(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start over:
    /// with the initial LIST for [`InitialListStrategy::ListWatch`], or with the initial
    /// watch for [`InitialListStrategy::StreamingList`]
    #[default]
    Empty,
    /// The Watcher is in the process of paginating through the initial LIST
    InitPage {
        /// Continuation token returned with the previous page, if more pages remain
        continue_token: Option<String>,
        /// Objects of the current page that have not been emitted yet
        objects: VecDeque<K>,
        /// Resource version reported by the most recent list response, if any
        last_bookmark: Option<String>,
    },
    /// Kubernetes 1.27 Streaming Lists
    /// The initial watch is in progress
    InitialWatch {
        /// The watch stream delivering the initial set of objects
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
    /// The initial LIST was successful, so we should move on to starting the actual watch.
    InitListed {
        /// Resource version to start the watch from
        resource_version: String,
    },
    /// The watch is in progress, from this point we just return events from the server.
    ///
    /// If the stream ends then we try to restart the watch from the last seen resource version
    /// by returning to the `InitListed` state.
    /// If the apiserver reports HTTP 410 (we fell out of the K8s watch window), or an object
    /// arrives without a resource version, then we propagate the error and fall back to
    /// starting over from `Empty`.
    /// Any other error is propagated while remaining in this state.
    Watching {
        /// Resource version of the last object or bookmark seen on the stream
        resource_version: String,
        /// The watch stream currently being drained
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
}
164
/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
trait ApiMode {
    /// The type of object yielded by the list and watch calls of this mode
    type Value: Clone;

    /// Performs a single list call with the given parameters
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
    /// Starts a watch with the given parameters at resource version `version`,
    /// returning a boxed stream of watch events
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}
178
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
    /// The borrowed `Api` that list and watch calls are forwarded to
    api: &'a Api<K>,
}
184
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but a re-list (the `InitApply`
    /// events following an `Init`) can return much older results than have previously been
    /// observed, particularly in HA configurations, due to partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and it being sent to the reconciler.
    Any,
}
210
/// Configurable watcher listwatch semantics
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
    /// List first, then watch from given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
227
/// Accumulates all options that can be used on the watcher invocation.
#[derive(Clone, Debug, PartialEq)]
pub struct Config {
    /// A selector to restrict the list of returned objects by their labels.
    ///
    /// Defaults to everything if `None`.
    pub label_selector: Option<String>,

    /// A selector to restrict the list of returned objects by their fields.
    ///
    /// Defaults to everything if `None`.
    pub field_selector: Option<String>,

    /// Timeout for the list/watch call.
    ///
    /// This limits the duration of the call, regardless of any activity or inactivity.
    /// If unset for a watch call, we will use 290s.
    /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
    pub timeout: Option<u32>,

    /// Semantics for list calls.
    ///
    /// Configures re-list for performance vs. consistency.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub list_semantic: ListSemantic,

    /// Control how the watcher fetches the initial list of objects.
    ///
    /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
    /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
    ///
    /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
    /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    pub initial_list_strategy: InitialListStrategy,

    /// Maximum number of objects retrieved per list call during resyncs.
    ///
    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
    /// API roundtrips to complete.
    ///
    /// Defaults to 500. Note that `None` represents unbounded.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub page_size: Option<u32>,

    /// Enables watch events with type "BOOKMARK".
    ///
    /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
    /// This is enabled by default and should generally not be turned off.
    pub bookmarks: bool,
}
283
284impl Default for Config {
285    fn default() -> Self {
286        Self {
287            bookmarks: true,
288            label_selector: None,
289            field_selector: None,
290            timeout: None,
291            list_semantic: ListSemantic::default(),
292            // same default page size limit as client-go
293            // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
294            page_size: Some(500),
295            initial_list_strategy: InitialListStrategy::ListWatch,
296        }
297    }
298}
299
300/// Builder interface to Config
301///
302/// Usage:
303/// ```
304/// use kube::runtime::watcher::Config;
305/// let wc = Config::default()
306///     .timeout(60)
307///     .labels("kubernetes.io/lifecycle=spot");
308/// ```
309impl Config {
310    /// Configure the timeout for list/watch calls
311    ///
312    /// This limits the duration of the call, regardless of any activity or inactivity.
313    /// Defaults to 290s
314    #[must_use]
315    pub fn timeout(mut self, timeout_secs: u32) -> Self {
316        self.timeout = Some(timeout_secs);
317        self
318    }
319
320    /// Configure the selector to restrict the list of returned objects by their fields.
321    ///
322    /// Defaults to everything.
323    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
324    /// The server only supports a limited number of field queries per type.
325    #[must_use]
326    pub fn fields(mut self, field_selector: &str) -> Self {
327        self.field_selector = Some(field_selector.to_string());
328        self
329    }
330
331    /// Configure the selector to restrict the list of returned objects by their labels.
332    ///
333    /// Defaults to everything.
334    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
335    #[must_use]
336    pub fn labels(mut self, label_selector: &str) -> Self {
337        self.label_selector = Some(label_selector.to_string());
338        self
339    }
340
341    /// Configure typed label selectors
342    ///
343    /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
344    ///
345    /// ```
346    /// use kube_runtime::watcher::Config;
347    /// use kube_client::core::{Expression, Selector, ParseExpressionError};
348    /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
349    /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
350    /// let cfg = Config::default().labels_from(&selector);
351    /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
352    /// let selector: Selector = LabelSelector::default().try_into()?;
353    /// let cfg = Config::default().labels_from(&selector);
354    /// # Ok::<(), ParseExpressionError>(())
355    ///```
356    #[must_use]
357    pub fn labels_from(mut self, selector: &Selector) -> Self {
358        self.label_selector = Some(selector.to_string());
359        self
360    }
361
362    /// Sets list semantic to configure re-list performance and consistency
363    ///
364    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
365    #[must_use]
366    pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
367        self.list_semantic = semantic;
368        self
369    }
370
371    /// Sets list semantic to `Any` to improve list performance
372    ///
373    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
374    #[must_use]
375    pub fn any_semantic(self) -> Self {
376        self.list_semantic(ListSemantic::Any)
377    }
378
379    /// Disables watch bookmarks to simplify watch handling
380    ///
381    /// This is not recommended to use with production watchers as it can cause desyncs.
382    /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
383    #[must_use]
384    pub fn disable_bookmarks(mut self) -> Self {
385        self.bookmarks = false;
386        self
387    }
388
389    /// Limits the number of objects retrieved in each list operation during resync.
390    ///
391    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
392    /// API roundtrips to complete.
393    ///
394    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
395    #[must_use]
396    pub fn page_size(mut self, page_size: u32) -> Self {
397        self.page_size = Some(page_size);
398        self
399    }
400
401    /// Kubernetes 1.27 Streaming Lists
402    /// Sets list semantic to `Stream` to make use of watch bookmarks
403    #[must_use]
404    pub fn streaming_lists(mut self) -> Self {
405        self.initial_list_strategy = InitialListStrategy::StreamingList;
406        self
407    }
408
409    /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
410    fn to_list_params(&self) -> ListParams {
411        let (resource_version, version_match) = match self.list_semantic {
412            ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
413            ListSemantic::MostRecent => (None, None),
414        };
415        ListParams {
416            label_selector: self.label_selector.clone(),
417            field_selector: self.field_selector.clone(),
418            timeout: self.timeout,
419            version_match,
420            resource_version,
421            // The watcher handles pagination internally.
422            limit: self.page_size,
423            continue_token: None,
424        }
425    }
426
427    /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
428    fn to_watch_params(&self) -> WatchParams {
429        WatchParams {
430            label_selector: self.label_selector.clone(),
431            field_selector: self.field_selector.clone(),
432            timeout: self.timeout,
433            bookmarks: self.bookmarks,
434            send_initial_events: self.initial_list_strategy == InitialListStrategy::StreamingList,
435        }
436    }
437}
438
439#[async_trait]
440impl<K> ApiMode for FullObject<'_, K>
441where
442    K: Clone + Debug + DeserializeOwned + Send + 'static,
443{
444    type Value = K;
445
446    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
447        self.api.list(lp).await
448    }
449
450    async fn watch(
451        &self,
452        wp: &WatchParams,
453        version: &str,
454    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
455        self.api.watch(wp, version).await.map(StreamExt::boxed)
456    }
457}
458
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
    /// The borrowed `Api` that metadata list and watch calls are forwarded to
    api: &'a Api<K>,
}
464
465#[async_trait]
466impl<K> ApiMode for MetaOnly<'_, K>
467where
468    K: Clone + Debug + DeserializeOwned + Send + 'static,
469{
470    type Value = PartialObjectMeta<K>;
471
472    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
473        self.api.list_metadata(lp).await
474    }
475
476    async fn watch(
477        &self,
478        wp: &WatchParams,
479        version: &str,
480    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
481        self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
482    }
483}
484
485/// Progresses the watcher a single step, returning (event, state)
486///
487/// This function should be trampolined: if event == `None`
488/// then the function should be called again until it returns a Some.
489#[allow(clippy::too_many_lines)] // for now
490async fn step_trampolined<A>(
491    api: &A,
492    wc: &Config,
493    state: State<A::Value>,
494) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
495where
496    A: ApiMode,
497    A::Value: Resource + 'static,
498{
499    match state {
500        State::Empty => match wc.initial_list_strategy {
501            InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
502                continue_token: None,
503                objects: VecDeque::default(),
504                last_bookmark: None,
505            }),
506            InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
507                Ok(stream) => (None, State::InitialWatch { stream }),
508                Err(err) => {
509                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
510                        warn!("watch initlist error with 403: {err:?}");
511                    } else {
512                        debug!("watch initlist error: {err:?}");
513                    }
514                    (Some(Err(Error::WatchStartFailed(err))), State::default())
515                }
516            },
517        },
518        State::InitPage {
519            continue_token,
520            mut objects,
521            last_bookmark,
522        } => {
523            if let Some(next) = objects.pop_front() {
524                return (Some(Ok(Event::InitApply(next))), State::InitPage {
525                    continue_token,
526                    objects,
527                    last_bookmark,
528                });
529            }
530            // check if we need to perform more pages
531            if continue_token.is_none() {
532                if let Some(resource_version) = last_bookmark {
533                    // we have drained the last page - move on to next stage
534                    return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
535                }
536            }
537            let mut lp = wc.to_list_params();
538            lp.continue_token = continue_token;
539            match api.list(&lp).await {
540                Ok(list) => {
541                    let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
542                    let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
543                    if last_bookmark.is_none() && continue_token.is_none() {
544                        return (Some(Err(Error::NoResourceVersion)), State::Empty);
545                    }
546                    // Buffer page here, causing us to return to this enum branch (State::InitPage)
547                    // until the objects buffer has drained
548                    (None, State::InitPage {
549                        continue_token,
550                        objects: list.items.into_iter().collect(),
551                        last_bookmark,
552                    })
553                }
554                Err(err) => {
555                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
556                        warn!("watch list error with 403: {err:?}");
557                    } else {
558                        debug!("watch list error: {err:?}");
559                    }
560                    (Some(Err(Error::InitialListFailed(err))), State::Empty)
561                }
562            }
563        }
564        State::InitialWatch { mut stream } => {
565            match stream.next().await {
566                Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
567                    (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
568                }
569                Some(Ok(WatchEvent::Deleted(_obj))) => {
570                    // Kubernetes claims these events are impossible
571                    // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
572                    error!("got deleted event during initial watch. this is a bug");
573                    (None, State::InitialWatch { stream })
574                }
575                Some(Ok(WatchEvent::Bookmark(bm))) => {
576                    let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
577                    if marks_initial_end {
578                        (Some(Ok(Event::InitDone)), State::Watching {
579                            resource_version: bm.metadata.resource_version,
580                            stream,
581                        })
582                    } else {
583                        (None, State::InitialWatch { stream })
584                    }
585                }
586                Some(Ok(WatchEvent::Error(err))) => {
587                    // HTTP GONE, means we have desynced and need to start over and re-list :(
588                    let new_state = if err.code == 410 {
589                        State::default()
590                    } else {
591                        State::InitialWatch { stream }
592                    };
593                    if err.code == 403 {
594                        warn!("watcher watchevent error 403: {err:?}");
595                    } else {
596                        debug!("error watchevent error: {err:?}");
597                    }
598                    (Some(Err(Error::WatchError(err))), new_state)
599                }
600                Some(Err(err)) => {
601                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
602                        warn!("watcher error 403: {err:?}");
603                    } else {
604                        debug!("watcher error: {err:?}");
605                    }
606                    (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
607                }
608                None => (None, State::default()),
609            }
610        }
611        State::InitListed { resource_version } => {
612            match api.watch(&wc.to_watch_params(), &resource_version).await {
613                Ok(stream) => (None, State::Watching {
614                    resource_version,
615                    stream,
616                }),
617                Err(err) => {
618                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
619                        warn!("watch initlist error with 403: {err:?}");
620                    } else {
621                        debug!("watch initlist error: {err:?}");
622                    }
623                    (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
624                        resource_version,
625                    })
626                }
627            }
628        }
629        State::Watching {
630            resource_version,
631            mut stream,
632        } => match stream.next().await {
633            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
634                let resource_version = obj.resource_version().unwrap_or_default();
635                if resource_version.is_empty() {
636                    (Some(Err(Error::NoResourceVersion)), State::default())
637                } else {
638                    (Some(Ok(Event::Apply(obj))), State::Watching {
639                        resource_version,
640                        stream,
641                    })
642                }
643            }
644            Some(Ok(WatchEvent::Deleted(obj))) => {
645                let resource_version = obj.resource_version().unwrap_or_default();
646                if resource_version.is_empty() {
647                    (Some(Err(Error::NoResourceVersion)), State::default())
648                } else {
649                    (Some(Ok(Event::Delete(obj))), State::Watching {
650                        resource_version,
651                        stream,
652                    })
653                }
654            }
655            Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
656                resource_version: bm.metadata.resource_version,
657                stream,
658            }),
659            Some(Ok(WatchEvent::Error(err))) => {
660                // HTTP GONE, means we have desynced and need to start over and re-list :(
661                let new_state = if err.code == 410 {
662                    State::default()
663                } else {
664                    State::Watching {
665                        resource_version,
666                        stream,
667                    }
668                };
669                if err.code == 403 {
670                    warn!("watcher watchevent error 403: {err:?}");
671                } else {
672                    debug!("error watchevent error: {err:?}");
673                }
674                (Some(Err(Error::WatchError(err))), new_state)
675            }
676            Some(Err(err)) => {
677                if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
678                    warn!("watcher error 403: {err:?}");
679                } else {
680                    debug!("watcher error: {err:?}");
681                }
682                (Some(Err(Error::WatchFailed(err))), State::Watching {
683                    resource_version,
684                    stream,
685                })
686            }
687            None => (None, State::InitListed { resource_version }),
688        },
689    }
690}
691
692/// Trampoline helper for `step_trampolined`
693async fn step<A>(
694    api: &A,
695    config: &Config,
696    mut state: State<A::Value>,
697) -> (Result<Event<A::Value>>, State<A::Value>)
698where
699    A: ApiMode,
700    A::Value: Resource + 'static,
701{
702    loop {
703        match step_trampolined(api, config, state).await {
704            (Some(result), new_state) => return (result, new_state),
705            (None, new_state) => state = new_state,
706        }
707    }
708}
709
710/// Watches a Kubernetes Resource for changes continuously
711///
712/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
713///
714/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
715/// You can apply your own backoff by not polling the stream for a duration after errors.
716/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
717/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
718/// will terminate eagerly as soon as they receive an [`Err`].
719///
720/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
721/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
722///
723/// ```no_run
724/// use kube::{
725///   api::{Api, ResourceExt}, Client,
726///   runtime::{watcher, WatchStreamExt}
727/// };
728/// use k8s_openapi::api::core::v1::Pod;
729/// use futures::TryStreamExt;
730/// #[tokio::main]
731/// async fn main() -> Result<(), watcher::Error> {
732///     let client = Client::try_default().await.unwrap();
733///     let pods: Api<Pod> = Api::namespaced(client, "apps");
734///
735///     watcher(pods, watcher::Config::default()).applied_objects()
736///         .try_for_each(|p| async move {
737///          println!("Applied: {}", p.name_any());
738///             Ok(())
739///         })
740///         .await?;
741///     Ok(())
742/// }
743/// ```
744/// [`WatchStreamExt`]: super::WatchStreamExt
745/// [`reflector`]: super::reflector::reflector
746/// [`Api::watch`]: kube_client::Api::watch
747///
748/// # Recovery
749///
750/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
751/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
752/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
753///
754/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
755/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
756/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
757/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
758/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
759pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
760    api: Api<K>,
761    watcher_config: Config,
762) -> impl Stream<Item = Result<Event<K>>> + Send {
763    futures::stream::unfold(
764        (api, watcher_config, State::default()),
765        |(api, watcher_config, state)| async {
766            let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
767            Some((event, (api, watcher_config, state)))
768        },
769    )
770}
771
772/// Watches a Kubernetes Resource for changes continuously and receives only the
773/// metadata
774///
775/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
776///
777/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
778/// You can apply your own backoff by not polling the stream for a duration after errors.
779/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
780/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
781/// will terminate eagerly as soon as they receive an [`Err`].
782///
783/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
784/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
785///
786/// ```no_run
787/// use kube::{
788///   api::{Api, ResourceExt}, Client,
789///   runtime::{watcher, metadata_watcher, WatchStreamExt}
790/// };
791/// use k8s_openapi::api::core::v1::Pod;
792/// use futures::TryStreamExt;
793/// #[tokio::main]
794/// async fn main() -> Result<(), watcher::Error> {
795///     let client = Client::try_default().await.unwrap();
796///     let pods: Api<Pod> = Api::namespaced(client, "apps");
797///
798///     metadata_watcher(pods, watcher::Config::default()).applied_objects()
799///         .try_for_each(|p| async move {
800///          println!("Applied: {}", p.name_any());
801///             Ok(())
802///         })
803///         .await?;
804///     Ok(())
805/// }
806/// ```
807/// [`WatchStreamExt`]: super::WatchStreamExt
808/// [`reflector`]: super::reflector::reflector
809/// [`Api::watch`]: kube_client::Api::watch
810///
811/// # Recovery
812///
813/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
814/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
815/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
816///
817/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
818/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
819/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
820/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
821/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
822#[allow(clippy::module_name_repetitions)]
823pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
824    api: Api<K>,
825    watcher_config: Config,
826) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
827    futures::stream::unfold(
828        (api, watcher_config, State::default()),
829        |(api, watcher_config, state)| async {
830            let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
831            Some((event, (api, watcher_config, state)))
832        },
833    )
834}
835
836/// Watch a single named object for updates
837///
838/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
839///
840/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
841///
842/// ## Scope Warning
843///
844/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
845/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
846/// when watching for transitions to namespaced objects.
847pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
848    api: Api<K>,
849    name: &str,
850) -> impl Stream<Item = Result<Option<K>>> + Send {
851    // filtering by object name in given scope, so there's at most one matching object
852    // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
853    let fields = format!("metadata.name={name}");
854    watcher(api, Config::default().fields(&fields))
855        // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
856        // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
857        // `None` is emitted when the InitDone event is received.
858        //
859        // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
860        // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
861        // could happen given K8S events aren't guaranteed delivery.
862        .scan(false, |obj_seen, event| {
863            if matches!(event, Ok(Event::Init)) {
864                *obj_seen = false;
865            } else if matches!(event, Ok(Event::InitApply(_))) {
866                *obj_seen = true;
867            }
868            future::ready(Some((*obj_seen, event)))
869        })
870        .filter_map(|(obj_seen, event)| async move {
871            match event {
872                // Pass up `Some` for Found / Updated
873                Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
874                // Pass up `None` for Deleted
875                Ok(Event::Delete(_)) => Some(Ok(None)),
876                // Pass up `None` if the object wasn't seen in the initial list
877                Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
878                // Ignore marker events
879                Ok(Event::Init | Event::InitDone) => None,
880                // Bubble up errors
881                Err(err) => Some(Err(err)),
882            }
883        })
884}
885
886pub struct ExponentialBackoff {
887    inner: backon::ExponentialBackoff,
888    builder: backon::ExponentialBuilder,
889}
890
891impl ExponentialBackoff {
892    fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
893        let builder = backon::ExponentialBuilder::default()
894            .with_min_delay(min_delay)
895            .with_max_delay(max_delay)
896            .with_factor(factor)
897            .without_max_times();
898
899        if enable_jitter {
900            builder.with_jitter();
901        }
902
903        Self {
904            inner: builder.build(),
905            builder,
906        }
907    }
908}
909
910impl Backoff for ExponentialBackoff {
911    fn reset(&mut self) {
912        self.inner = self.builder.build();
913    }
914}
915
916impl Iterator for ExponentialBackoff {
917    type Item = Duration;
918
919    fn next(&mut self) -> Option<Self::Item> {
920        self.inner.next()
921    }
922}
923
924impl From<backon::ExponentialBuilder> for ExponentialBackoff {
925    fn from(builder: backon::ExponentialBuilder) -> Self {
926        Self {
927            inner: builder.build(),
928            builder,
929        }
930    }
931}
932
933/// Default watcher backoff inspired by Kubernetes' client-go.
934///
935/// The parameters currently optimize for being kind to struggling apiservers.
936/// The exact parameters are taken from
937/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
938/// and should not be considered stable.
939///
940/// This struct implements [`Backoff`] and is the default strategy used
941/// when calling `WatchStreamExt::default_backoff`. If you need to create
942/// this manually then [`DefaultBackoff::default`] can be used.
943pub struct DefaultBackoff(Strategy);
944type Strategy = ResetTimerBackoff<ExponentialBackoff>;
945
946impl Default for DefaultBackoff {
947    fn default() -> Self {
948        Self(ResetTimerBackoff::new(
949            ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true),
950            Duration::from_secs(120),
951        ))
952    }
953}
954
955impl Iterator for DefaultBackoff {
956    type Item = Duration;
957
958    fn next(&mut self) -> Option<Self::Item> {
959        self.0.next()
960    }
961}
962
963impl Backoff for DefaultBackoff {
964    fn reset(&mut self) {
965        self.0.reset();
966    }
967}