kube_runtime/controller/
mod.rs

1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4use crate::{
5    reflector::{
6        self, reflector,
7        store::{Store, Writer},
8        ObjectRef,
9    },
10    scheduler::{debounced_scheduler, ScheduleRequest},
11    utils::{
12        trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt,
13    },
14    watcher::{self, metadata_watcher, watcher, DefaultBackoff},
15};
16use educe::Educe;
17use futures::{
18    channel,
19    future::{self, BoxFuture},
20    stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
21};
22use kube_client::api::{Api, DynamicObject, Resource};
23use pin_project::pin_project;
24use serde::de::DeserializeOwned;
25use std::{
26    fmt::{Debug, Display},
27    future::Future,
28    hash::Hash,
29    sync::Arc,
30    task::{ready, Poll},
31    time::Duration,
32};
33use stream::BoxStream;
34use thiserror::Error;
35use tokio::{runtime::Handle, time::Instant};
36use tracing::{info_span, Instrument};
37
38mod future_hash_map;
39mod runner;
40
/// Alias for the internal runner's error type, parameterised with
/// [`reflector::store::WriterDropped`].
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
42
/// Errors yielded by the stream returned from [`applier`]
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler and `QueueErr`
/// is the error type of the input queue stream.
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A reconcile request referenced an object that the local store did not contain
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),
    /// The reconciler returned an error for the referenced object
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
    /// The input queue stream yielded an error
    #[error("event queue error")]
    QueueError(#[source] QueueErr),
    /// The internal runner yielded an error
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}
54
/// Outcome of a reconciliation attempt, telling the controller what to do next
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// How long to wait before reconciling again when no watch event arrives first
    /// (`None` means: do not requeue, wait for a change)
    ///
    /// Useful for polling external systems, expiring time-limited resources, or
    /// (from your `error_policy`) retrying after a failure.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Reconcile again after `duration`, even if no external watch trigger fires in the meantime
    ///
    /// This is the best-practice action: it keeps the controller eventually consistent
    /// even when a change is missed (which can happen).
    ///
    /// Watch events are not normally missed, so a long fallback interval
    /// (for example once per hour) is reasonable.
    #[must_use]
    pub fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Action { requeue_after }
    }

    /// Do nothing until a change is detected
    ///
    /// The controller stops periodically reconciling this object until a relevant
    /// watch event is **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// Disabling requeuing this way is therefore not recommended unless the underlying
    /// object changes frequently, or you have some other hook that retains eventual consistency.
    #[must_use]
    pub fn await_change() -> Self {
        let requeue_after = None;
        Action { requeue_after }
    }
}
92
93/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
94pub fn trigger_with<T, K, I, S>(
95    stream: S,
96    mapper: impl Fn(T) -> I,
97) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
98where
99    S: TryStream<Ok = T>,
100    I: IntoIterator,
101    I::Item: Into<ReconcileRequest<K>>,
102    K: Resource,
103{
104    stream
105        .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
106        .try_flatten()
107}
108
109/// Enqueues the object itself for reconciliation
110pub fn trigger_self<K, S>(
111    stream: S,
112    dyntype: K::DynamicType,
113) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
114where
115    S: TryStream<Ok = K>,
116    K: Resource,
117    K::DynamicType: Clone,
118{
119    trigger_with(stream, move |obj| {
120        Some(ReconcileRequest {
121            obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
122            reason: ReconcileReason::ObjectUpdated,
123        })
124    })
125}
126
/// Requests reconciliation of every object yielded by `stream`, where the
/// objects arrive behind a shared pointer
///
/// Each request is tagged with [`ReconcileReason::ObjectUpdated`].
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // The input stream yields `Arc`'d resources (via
    // Controller::for_shared_stream)
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    let to_request = move |shared: Arc<K>| {
        let obj_ref = ObjectRef::from_obj_with(&*shared, dyntype.clone());
        let reason = ReconcileReason::ObjectUpdated;
        Some(ReconcileRequest { obj_ref, reason })
    };
    trigger_with(stream, to_request)
}
148
149/// Enqueues any mapper returned `K` types for reconciliation
150fn trigger_others<S, K, I>(
151    stream: S,
152    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
153    dyntype: <S::Ok as Resource>::DynamicType,
154) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
155where
156    // Input stream has items as some Resource (via Controller::watches)
157    S: TryStream,
158    S::Ok: Resource,
159    <S::Ok as Resource>::DynamicType: Clone,
160    // Output stream is requests for the root type K
161    K: Resource,
162    K::DynamicType: Clone,
163    // but the mapper can produce many of them
164    I: 'static + IntoIterator<Item = ObjectRef<K>>,
165    I::IntoIter: Send,
166{
167    trigger_with(stream, move |obj| {
168        let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
169        mapper(obj)
170            .into_iter()
171            .map(move |mapped_obj_ref| ReconcileRequest {
172                obj_ref: mapped_obj_ref,
173                reason: ReconcileReason::RelatedObjectUpdated {
174                    obj_ref: Box::new(watch_ref.clone()),
175                },
176            })
177    })
178}
179
/// Requests reconciliation of every `K` reference that `mapper` returns for a watched
/// object, where the watched objects arrive behind a shared pointer
///
/// Each request is tagged with [`ReconcileReason::RelatedObjectUpdated`], pointing at
/// the watched object that caused it.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // The input yields shared resources (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // The output stream carries requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // and a single watched object may map to many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |shared: Arc<O>| {
        // Record which object triggered us before `mapper` consumes it
        let reason = ReconcileReason::RelatedObjectUpdated {
            obj_ref: Box::new(ObjectRef::from_obj_with(&*shared, dyntype.clone()).erase()),
        };
        mapper(shared).into_iter().map(move |obj_ref| ReconcileRequest {
            obj_ref,
            reason: reason.clone(),
        })
    })
}
211
212/// Enqueues any owners of type `KOwner` for reconciliation
213pub fn trigger_owners<KOwner, S>(
214    stream: S,
215    owner_type: KOwner::DynamicType,
216    child_type: <S::Ok as Resource>::DynamicType,
217) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
218where
219    S: TryStream,
220    S::Ok: Resource,
221    <S::Ok as Resource>::DynamicType: Clone,
222    KOwner: Resource,
223    KOwner::DynamicType: Clone,
224{
225    let mapper = move |obj: S::Ok| {
226        let meta = obj.meta().clone();
227        let ns = meta.namespace;
228        let owner_type = owner_type.clone();
229        meta.owner_references
230            .into_iter()
231            .flatten()
232            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
233    };
234    trigger_others(stream, mapper, child_type)
235}
236
// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects that arrive behind a shared pointer
///
/// For every object yielded by `stream`, each of its owner references that
/// [`ObjectRef::from_owner_ref`] accepts for `owner_type` becomes a reconcile request
/// for that owner. The owner is looked up in the namespace of the child object.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        // Only the namespace and the owner references are needed, so clone just those
        // two fields rather than the object's entire metadata on every event.
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owners
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others_shared(stream, mapper, child_type)
}
265
/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
/// In this case, only *the first* reason is stored.
#[derive(Educe)]
#[educe(
    Debug(bound("K::DynamicType: Debug")),
    Clone(bound("K::DynamicType: Clone")),
    PartialEq(bound("K::DynamicType: PartialEq")),
    Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
    /// Reference to the object that should be reconciled
    pub obj_ref: ObjectRef<K>,
    /// Why the reconciliation was requested; excluded from equality and hashing
    #[educe(PartialEq(ignore), Hash(ignore))]
    pub reason: ReconcileReason,
}

// `Eq` is a marker on top of the derived `PartialEq`, which compares only `obj_ref`.
impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
285
286impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
287    fn from(obj_ref: ObjectRef<K>) -> Self {
288        ReconcileRequest {
289            obj_ref,
290            reason: ReconcileReason::Unknown,
291        }
292    }
293}
294
/// Why a [`ReconcileRequest`] was made
#[derive(Debug, Clone)]
pub enum ReconcileReason {
    /// No reason was recorded (used when a request is built from a bare [`ObjectRef`])
    Unknown,
    /// The object itself was seen on its trigger stream (see [`trigger_self`])
    ObjectUpdated,
    /// A related object, referenced by `obj_ref`, was seen on a trigger stream and mapped to this object
    RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> },
    /// The reconciler succeeded and returned an [`Action`] asking to be requeued
    ReconcilerRequestedRetry,
    /// The reconciler failed and the error policy returned an [`Action`] asking to be requeued
    ErrorPolicyRequestedRetry,
    /// A bulk reconcile was requested
    // NOTE(review): nothing in this part of the file produces this variant — confirm where it is emitted
    BulkReconcile,
    /// A free-form reason; the string is displayed verbatim
    Custom { reason: String },
}
305
306impl Display for ReconcileReason {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        match self {
309            ReconcileReason::Unknown => f.write_str("unknown"),
310            ReconcileReason::ObjectUpdated => f.write_str("object updated"),
311            ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
312                f.write_fmt(format_args!("related object updated: {object}"))
313            }
314            ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
315            ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
316            ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
317            ReconcileReason::Custom { reason } => f.write_str(reason),
318        }
319    }
320}
321
/// Capacity of the bounded channel that [`applier`] uses to feed requeue requests
/// back into its scheduler.
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
323
/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
///
/// The returned stream yields, for each completed reconciliation, either the object reference
/// together with the [`Action`] returned by the reconciler, or an [`Error`]. When the reconciler
/// fails, `error_policy` is consulted to decide whether to requeue, and the failure is
/// yielded as [`Error::ReconcilerFailed`].
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Signalled once the user's queue completes, so that we stop consuming requeue requests
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Bounded channel through which finished reconciliations send their requeue requests
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    // Shared so that every reconciliation future can hold its own handle to the policy
    let error_policy = Arc::new(error_policy);
    // Separate handle for the readiness wait below; `store` itself is moved into the runner closure
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user specified inputs event
        Box::pin(stream::select(
            // 1. inputs from users queue stream
            queue
                .map_err(Error::QueueError)
                // requests from the user's queue are scheduled to run immediately
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                // stop accepting requeue requests once the user's queue has completed
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select gets passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            // Per-reconciliation handles, moved into the future built below
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            reconciler_span
                                // the reconciler call itself happens inside the span
                                .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
                                .into_future()
                                .then(move |res| {
                                    let error_policy = error_policy;
                                    // Applies the error policy on failure and sends any requeue request
                                    RescheduleReconciliation::new(
                                        res,
                                        |err| error_policy(obj, err, error_policy_ctx),
                                        request.obj_ref.clone(),
                                        scheduler_tx,
                                    )
                                    // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
                                    // to them separately
                                    .map(|res| Ok((request.obj_ref, res)))
                                })
                                .instrument(reconciler_span)
                                .left_future()
                        }
                        // The store does not contain the object: fail without calling the reconciler
                        None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
                            .right_future(),
                    }
                },
            )
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            // Fold errors from the runner itself into this module's error type
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
441
/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
///
/// It resolves to the reconciler's original result once any pending requeue request
/// has been handed to `reschedule_tx` (or dropped because the channel reported an error).
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    /// Channel on which the requeue request is sent
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

    /// The requeue request still to be sent; `None` if no requeue was asked for or it was already handled
    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    /// The reconciler's result, taken when the future completes
    result: Option<Result<Action, ReconcilerErr>>,
}
453
454impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
455where
456    K: Resource,
457{
458    fn new(
459        result: Result<Action, ReconcilerErr>,
460        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
461        obj_ref: ObjectRef<K>,
462        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
463    ) -> Self {
464        let reconciler_finished_at = Instant::now();
465
466        let (action, reschedule_reason) = result.as_ref().map_or_else(
467            |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
468            |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
469        );
470
471        Self {
472            reschedule_tx,
473            reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
474                message: ReconcileRequest {
475                    obj_ref,
476                    reason: reschedule_reason,
477                },
478                run_at: reconciler_finished_at
479                    .checked_add(requeue_after)
480                    .unwrap_or_else(crate::scheduler::far_future),
481            }),
482            result: Some(result),
483        }
484    }
485}
486
487impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
488where
489    K: Resource,
490{
491    type Output = Result<Action, ReconcilerErr>;
492
493    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
494        let this = self.get_mut();
495
496        if this.reschedule_request.is_some() {
497            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
498            let reschedule_request = this
499                .reschedule_request
500                .take()
501                .expect("PostReconciler::reschedule_request was taken during processing");
502            // Failure to schedule item = in graceful shutdown mode, ignore
503            if let Ok(()) = rescheduler_ready {
504                let _ = this.reschedule_tx.start_send(reschedule_request);
505            }
506        }
507
508        Poll::Ready(
509            this.result
510                .take()
511                .expect("PostReconciler::result was already taken"),
512        )
513    }
514}
515
/// Collects the options that can be set on a [`Controller`] invocation.
///
/// The default has a zero debounce duration and a concurrency of 0.
#[derive(Clone, Debug, Default)]
pub struct Config {
    debounce: Duration,
    concurrency: u16,
}

impl Config {
    /// Sets the debounce duration used to deduplicate reconciliation requests.
    ///
    /// A non-zero duration enables debouncing in the [`scheduler`](crate::scheduler()),
    /// which gives __trailing edge debouncing__ of reconciler requests.
    /// This helps to cut down on unnecessary reconciler calls when several controller
    /// relations are in use, or during rapid phase transitions.
    ///
    /// ## Warning
    /// A reconcile request for an object is delayed (and keeps being delayed) for as long
    /// as the object keeps being updated. Set too high on frequently updated objects
    /// (like nodes), this can **permanently hide** updates from your reconciler.
    #[must_use]
    pub fn debounce(self, debounce: Duration) -> Self {
        Self { debounce, ..self }
    }

    /// Sets the number of reconciliations that are allowed to run concurrently at any given moment.
    ///
    /// Adjust this to the controller's needs to increase performance and/or make
    /// performance predictable. The default is 0, meaning the controller runs with
    /// unbounded concurrency.
    ///
    /// Note that regardless of this setting, a controller never schedules concurrent
    /// reconciles on the same object.
    #[must_use]
    pub fn concurrency(self, concurrency: u16) -> Self {
        Self { concurrency, ..self }
    }
}
555
556/// Controller for a Resource `K`
557///
558/// A controller is an infinite stream of objects to be reconciled.
559///
560/// Once `run` and continuously awaited, it continuously calls out to user provided
561/// `reconcile` and `error_policy` callbacks whenever relevant changes are detected
562/// or if errors are seen from `reconcile`.
563///
564/// Reconciles are generally requested for all changes on your root objects.
565/// Changes to managed child resources will also trigger the reconciler for the
566/// managing object by traversing owner references (for `Controller::owns`),
567/// or traverse a custom mapping (for `Controller::watches`).
568///
569/// This mapping mechanism ultimately hides the reason for the reconciliation request,
570/// and forces you to write an idempotent reconciler.
571///
572/// General setup:
573/// ```no_run
574/// use kube::{Api, Client, CustomResource};
575/// use kube::runtime::{controller::{Controller, Action}, watcher};
576/// # use serde::{Deserialize, Serialize};
577/// # use tokio::time::Duration;
578/// use futures::StreamExt;
579/// use k8s_openapi::api::core::v1::ConfigMap;
580/// use schemars::JsonSchema;
581/// # use std::sync::Arc;
582/// use thiserror::Error;
583///
584/// #[derive(Debug, Error)]
585/// enum Error {}
586///
587/// /// A custom resource
588/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
589/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
590/// struct ConfigMapGeneratorSpec {
591///     content: String,
592/// }
593///
594/// /// The reconciler that will be called when either object change
595/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
596///     // .. use api here to reconcile a child ConfigMap with ownerreferences
597///     // see configmapgen_controller example for full info
598///     Ok(Action::requeue(Duration::from_secs(300)))
599/// }
600/// /// an error handler that will be called when the reconciler fails with access to both the
601/// /// object that caused the failure and the actual error
602/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
603///     Action::requeue(Duration::from_secs(60))
604/// }
605///
606/// /// something to drive the controller
607/// #[tokio::main]
608/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
609///     let client = Client::try_default().await?;
610///     let context = Arc::new(()); // bad empty context - put client in here
611///     let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
612///     let cms = Api::<ConfigMap>::all(client.clone());
613///     Controller::new(cmgs, watcher::Config::default())
614///         .owns(cms, watcher::Config::default())
615///         .run(reconcile, error_policy, context)
616///         .for_each(|res| async move {
617///             match res {
618///                 Ok(o) => println!("reconciled {:?}", o),
619///                 Err(e) => println!("reconcile failed: {:?}", e),
620///             }
621///         })
622///         .await; // controller does nothing unless polled
623///     Ok(())
624/// }
625/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    /// All trigger streams merged into one; every item is a reconcile request for a `K` (or a watcher error).
    // NB: Need to Unpin for stream::select_all
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    /// Backoff policy for the triggers; the constructors install a [`DefaultBackoff`].
    // NOTE(review): presumably applied to `trigger_selector` when it yields errors — confirm in `run`
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// Dynamic type information for `K`
    dyntype: K::DynamicType,
    /// Read handle to the store holding the `K` objects
    reader: Store<K>,
    /// Options for this controller (see [`Config`])
    config: Config,
}
646
647impl<K> Controller<K>
648where
649    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
650    K::DynamicType: Eq + Hash + Clone,
651{
652    /// Create a Controller for a resource `K`
653    ///
654    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
655    ///
656    /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
657    /// and receive reconcile events for.
658    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
659    #[must_use]
660    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
661    where
662        K::DynamicType: Default,
663    {
664        Self::new_with(main_api, wc, Default::default())
665    }
666
667    /// Create a Controller for a resource `K`
668    ///
669    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
670    ///
671    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
672    /// to watch - in the Api's  configured scope - and receive reconcile events for.
673    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
674    ///
675    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
676    ///
677    /// [`watcher::Config`]: crate::watcher::Config
678    /// [`Api`]: kube_client::Api
679    /// [`dynamic`]: kube_client::core::dynamic
680    /// [`Config::default`]: crate::watcher::Config::default
681    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
682        let writer = Writer::<K>::new(dyntype.clone());
683        let reader = writer.as_reader();
684        let mut trigger_selector = stream::SelectAll::new();
685        let self_watcher = trigger_self(
686            reflector(writer, watcher(main_api, wc)).applied_objects(),
687            dyntype.clone(),
688        )
689        .boxed();
690        trigger_selector.push(self_watcher);
691        Self {
692            trigger_selector,
693            trigger_backoff: Box::<DefaultBackoff>::default(),
694            graceful_shutdown_selector: vec![
695                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
696                future::pending().boxed(),
697            ],
698            forceful_shutdown_selector: vec![
699                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
700                future::pending().boxed(),
701            ],
702            dyntype,
703            reader,
704            config: Default::default(),
705        }
706    }
707
708    /// Create a Controller for a resource `K` from a stream of `K` objects
709    ///
710    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
711    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
712    /// as well as sharing input streams between multiple controllers.
713    ///
714    /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
715    ///
716    /// # Example:
717    ///
718    /// ```no_run
719    /// # use futures::StreamExt;
720    /// # use k8s_openapi::api::apps::v1::Deployment;
721    /// # use kube::runtime::controller::{Action, Controller};
722    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
723    /// # use kube::{Api, Client, Error, ResourceExt};
724    /// # use std::sync::Arc;
725    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
726    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
727    /// # async fn doc(client: kube::Client) {
728    /// let api: Api<Deployment> = Api::default_namespaced(client);
729    /// let (reader, writer) = reflector::store();
730    /// let deploys = watcher(api, watcher::Config::default())
731    ///     .default_backoff()
732    ///     .reflect(writer)
733    ///     .applied_objects()
734    ///     .predicate_filter(predicates::generation);
735    ///
736    /// Controller::for_stream(deploys, reader)
737    ///     .run(reconcile, error_policy, Arc::new(()))
738    ///     .for_each(|_| std::future::ready(()))
739    ///     .await;
740    /// # }
741    /// ```
742    ///
743    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
744    #[cfg(feature = "unstable-runtime-stream-control")]
745    pub fn for_stream(
746        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
747        reader: Store<K>,
748    ) -> Self
749    where
750        K::DynamicType: Default,
751    {
752        Self::for_stream_with(trigger, reader, Default::default())
753    }
754
755    /// Create a Controller for a resource `K` from a stream of `K` objects
756    ///
757    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
758    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
759    /// as well as sharing input streams between multiple controllers.
760    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
762    ///
763    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
764    ///
765    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
766    ///
767    /// [`dynamic`]: kube_client::core::dynamic
768    #[cfg(feature = "unstable-runtime-stream-control")]
769    pub fn for_stream_with(
770        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
771        reader: Store<K>,
772        dyntype: K::DynamicType,
773    ) -> Self {
774        let mut trigger_selector = stream::SelectAll::new();
775        let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
776        trigger_selector.push(self_watcher);
777        Self {
778            trigger_selector,
779            trigger_backoff: Box::<DefaultBackoff>::default(),
780            graceful_shutdown_selector: vec![
781                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
782                future::pending().boxed(),
783            ],
784            forceful_shutdown_selector: vec![
785                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
786                future::pending().boxed(),
787            ],
788            dyntype,
789            reader,
790            config: Default::default(),
791        }
792    }
793
794    /// This is the same as [`Controller::for_stream`]. Instead of taking an
795    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
796    /// streams can be created out-of-band by subscribing on a store `Writer`.
797    /// Through this interface, multiple controllers can use the same root
798    /// (shared) input stream of resources to keep memory overheads smaller.
799    ///
800    /// **N.B**: This constructor requires an
801    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
802    /// feature.
803    ///
804    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
805    /// need to share the stream.
806    ///
807    /// ## Warning:
808    ///
809    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
810    /// is driven to readiness independently of this controller to ensure the
811    /// watcher never deadlocks.
812    ///
813    /// # Example:
814    ///
815    /// ```no_run
816    /// # use futures::StreamExt;
817    /// # use k8s_openapi::api::apps::v1::Deployment;
818    /// # use kube::runtime::controller::{Action, Controller};
819    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
820    /// # use kube::{Api, Client, Error, ResourceExt};
821    /// # use std::sync::Arc;
822    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
823    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
824    /// # async fn doc(client: kube::Client) {
825    /// let api: Api<Deployment> = Api::default_namespaced(client);
826    /// let (reader, writer) = reflector::store_shared(128);
827    /// let subscriber = writer
828    ///     .subscribe()
829    ///     .expect("subscribers can only be created from shared stores");
830    /// let deploys = watcher(api, watcher::Config::default())
831    ///     .default_backoff()
832    ///     .reflect(writer)
833    ///     .applied_objects()
834    ///     .for_each(|ev| async move {
835    ///         match ev {
836    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
837    ///             Err(error) => tracing::error!(%error, "received error")
838    ///         }
839    ///     });
840    ///
841    /// let controller = Controller::for_shared_stream(subscriber, reader)
842    ///     .run(reconcile, error_policy, Arc::new(()))
843    ///     .for_each(|ev| async move {
844    ///         tracing::info!("reconciled {ev:?}")
845    ///     });
846    ///
847    /// // Drive streams using a select statement
848    /// tokio::select! {
849    ///   _ = deploys => {},
850    ///   _ = controller => {},
851    /// }
    /// # }
    /// ```
853    #[cfg(feature = "unstable-runtime-subscribe")]
854    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
855    where
856        K::DynamicType: Default,
857    {
858        Self::for_shared_stream_with(trigger, reader, Default::default())
859    }
860
861    /// This is the same as [`Controller::for_stream`]. Instead of taking an
862    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
863    /// streams can be created out-of-band by subscribing on a store `Writer`.
864    /// Through this interface, multiple controllers can use the same root
865    /// (shared) input stream of resources to keep memory overheads smaller.
866    ///
867    /// **N.B**: This constructor requires an
868    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
869    /// feature.
870    ///
871    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
872    /// need to share the stream.
873    ///
874    /// This variant constructor is used for [`dynamic`] types found through
875    /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
876    /// known at compile time).
877    ///
878    /// [`dynamic`]: kube_client::core::dynamic
879    #[cfg(feature = "unstable-runtime-subscribe")]
880    pub fn for_shared_stream_with(
881        trigger: impl Stream<Item = Arc<K>> + Send + 'static,
882        reader: Store<K>,
883        dyntype: K::DynamicType,
884    ) -> Self {
885        let mut trigger_selector = stream::SelectAll::new();
886        let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
887        trigger_selector.push(self_watcher);
888        Self {
889            trigger_selector,
890            trigger_backoff: Box::<DefaultBackoff>::default(),
891            graceful_shutdown_selector: vec![
892                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
893                future::pending().boxed(),
894            ],
895            forceful_shutdown_selector: vec![
896                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
897                future::pending().boxed(),
898            ],
899            dyntype,
900            reader,
901            config: Default::default(),
902        }
903    }
904
905    /// Specify the configuration for the controller's behavior.
906    #[must_use]
907    pub fn with_config(mut self, config: Config) -> Self {
908        self.config = config;
909        self
910    }
911
912    /// Specify the backoff policy for "trigger" watches
913    ///
    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
915    ///
916    /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
917    /// but can be overridden by calling this method.
918    #[must_use]
919    pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
920        self.trigger_backoff = Box::new(backoff);
921        self
922    }
923
924    /// Retrieve a copy of the reader before starting the controller
925    pub fn store(&self) -> Store<K> {
926        self.reader.clone()
927    }
928
929    /// Specify `Child` objects which `K` owns and should be watched
930    ///
931    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
932    /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
933    ///
934    /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
935    /// to watch - in the Api's configured scope - and receive reconcile events for.
936    /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
937    ///
938    /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
939    #[must_use]
940    pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
941        self,
942        api: Api<Child>,
943        wc: watcher::Config,
944    ) -> Self {
945        self.owns_with(api, (), wc)
946    }
947
948    /// Specify `Child` objects which `K` owns and should be watched
949    ///
950    /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
951    #[must_use]
952    pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
953        mut self,
954        api: Api<Child>,
955        dyntype: Child::DynamicType,
956        wc: watcher::Config,
957    ) -> Self
958    where
959        Child::DynamicType: Debug + Eq + Hash + Clone,
960    {
961        // TODO: call owns_stream_with when it's stable
962        let child_watcher = trigger_owners(
963            metadata_watcher(api, wc).touched_objects(),
964            self.dyntype.clone(),
965            dyntype,
966        );
967        self.trigger_selector.push(child_watcher.boxed());
968        self
969    }
970
971    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
972    ///
973    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
974    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
975    /// as well as sharing input streams between multiple controllers.
976    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
978    ///
979    /// Watcher streams passed in here should be filtered first through `touched_objects`.
980    ///
981    /// # Example:
982    ///
983    /// ```no_run
984    /// # use futures::StreamExt;
985    /// # use k8s_openapi::api::core::v1::ConfigMap;
986    /// # use k8s_openapi::api::apps::v1::StatefulSet;
987    /// # use kube::runtime::controller::Action;
988    /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
989    /// # use kube::{Api, Client, Error, ResourceExt};
990    /// # use std::sync::Arc;
991    /// # type CustomResource = ConfigMap;
992    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
993    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
994    /// # async fn doc(client: kube::Client) {
995    /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
996    ///     .touched_objects()
997    ///     .predicate_filter(predicates::generation);
998    ///
999    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
1000    ///     .owns_stream(sts_stream)
1001    ///     .run(reconcile, error_policy, Arc::new(()))
1002    ///     .for_each(|_| std::future::ready(()))
1003    ///     .await;
1004    /// # }
1005    /// ```
1006    #[cfg(feature = "unstable-runtime-stream-control")]
1007    #[must_use]
1008    pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1009        self,
1010        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1011    ) -> Self {
1012        self.owns_stream_with(trigger, ())
1013    }
1014
1015    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1016    ///
1017    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1018    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1019    /// as well as sharing input streams between multiple controllers.
1020    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1022    ///
1023    /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1024    #[cfg(feature = "unstable-runtime-stream-control")]
1025    #[must_use]
1026    pub fn owns_stream_with<Child: Resource + Send + 'static>(
1027        mut self,
1028        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1029        dyntype: Child::DynamicType,
1030    ) -> Self
1031    where
1032        Child::DynamicType: Debug + Eq + Hash + Clone,
1033    {
1034        let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
1035        self.trigger_selector.push(child_watcher.boxed());
1036        self
1037    }
1038
    /// Trigger the reconciliation process for a shared stream of `Child`
    /// objects of the owner `K`
    ///
    /// Conceptually the same as [`Controller::owns`], but a stream is used
    /// instead of an `Api`. This interface behaves similarly to its non-shared
    /// counterpart [`Controller::owns_stream`].
    ///
    /// Shared streams can be created out-of-band by subscribing on a store `Writer`.
    /// Through this interface, multiple controllers can use the same root
    /// (shared) input stream of resources to keep memory overheads smaller.
    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Prefer [`Controller::owns`] or [`Controller::owns_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
1067    ///
1068    /// # Example:
1069    ///
1070    /// ```no_run
1071    /// # use futures::StreamExt;
1072    /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
1073    /// # use kube::runtime::controller::{Action, Controller};
1074    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
1075    /// # use kube::{Api, Client, Error, ResourceExt};
1076    /// # use std::sync::Arc;
1077    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1078    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1079    /// # async fn doc(client: kube::Client) {
1080    /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1081    /// let pod_api: Api<Pod> = Api::default_namespaced(client);
1082    ///
1083    /// let (reader, writer) = reflector::store_shared(128);
1084    /// let subscriber = writer
1085    ///     .subscribe()
1086    ///     .expect("subscribers can only be created from shared stores");
1087    /// let pods = watcher(pod_api, watcher::Config::default())
1088    ///     .default_backoff()
1089    ///     .reflect(writer)
1090    ///     .applied_objects()
1091    ///     .for_each(|ev| async move {
1092    ///         match ev {
1093    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
1094    ///             Err(error) => tracing::error!(%error, "received error")
1095    ///         }
1096    ///     });
1097    ///
1098    /// let controller = Controller::new(deploys, Default::default())
1099    ///     .owns_shared_stream(subscriber)
1100    ///     .run(reconcile, error_policy, Arc::new(()))
1101    ///     .for_each(|ev| async move {
1102    ///         tracing::info!("reconciled {ev:?}")
1103    ///     });
1104    ///
1105    /// // Drive streams using a select statement
1106    /// tokio::select! {
1107    ///   _ = pods => {},
1108    ///   _ = controller => {},
1109    /// }
    /// # }
    /// ```
1111    #[cfg(feature = "unstable-runtime-subscribe")]
1112    #[must_use]
1113    pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1114        self,
1115        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1116    ) -> Self {
1117        self.owns_shared_stream_with(trigger, ())
1118    }
1119
1120    /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
1121    ///
1122    /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
1123    /// The source stream can be shared between multiple controllers, optimising
1124    /// resource usage.
1125    ///
1126    /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1127    ///
1128    /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1129    #[cfg(feature = "unstable-runtime-subscribe")]
1130    #[must_use]
1131    pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
1132        mut self,
1133        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1134        dyntype: Child::DynamicType,
1135    ) -> Self
1136    where
1137        Child::DynamicType: Debug + Eq + Hash + Clone,
1138    {
1139        let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
1140        self.trigger_selector.push(child_watcher.boxed());
1141        self
1142    }
1143
1144    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1145    ///
1146    /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1147    /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1148    ///
1149    /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1150    ///
1151    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1152    ///
1153    /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1154    /// to watch - in the Api's configured scope - and run through the custom mapper.
    /// To watch the full set of `Watched` objects in the given `Api` scope, you can use [`watcher::Config::default`].
1156    ///
1157    /// # Example
1158    ///
1159    /// Tracking cross cluster references using the [Operator-SDK] annotations.
1160    ///
1161    /// ```
1162    /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1163    /// # use kube::{Api, ResourceExt};
1164    /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1165    /// # use futures::StreamExt;
1166    /// # use std::sync::Arc;
1167    /// # type WatchedResource = Namespace;
1168    /// # struct Context;
1169    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1170    /// #     Ok(Action::await_change())
1171    /// # };
1172    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1173    /// #     Action::await_change()
1174    /// # }
1175    /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1176    /// # let memcached = Api::<ConfigMap>::all(client.clone());
1177    /// # let context = Arc::new(Context);
1178    /// Controller::new(memcached, watcher::Config::default())
1179    ///     .watches(
1180    ///         Api::<WatchedResource>::all(client.clone()),
1181    ///         watcher::Config::default(),
1182    ///         |ar| {
1183    ///             let prt = ar
1184    ///                 .annotations()
1185    ///                 .get("operator-sdk/primary-resource-type")
1186    ///                 .map(String::as_str);
1187    ///
1188    ///             if prt != Some("Memcached.cache.example.com") {
1189    ///                 return None;
1190    ///             }
1191    ///
1192    ///             let (namespace, name) = ar
1193    ///                 .annotations()
1194    ///                 .get("operator-sdk/primary-resource")?
1195    ///                 .split_once('/')?;
1196    ///
1197    ///             Some(ObjectRef::new(name).within(namespace))
1198    ///         }
1199    ///     )
1200    ///     .run(reconcile, error_policy, context)
1201    ///     .for_each(|_| futures::future::ready(()))
1202    ///     .await;
1203    /// # Ok(())
1204    /// # }
1205    /// ```
1206    ///
1207    /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1208    #[must_use]
1209    pub fn watches<Other, I>(
1210        self,
1211        api: Api<Other>,
1212        wc: watcher::Config,
1213        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1214    ) -> Self
1215    where
1216        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1217        Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1218        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1219        I::IntoIter: Send,
1220    {
1221        self.watches_with(api, Default::default(), wc, mapper)
1222    }
1223
1224    /// Specify `Watched` object which `K` has a custom relation to and should be watched
1225    ///
1226    /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1227    #[must_use]
1228    pub fn watches_with<Other, I>(
1229        mut self,
1230        api: Api<Other>,
1231        dyntype: Other::DynamicType,
1232        wc: watcher::Config,
1233        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1234    ) -> Self
1235    where
1236        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1237        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1238        I::IntoIter: Send,
1239        Other::DynamicType: Debug + Clone + Eq + Hash,
1240    {
1241        let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1242        self.trigger_selector.push(other_watcher.boxed());
1243        self
1244    }
1245
1246    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1247    ///
1248    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1249    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1250    /// as well as sharing input streams between multiple controllers.
1251    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1253    ///
1254    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1255    ///
1256    /// # Example:
1257    ///
1258    /// ```no_run
1259    /// # use futures::StreamExt;
1260    /// # use k8s_openapi::api::core::v1::ConfigMap;
1261    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1262    /// # use kube::runtime::controller::Action;
1263    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1264    /// # use kube::{Api, Client, Error, ResourceExt};
1265    /// # use std::sync::Arc;
1266    /// # type CustomResource = ConfigMap;
1267    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1268    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1269    /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1270    /// # async fn doc(client: kube::Client) {
1271    /// let api: Api<DaemonSet> = Api::all(client.clone());
1272    /// let cr: Api<CustomResource> = Api::all(client.clone());
1273    /// let daemons = watcher(api, watcher::Config::default())
1274    ///     .touched_objects()
1275    ///     .predicate_filter(predicates::generation);
1276    ///
1277    /// Controller::new(cr, watcher::Config::default())
1278    ///     .watches_stream(daemons, mapper)
1279    ///     .run(reconcile, error_policy, Arc::new(()))
1280    ///     .for_each(|_| std::future::ready(()))
1281    ///     .await;
1282    /// # }
1283    /// ```
1284    #[cfg(feature = "unstable-runtime-stream-control")]
1285    #[must_use]
1286    pub fn watches_stream<Other, I>(
1287        self,
1288        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1289        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1290    ) -> Self
1291    where
1292        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1293        Other::DynamicType: Default + Debug + Clone,
1294        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1295        I::IntoIter: Send,
1296    {
1297        self.watches_stream_with(trigger, mapper, Default::default())
1298    }
1299
1300    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1301    ///
1302    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1303    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1304    /// as well as sharing input streams between multiple controllers.
1305    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1307    ///
1308    /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1309    #[cfg(feature = "unstable-runtime-stream-control")]
1310    #[must_use]
1311    pub fn watches_stream_with<Other, I>(
1312        mut self,
1313        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1314        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1315        dyntype: Other::DynamicType,
1316    ) -> Self
1317    where
1318        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1319        Other::DynamicType: Debug + Clone,
1320        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1321        I::IntoIter: Send,
1322    {
1323        let other_watcher = trigger_others(trigger, mapper, dyntype);
1324        self.trigger_selector.push(other_watcher.boxed());
1325        self
1326    }
1327
1328    /// Trigger the reconciliation process for a shared stream of `Other`
1329    /// objects related to a `K`
1330    ///
1331    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1332    /// stream of resources is used. This allows for sharing input streams
1333    /// between multiple controllers.
1334    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1336    ///
1337    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1338    ///
1339    /// # Example:
1340    ///
1341    /// ```no_run
1342    /// # use futures::StreamExt;
1343    /// # use k8s_openapi::api::core::v1::ConfigMap;
1344    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1345    /// # use kube::runtime::controller::Action;
1346    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1347    /// # use kube::{Api, Client, Error, ResourceExt};
1348    /// # use std::sync::Arc;
1349    /// # type CustomResource = ConfigMap;
1350    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1351    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1352    /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1353    /// # async fn doc(client: kube::Client) {
1354    /// let api: Api<DaemonSet> = Api::all(client.clone());
1355    /// let cr: Api<CustomResource> = Api::all(client.clone());
1356    /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1357    /// let subscriber = writer
1358    ///     .subscribe()
1359    ///     .expect("subscribers can only be created from shared stores");
1360    /// let daemons = watcher(api, watcher::Config::default())
1361    ///     .reflect(writer)
1362    ///     .touched_objects()
1363    ///     .for_each(|ev| async move {
1364    ///         match ev {
1365    ///             Ok(obj) => {},
1366    ///             Err(error) => tracing::error!(%error, "received err")
1367    ///         }
1368    ///     });
1369    ///
1370    /// let controller = Controller::new(cr, watcher::Config::default())
1371    ///     .watches_shared_stream(subscriber, mapper)
1372    ///     .run(reconcile, error_policy, Arc::new(()))
1373    ///     .for_each(|_| std::future::ready(()));
1374    ///
1375    /// // Drive streams using a select statement
1376    /// tokio::select! {
1377    ///   _ = daemons => {},
1378    ///   _ = controller => {},
1379    /// }
1380    /// # }
1381    /// ```
1382    #[cfg(feature = "unstable-runtime-subscribe")]
1383    #[must_use]
1384    pub fn watches_shared_stream<Other, I>(
1385        self,
1386        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1387        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1388    ) -> Self
1389    where
1390        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1391        Other::DynamicType: Default + Debug + Clone,
1392        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1393        I::IntoIter: Send,
1394    {
1395        self.watches_shared_stream_with(trigger, mapper, Default::default())
1396    }
1397
1398    /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1399    ///
1400    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1401    /// stream of resources is used. This allows for sharing of streams between
1402    /// multiple controllers.
1403    ///
    /// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1405    ///
1406    /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1407    #[cfg(feature = "unstable-runtime-subscribe")]
1408    #[must_use]
1409    pub fn watches_shared_stream_with<Other, I>(
1410        mut self,
1411        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1412        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1413        dyntype: Other::DynamicType,
1414    ) -> Self
1415    where
1416        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1417        Other::DynamicType: Debug + Clone,
1418        I: 'static + IntoIterator<Item = ObjectRef<K>>,
1419        I::IntoIter: Send,
1420    {
1421        let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
1422        self.trigger_selector.push(other_watcher.boxed());
1423        self
1424    }
1425
1426    /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1427    ///
1428    /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1429    ///
1430    /// To reconcile all objects when a new line is entered:
1431    ///
1432    /// ```
1433    /// # async {
1434    /// use futures::stream::StreamExt;
1435    /// use k8s_openapi::api::core::v1::ConfigMap;
1436    /// use kube::{
1437    ///     Client,
1438    ///     api::{Api, ResourceExt},
1439    ///     runtime::{
1440    ///         controller::{Controller, Action},
1441    ///         watcher,
1442    ///     },
1443    /// };
1444    /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1445    /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1446    /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1447    /// // and its worker prevents the Tokio runtime from shutting down.
1448    /// std::thread::spawn(move || {
1449    ///     for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1450    ///         let _ = reload_tx.try_send(());
1451    ///     }
1452    /// });
1453    /// Controller::new(
1454    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1455    ///     watcher::Config::default(),
1456    /// )
1457    /// .reconcile_all_on(reload_rx.map(|_| ()))
1458    /// .run(
1459    ///     |o, _| async move {
1460    ///         println!("Reconciling {}", o.name_any());
1461    ///         Ok(Action::await_change())
1462    ///     },
1463    ///     |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1464    ///     Arc::new(()),
1465    /// );
1466    /// # };
1467    /// ```
1468    ///
1469    /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1470    ///
1471    /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1472    #[must_use]
1473    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1474        let store = self.store();
1475        let dyntype = self.dyntype.clone();
1476        self.trigger_selector.push(
1477            trigger
1478                .flat_map(move |()| {
1479                    let dyntype = dyntype.clone();
1480                    stream::iter(store.state().into_iter().map(move |obj| {
1481                        Ok(ReconcileRequest {
1482                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1483                            reason: ReconcileReason::BulkReconcile,
1484                        })
1485                    }))
1486                })
1487                .boxed(),
1488        );
1489        self
1490    }
1491
1492    /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
1493    ///
1494    /// This can be used to inject reconciliations for specific objects from an external resource.
1495    ///
1496    /// # Example:
1497    ///
1498    /// ```no_run
1499    /// # async {
1500    /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
1501    /// # use k8s_openapi::api::core::v1::{ConfigMap};
1502    /// # use kube::api::Api;
1503    /// # use kube::runtime::controller::Action;
1504    /// # use kube::runtime::reflector::{ObjectRef, Store};
1505    /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
1506    /// # use kube::runtime::watcher::Config;
1507    /// # use kube::{Client, Error, ResourceExt};
1508    /// # use std::future;
1509    /// # use std::sync::Arc;
1510    /// #
1511    /// # let client: Client = todo!();
1512    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1513    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1514    /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
1515    /// # let ns = "controller-ns".to_string();
1516    /// struct ExternalObject {
1517    ///     name: String,
1518    /// }
1519    /// let external_stream = watch_external_objects().map(|ext| {
1520    ///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
1521    /// });
1522    ///
1523    /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
1524    ///     .reconcile_on(external_stream)
1525    ///     .run(reconcile, error_policy, Arc::new(()))
1526    ///     .for_each(|_| future::ready(()))
1527    ///     .await;
1528    /// # };
1529    /// ```
1530    #[cfg(feature = "unstable-runtime-reconcile-on")]
1531    #[must_use]
1532    pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
1533        self.trigger_selector.push(
1534            trigger
1535                .map(move |obj| {
1536                    Ok(ReconcileRequest {
1537                        obj_ref: obj,
1538                        reason: ReconcileReason::Unknown,
1539                    })
1540                })
1541                .boxed(),
1542        );
1543        self
1544    }
1545
1546    /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1547    ///
1548    /// - No new reconciliations are started from the scheduler
1549    /// - The underlying Kubernetes watch is terminated
1550    /// - All running reconciliations are allowed to finish
1551    /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1552    ///
1553    /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1554    ///
1555    /// ```rust
1556    /// # async {
1557    /// use futures::future::FutureExt;
1558    /// use k8s_openapi::api::core::v1::ConfigMap;
1559    /// use kube::{Api, Client, ResourceExt};
1560    /// use kube_runtime::{
1561    ///     controller::{Controller, Action},
1562    ///     watcher,
1563    /// };
1564    /// use std::{convert::Infallible, sync::Arc};
1565    /// Controller::new(
1566    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1567    ///     watcher::Config::default(),
1568    /// )
1569    /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1570    /// .run(
1571    ///     |o, _| async move {
1572    ///         println!("Reconciling {}", o.name_any());
1573    ///         Ok(Action::await_change())
1574    ///     },
1575    ///     |_, err: &Infallible, _| Err(err).unwrap(),
1576    ///     Arc::new(()),
1577    /// );
1578    /// # };
1579    /// ```
1580    ///
1581    /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1582    /// as soon as *any* [`Future`] resolves.
1583    #[must_use]
1584    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1585        self.graceful_shutdown_selector.push(trigger.boxed());
1586        self
1587    }
1588
1589    /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1590    ///
1591    /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1592    /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1593    ///
1594    /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1595    /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1596    /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1597    /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1598    ///
1599    /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1600    /// [`Controller::graceful_shutdown_on`].
1601    ///
1602    /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1603    /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1604    /// unless you run [`std::process::exit`] afterwards.
1605    #[must_use]
1606    pub fn shutdown_on_signal(mut self) -> Self {
1607        async fn shutdown_signal() {
1608            futures::future::select(
1609                tokio::signal::ctrl_c().map(|_| ()).boxed(),
1610                #[cfg(unix)]
1611                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1612                    .unwrap()
1613                    .recv()
1614                    .map(|_| ())
1615                    .boxed(),
1616                // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1617                #[cfg(not(unix))]
1618                futures::future::pending::<()>(),
1619            )
1620            .await;
1621        }
1622
1623        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1624        self.graceful_shutdown_selector
1625            .push(graceful_rx.map(|_| ()).boxed());
1626        self.forceful_shutdown_selector.push(
1627            async {
1628                tracing::info!("press ctrl+c to shut down gracefully");
1629                shutdown_signal().await;
1630                if let Ok(()) = graceful_tx.send(()) {
1631                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1632                } else {
1633                    tracing::info!(
1634                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
1635                    );
1636                }
1637                shutdown_signal().await;
1638                tracing::info!("forced shutdown requested");
1639            }
1640            .boxed(),
1641        );
1642        self
1643    }
1644
1645    /// Consume all the parameters of the Controller and start the applier stream
1646    ///
1647    /// This creates a stream from all builder calls and starts an applier with
1648    /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1649    /// with a configurable `context`.
1650    pub fn run<ReconcilerFut, Ctx>(
1651        self,
1652        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1653        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1654        context: Arc<Ctx>,
1655    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1656    where
1657        K::DynamicType: Debug + Unpin,
1658        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1659        ReconcilerFut::Error: std::error::Error + Send + 'static,
1660    {
1661        applier(
1662            move |obj, ctx| {
1663                CancelableJoinHandle::spawn(
1664                    reconciler(obj, ctx).into_future().in_current_span(),
1665                    &Handle::current(),
1666                )
1667            },
1668            error_policy,
1669            context,
1670            self.reader,
1671            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1672                .take_until(future::select_all(self.graceful_shutdown_selector)),
1673            self.config,
1674        )
1675        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1676    }
1677}
1678
#[cfg(test)]
mod tests {
    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

    use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
    use crate::{
        applier,
        reflector::{self, ObjectRef},
        watcher::{self, metadata_watcher, watcher, Event},
        Config, Controller,
    };
    use futures::{Stream, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{core::ObjectMeta, Api, Resource};
    use serde::de::DeserializeOwned;
    use tokio::time::timeout;

    // Compile-time check that `T` is `Send`; hands the value back unchanged.
    fn assert_send<T: Send>(value: T) -> T {
        value
    }

    // Compile-time check that `T` is a `Send` stream of watcher events
    // for some resource type `K`; hands the value back unchanged.
    fn assert_stream<T, K>(value: T) -> T
    where
        T: Stream<Item = watcher::Result<Event<K>>> + Send,
        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
    {
        value
    }

    // Placeholder "value" of any type, used only to make type assertions compile.
    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // Deliberately not a #[test]: it must never run, it only has to typecheck.
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        let controller = Controller::new(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_send(controller.run(
            |_, _| async { Ok(mock_type::<Action>()) },
            |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
            Arc::new(()),
        ));
    }

    // Deliberately not a #[test]: it must never run, it only has to typecheck.
    //
    // Checks that the return types of `watcher` and `metadata_watcher` do not drift
    // for an arbitrary K that implements `Resource` (e.g. ConfigMap).
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        let full_objects = watcher(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_stream(full_objects);
        let metadata_only = metadata_watcher(mock_type::<Api<ConfigMap>>(), Default::default());
        assert_stream(metadata_only);
    }

    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        // Checks that `applier` copes with backpressure on the reschedule queue, by flooding it with no-op reconciles.
        // Guards against regressing on https://github.com/kube-rs/kube/issues/926

        // Assume that APPLIER_REQUEUE_BUF_SIZE stays flooded with 50x that number of objects "in rotation".
        // @nightkr could reliably trigger the deadlock with 10x on a 3900X; the extra margin avoids false negatives.
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Assume that everything is fine once every object has been reconciled 3 times on average
        let reconciles = items * 3;

        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let mut reconciliations = pin!(applier(
            // Flood the rescheduling buffer by requeueing every object immediately
            |_obj, _| Box::pin(async move { Ok(Action::requeue(Duration::ZERO)) }),
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
            Config::default(),
        ));

        store_tx.apply_watcher_event(&Event::InitDone);
        for name in (0..items).map(|i| format!("cm-{i}")) {
            let metadata = ObjectMeta {
                name: Some(name),
                namespace: Some("default".to_string()),
                ..Default::default()
            };
            let obj = ConfigMap {
                metadata,
                ..Default::default()
            };
            let obj_ref = ObjectRef::from_obj(&obj);
            // Publish the object to the store before queueing its reconciliation
            store_tx.apply_watcher_event(&Event::Apply(obj));
            queue_tx.unbounded_send(obj_ref).unwrap();
        }

        let flood = reconciliations
            .as_mut()
            .take(reconciles)
            .try_for_each(|_| std::future::ready(Ok(())));
        timeout(Duration::from_secs(10), flood)
            .await
            .expect("test timeout expired, applier likely deadlocked")
            .unwrap();

        // Shut down in an orderly fashion to make sure that no individual reconciler is stuck
        drop(queue_tx);
        let drain = reconciliations.try_for_each(|_| std::future::ready(Ok(())));
        timeout(Duration::from_secs(10), drain)
            .await
            .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
            .unwrap();
    }
}