kube_runtime/controller/mod.rs
1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4use crate::{
5 reflector::{
6 self, reflector,
7 store::{Store, Writer},
8 ObjectRef,
9 },
10 scheduler::{debounced_scheduler, ScheduleRequest},
11 utils::{
12 trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt,
13 },
14 watcher::{self, metadata_watcher, watcher, DefaultBackoff},
15};
16use educe::Educe;
17use futures::{
18 channel,
19 future::{self, BoxFuture},
20 stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
21};
22use kube_client::api::{Api, DynamicObject, Resource};
23use pin_project::pin_project;
24use serde::de::DeserializeOwned;
25use std::{
26 fmt::{Debug, Display},
27 future::Future,
28 hash::Hash,
29 sync::Arc,
30 task::{ready, Poll},
31 time::Duration,
32};
33use stream::BoxStream;
34use thiserror::Error;
35use tokio::{runtime::Handle, time::Instant};
36use tracing::{info_span, Instrument};
37
38mod future_hash_map;
39mod runner;
40
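/// Error returned by the controller's internal runner (a brief descriptive doc added here; the alias itself is unchanged).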
41pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
42
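/// Errors produced by [`applier`] (and therefore by the [`Controller`] stream).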
43#[derive(Debug, Error)]
44pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
45 #[error("tried to reconcile object {0} that was not found in local store")]
46 ObjectNotFound(ObjectRef<DynamicObject>),
47 #[error("reconciler for object {1} failed")]
48 ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
49 #[error("event queue error")]
50 QueueError(#[source] QueueErr),
51 #[error("runner error")]
52 RunnerError(#[source] RunnerError),
53}
54
55/// Results of the reconciliation attempt
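///
/// A brief, illustrative sketch of the two ways to construct an [`Action`]
/// (paths assume the `kube` facade crate, as in the other examples in this module):
///
/// ```
/// use kube::runtime::controller::Action;
/// use std::time::Duration;
///
/// // Re-reconcile this object in five minutes, even without a watch event:
/// let periodic = Action::requeue(Duration::from_secs(300));
/// // ...or do nothing until the object (or a related object) changes:
/// let lazy = Action::await_change();
/// # drop((periodic, lazy));
/// ```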
56#[derive(Debug, Clone, Eq, PartialEq)]
57pub struct Action {
58 /// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
59 ///
60 /// For example, use this to query external systems for updates, expire time-limited resources, or
61 /// (in your `error_policy`) retry after errors.
62 requeue_after: Option<Duration>,
63}
64
65impl Action {
    /// Action to requeue the reconciliation of this object after `duration`, even if no external watch triggers hit
67 ///
68 /// This is the best-practice action that ensures eventual consistency of your controller
69 /// even in the case of missed changes (which can happen).
70 ///
71 /// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
72 #[must_use]
73 pub fn requeue(duration: Duration) -> Self {
74 Self {
75 requeue_after: Some(duration),
76 }
77 }
78
79 /// Do nothing until a change is detected
80 ///
    /// This stops the controller from periodically reconciling this object until a relevant watch event
    /// is **detected**.
83 ///
84 /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
85 /// It is therefore not recommended to disable requeuing this way, unless you have
86 /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
87 #[must_use]
88 pub fn await_change() -> Self {
89 Self { requeue_after: None }
90 }
91}
92
/// Helper for building custom trigger filters; see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
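///
/// A hedged sketch of a custom trigger (the `doc` wrapper and the choice of `ConfigMap`
/// are illustrative only): map every touched object back to itself, which is essentially
/// what [`trigger_self`] does.
///
/// ```no_run
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use kube::runtime::{controller::trigger_with, reflector::ObjectRef, watcher, WatchStreamExt};
/// # use kube::Api;
/// # fn doc(api: Api<ConfigMap>) {
/// // Every touched ConfigMap becomes a reconcile request for itself.
/// let requests = trigger_with(
///     watcher(api, watcher::Config::default()).touched_objects(),
///     |cm: ConfigMap| Some(ObjectRef::from_obj(&cm)),
/// );
/// # drop(requests);
/// # }
/// ```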
94pub fn trigger_with<T, K, I, S>(
95 stream: S,
96 mapper: impl Fn(T) -> I,
97) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
98where
99 S: TryStream<Ok = T>,
100 I: IntoIterator,
101 I::Item: Into<ReconcileRequest<K>>,
102 K: Resource,
103{
104 stream
105 .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
106 .try_flatten()
107}
108
109/// Enqueues the object itself for reconciliation
110pub fn trigger_self<K, S>(
111 stream: S,
112 dyntype: K::DynamicType,
113) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
114where
115 S: TryStream<Ok = K>,
116 K: Resource,
117 K::DynamicType: Clone,
118{
119 trigger_with(stream, move |obj| {
120 Some(ReconcileRequest {
121 obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
122 reason: ReconcileReason::ObjectUpdated,
123 })
124 })
125}
126
127/// Enqueues the object itself for reconciliation when the object is behind a
128/// shared pointer
129#[cfg(feature = "unstable-runtime-subscribe")]
130fn trigger_self_shared<K, S>(
131 stream: S,
132 dyntype: K::DynamicType,
133) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
134where
135 // Input stream has item as some Arc'd Resource (via
136 // Controller::for_shared_stream)
137 S: TryStream<Ok = Arc<K>>,
138 K: Resource,
139 K::DynamicType: Clone,
140{
141 trigger_with(stream, move |obj| {
142 Some(ReconcileRequest {
143 obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
144 reason: ReconcileReason::ObjectUpdated,
145 })
146 })
147}
148
/// Enqueues reconciliation requests for any `K`-typed [`ObjectRef`]s returned by the mapper
150fn trigger_others<S, K, I>(
151 stream: S,
152 mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
153 dyntype: <S::Ok as Resource>::DynamicType,
154) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
155where
156 // Input stream has items as some Resource (via Controller::watches)
157 S: TryStream,
158 S::Ok: Resource,
159 <S::Ok as Resource>::DynamicType: Clone,
160 // Output stream is requests for the root type K
161 K: Resource,
162 K::DynamicType: Clone,
163 // but the mapper can produce many of them
164 I: 'static + IntoIterator<Item = ObjectRef<K>>,
165 I::IntoIter: Send,
166{
167 trigger_with(stream, move |obj| {
168 let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
169 mapper(obj)
170 .into_iter()
171 .map(move |mapped_obj_ref| ReconcileRequest {
172 obj_ref: mapped_obj_ref,
173 reason: ReconcileReason::RelatedObjectUpdated {
174 obj_ref: Box::new(watch_ref.clone()),
175 },
176 })
177 })
178}
179
/// Like [`trigger_others`], but for shared (`Arc`-wrapped) input streams
181#[cfg(feature = "unstable-runtime-subscribe")]
182fn trigger_others_shared<S, O, K, I>(
183 stream: S,
184 mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
185 dyntype: O::DynamicType,
186) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
187where
188 // Input is some shared resource (`Arc<O>`) obtained via `reflect`
189 S: TryStream<Ok = Arc<O>>,
190 O: Resource,
191 O::DynamicType: Clone,
192 // Output stream is requests for the root type K
193 K: Resource,
194 K::DynamicType: Clone,
195 // but the mapper can produce many of them
196 I: 'static + IntoIterator<Item = ObjectRef<K>>,
197 I::IntoIter: Send,
198{
199 trigger_with(stream, move |obj| {
200 let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
201 mapper(obj)
202 .into_iter()
203 .map(move |mapped_obj_ref| ReconcileRequest {
204 obj_ref: mapped_obj_ref,
205 reason: ReconcileReason::RelatedObjectUpdated {
206 obj_ref: Box::new(watch_ref.clone()),
207 },
208 })
209 })
210}
211
212/// Enqueues any owners of type `KOwner` for reconciliation
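///
/// For example (an illustrative sketch mirroring how [`Controller::owns`] wires this up
/// internally), a metadata watch on `Pod`s can be turned into reconcile requests for the
/// `ReplicaSet`s that own them:
///
/// ```no_run
/// # use k8s_openapi::api::{apps::v1::ReplicaSet, core::v1::Pod};
/// # use kube::runtime::{controller::trigger_owners, metadata_watcher, watcher, WatchStreamExt};
/// # use kube::Api;
/// # fn doc(pods: Api<Pod>) {
/// // Follow ownerReferences from each touched Pod up to its owning ReplicaSet.
/// let owner_requests = trigger_owners::<ReplicaSet, _>(
///     metadata_watcher(pods, watcher::Config::default()).touched_objects(),
///     (), // the owner type (ReplicaSet) is static, so its DynamicType is ()
///     (), // ...and so is the child type (Pod)
/// );
/// # drop(owner_requests);
/// # }
/// ```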
213pub fn trigger_owners<KOwner, S>(
214 stream: S,
215 owner_type: KOwner::DynamicType,
216 child_type: <S::Ok as Resource>::DynamicType,
217) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
218where
219 S: TryStream,
220 S::Ok: Resource,
221 <S::Ok as Resource>::DynamicType: Clone,
222 KOwner: Resource,
223 KOwner::DynamicType: Clone,
224{
225 let mapper = move |obj: S::Ok| {
226 let meta = obj.meta().clone();
227 let ns = meta.namespace;
228 let owner_type = owner_type.clone();
229 meta.owner_references
230 .into_iter()
231 .flatten()
232 .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
233 };
234 trigger_others(stream, mapper, child_type)
235}
236
237// TODO: do we really need to deal with a trystream? can we simplify this at
238// all?
239/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
240/// owned `K` objects
241#[cfg(feature = "unstable-runtime-subscribe")]
242fn trigger_owners_shared<KOwner, S, K>(
243 stream: S,
244 owner_type: KOwner::DynamicType,
245 child_type: K::DynamicType,
246) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
247where
248 S: TryStream<Ok = Arc<K>>,
249 K: Resource,
250 K::DynamicType: Clone,
251 KOwner: Resource,
252 KOwner::DynamicType: Clone,
253{
254 let mapper = move |obj: S::Ok| {
255 let meta = obj.meta().clone();
256 let ns = meta.namespace;
257 let owner_type = owner_type.clone();
258 meta.owner_references
259 .into_iter()
260 .flatten()
261 .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
262 };
263 trigger_others_shared(stream, mapper, child_type)
264}
265
266/// A request to reconcile an object, annotated with why that request was made.
267///
268/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
269/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
270/// In this case, only *the first* reason is stored.
271#[derive(Educe)]
272#[educe(
273 Debug(bound("K::DynamicType: Debug")),
274 Clone(bound("K::DynamicType: Clone")),
275 PartialEq(bound("K::DynamicType: PartialEq")),
276 Hash(bound("K::DynamicType: Hash"))
277)]
278pub struct ReconcileRequest<K: Resource> {
279 pub obj_ref: ObjectRef<K>,
280 #[educe(PartialEq(ignore), Hash(ignore))]
281 pub reason: ReconcileReason,
282}
283
284impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
285
286impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
287 fn from(obj_ref: ObjectRef<K>) -> Self {
288 ReconcileRequest {
289 obj_ref,
290 reason: ReconcileReason::Unknown,
291 }
292 }
293}
294
295#[derive(Debug, Clone)]
296pub enum ReconcileReason {
297 Unknown,
298 ObjectUpdated,
299 RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> },
300 ReconcilerRequestedRetry,
301 ErrorPolicyRequestedRetry,
302 BulkReconcile,
303 Custom { reason: String },
304}
305
306impl Display for ReconcileReason {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 match self {
309 ReconcileReason::Unknown => f.write_str("unknown"),
310 ReconcileReason::ObjectUpdated => f.write_str("object updated"),
311 ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
312 f.write_fmt(format_args!("related object updated: {object}"))
313 }
314 ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
315 ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
316 ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
317 ReconcileReason::Custom { reason } => f.write_str(reason),
318 }
319 }
320}
321
322const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
323
324/// Apply a reconciler to an input stream, with a given retry policy
325///
326/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
327///
328/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
329/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
330/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
331/// with a [`watcher()`] or [`reflector()`] for the subobject.
332///
333/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
334/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
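///
/// A hedged, minimal wiring sketch (the `ConfigMap` type, closure bodies, and error types
/// here are illustrative, not prescriptive). Note that the reconciler future must be
/// [`Unpin`], which is why this sketch returns a ready future instead of an `async` block:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use kube::runtime::controller::{applier, trigger_self, Action, Config};
/// # use kube::runtime::{reflector::{self, reflector}, watcher, WatchStreamExt};
/// # use kube::{Api, Client};
/// # use std::{convert::Infallible, sync::Arc};
/// # async fn doc(client: Client) {
/// let api = Api::<ConfigMap>::all(client);
/// let (reader, writer) = reflector::store();
/// // The queue: the reflector's applied objects, piped through `trigger_self`.
/// let queue = trigger_self(
///     reflector(writer, watcher(api, watcher::Config::default())).applied_objects(),
///     Default::default(),
/// );
/// applier(
///     |_obj: Arc<ConfigMap>, _ctx: Arc<()>| futures::future::ok::<_, Infallible>(Action::await_change()),
///     |_obj, _err, _ctx| Action::await_change(),
///     Arc::new(()),
///     reader,
///     queue,
///     Config::default(),
/// )
/// .for_each(|_| std::future::ready(()))
/// .await;
/// # }
/// ```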
335#[allow(clippy::needless_pass_by_value)]
336#[allow(clippy::type_complexity)]
337pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
338 mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
339 error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
340 context: Arc<Ctx>,
341 store: Store<K>,
342 queue: QueueStream,
343 config: Config,
344) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
345where
346 K: Clone + Resource + 'static,
347 K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
348 ReconcilerFut: TryFuture<Ok = Action> + Unpin,
349 ReconcilerFut::Error: std::error::Error + 'static,
350 QueueStream: TryStream,
351 QueueStream::Ok: Into<ReconcileRequest<K>>,
352 QueueStream::Error: std::error::Error + 'static,
353{
354 let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
355 let (scheduler_tx, scheduler_rx) =
356 channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
357 let error_policy = Arc::new(error_policy);
358 let delay_store = store.clone();
359 // Create a stream of ObjectRefs that need to be reconciled
360 trystream_try_via(
        // input: stream combining scheduled tasks and user-specified input events
362 Box::pin(stream::select(
            // 1. inputs from the user's queue stream
364 queue
365 .map_err(Error::QueueError)
366 .map_ok(|request| ScheduleRequest {
367 message: request.into(),
368 run_at: Instant::now(),
369 })
370 .on_complete(async move {
371 // On error: scheduler has already been shut down and there is nothing for us to do
372 let _ = scheduler_shutdown_tx.send(());
373 tracing::debug!("applier queue terminated, starting graceful shutdown")
374 }),
375 // 2. requests sent to scheduler_tx
376 scheduler_rx
377 .map(Ok)
378 .take_until(scheduler_shutdown_rx)
379 .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
380 )),
        // all the Oks from the select get passed through the scheduler stream, and are then executed
382 move |s| {
383 Runner::new(
384 debounced_scheduler(s, config.debounce),
385 config.concurrency,
386 move |request| {
387 let request = request.clone();
388 match store.get(&request.obj_ref) {
389 Some(obj) => {
390 let scheduler_tx = scheduler_tx.clone();
391 let error_policy_ctx = context.clone();
392 let error_policy = error_policy.clone();
393 let reconciler_span = info_span!(
394 "reconciling object",
395 "object.ref" = %request.obj_ref,
396 object.reason = %request.reason
397 );
398 reconciler_span
399 .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
400 .into_future()
401 .then(move |res| {
402 let error_policy = error_policy;
403 RescheduleReconciliation::new(
404 res,
405 |err| error_policy(obj, err, error_policy_ctx),
406 request.obj_ref.clone(),
407 scheduler_tx,
408 )
409 // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
410 // to them separately
411 .map(|res| Ok((request.obj_ref, res)))
412 })
413 .instrument(reconciler_span)
414 .left_future()
415 }
416 None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
417 .right_future(),
418 }
419 },
420 )
421 .delay_tasks_until(async move {
422 tracing::debug!("applier runner held until store is ready");
423 let res = delay_store.wait_until_ready().await;
424 tracing::debug!("store is ready, starting runner");
425 res
426 })
427 .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
428 .on_complete(async { tracing::debug!("applier runner terminated") })
429 },
430 )
431 .on_complete(async { tracing::debug!("applier runner-merge terminated") })
432 // finally, for each completed reconcile call:
433 .and_then(move |(obj_ref, reconciler_result)| async move {
434 match reconciler_result {
435 Ok(action) => Ok((obj_ref, action)),
436 Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
437 }
438 })
439 .on_complete(async { tracing::debug!("applier terminated") })
440}
441
442/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
443///
444/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
445#[pin_project]
446#[must_use]
447struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
448 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
449
450 reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
451 result: Option<Result<Action, ReconcilerErr>>,
452}
453
454impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
455where
456 K: Resource,
457{
458 fn new(
459 result: Result<Action, ReconcilerErr>,
460 error_policy: impl FnOnce(&ReconcilerErr) -> Action,
461 obj_ref: ObjectRef<K>,
462 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
463 ) -> Self {
464 let reconciler_finished_at = Instant::now();
465
466 let (action, reschedule_reason) = result.as_ref().map_or_else(
467 |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
468 |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
469 );
470
471 Self {
472 reschedule_tx,
473 reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
474 message: ReconcileRequest {
475 obj_ref,
476 reason: reschedule_reason,
477 },
478 run_at: reconciler_finished_at
479 .checked_add(requeue_after)
480 .unwrap_or_else(crate::scheduler::far_future),
481 }),
482 result: Some(result),
483 }
484 }
485}
486
487impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
488where
489 K: Resource,
490{
491 type Output = Result<Action, ReconcilerErr>;
492
493 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
494 let this = self.get_mut();
495
496 if this.reschedule_request.is_some() {
497 let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
498 let reschedule_request = this
499 .reschedule_request
500 .take()
                .expect("RescheduleReconciliation::reschedule_request was taken during processing");
502 // Failure to schedule item = in graceful shutdown mode, ignore
503 if let Ok(()) = rescheduler_ready {
504 let _ = this.reschedule_tx.start_send(reschedule_request);
505 }
506 }
507
508 Poll::Ready(
509 this.result
510 .take()
                .expect("RescheduleReconciliation::result was already taken"),
512 )
513 }
514}
515
516/// Accumulates all options that can be used on a [`Controller`] invocation.
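///
/// A small illustration (the values are arbitrary); pass the result to
/// [`Controller::with_config`]:
///
/// ```
/// use kube::runtime::controller::Config;
/// use std::time::Duration;
///
/// // Debounce rapid-fire events for one second and cap concurrency at 16 reconciles.
/// let config = Config::default().debounce(Duration::from_secs(1)).concurrency(16);
/// # drop(config);
/// ```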
517#[derive(Clone, Debug, Default)]
518pub struct Config {
519 debounce: Duration,
520 concurrency: u16,
521}
522
523impl Config {
524 /// The debounce duration used to deduplicate reconciliation requests.
525 ///
526 /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
527 /// resulting in __trailing edge debouncing__ of reconciler requests.
528 /// This option can help to reduce the amount of unnecessary reconciler calls
529 /// when using multiple controller relations, or during rapid phase transitions.
530 ///
531 /// ## Warning
532 /// This option delays (and keeps delaying) reconcile requests for objects while
533 /// the object is updated. It can **permanently hide** updates from your reconciler
534 /// if set too high on objects that are updated frequently (like nodes).
535 #[must_use]
536 pub fn debounce(mut self, debounce: Duration) -> Self {
537 self.debounce = debounce;
538 self
539 }
540
    /// The number of concurrent reconciliations that are allowed to run at any given moment.
    ///
    /// This can be adjusted to the controller's needs to increase
    /// performance and/or make performance predictable. By default, it is 0, meaning
    /// the controller runs with unbounded concurrency.
546 ///
547 /// Note that despite concurrency, a controller never schedules concurrent reconciles
548 /// on the same object.
549 #[must_use]
550 pub fn concurrency(mut self, concurrency: u16) -> Self {
551 self.concurrency = concurrency;
552 self
553 }
554}
555
556/// Controller for a Resource `K`
557///
558/// A controller is an infinite stream of objects to be reconciled.
559///
/// Once `run` is called and the resulting stream is continuously polled, it calls out to
/// the user-provided `reconcile` and `error_policy` callbacks whenever relevant changes are
/// detected or when errors are returned by `reconcile`.
563///
564/// Reconciles are generally requested for all changes on your root objects.
565/// Changes to managed child resources will also trigger the reconciler for the
566/// managing object by traversing owner references (for `Controller::owns`),
/// or by traversing a custom mapping (for `Controller::watches`).
568///
569/// This mapping mechanism ultimately hides the reason for the reconciliation request,
570/// and forces you to write an idempotent reconciler.
571///
572/// General setup:
573/// ```no_run
574/// use kube::{Api, Client, CustomResource};
575/// use kube::runtime::{controller::{Controller, Action}, watcher};
576/// # use serde::{Deserialize, Serialize};
577/// # use tokio::time::Duration;
578/// use futures::StreamExt;
579/// use k8s_openapi::api::core::v1::ConfigMap;
580/// use schemars::JsonSchema;
581/// # use std::sync::Arc;
582/// use thiserror::Error;
583///
584/// #[derive(Debug, Error)]
585/// enum Error {}
586///
587/// /// A custom resource
588/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
589/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
590/// struct ConfigMapGeneratorSpec {
591/// content: String,
592/// }
593///
/// /// The reconciler that will be called when either object changes
595/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
596/// // .. use api here to reconcile a child ConfigMap with ownerreferences
597/// // see configmapgen_controller example for full info
598/// Ok(Action::requeue(Duration::from_secs(300)))
599/// }
600/// /// an error handler that will be called when the reconciler fails with access to both the
601/// /// object that caused the failure and the actual error
602/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
603/// Action::requeue(Duration::from_secs(60))
604/// }
605///
606/// /// something to drive the controller
607/// #[tokio::main]
608/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
609/// let client = Client::try_default().await?;
610/// let context = Arc::new(()); // bad empty context - put client in here
611/// let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
612/// let cms = Api::<ConfigMap>::all(client.clone());
613/// Controller::new(cmgs, watcher::Config::default())
614/// .owns(cms, watcher::Config::default())
615/// .run(reconcile, error_policy, context)
616/// .for_each(|res| async move {
617/// match res {
618/// Ok(o) => println!("reconciled {:?}", o),
619/// Err(e) => println!("reconcile failed: {:?}", e),
620/// }
621/// })
622/// .await; // controller does nothing unless polled
623/// Ok(())
624/// }
625/// ```
626pub struct Controller<K>
627where
628 K: Clone + Resource + Debug + 'static,
629 K::DynamicType: Eq + Hash,
630{
631 // NB: Need to Unpin for stream::select_all
632 trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
633 trigger_backoff: Box<dyn Backoff + Send>,
634 /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
635 /// refusing to start any new reconciliations but letting any existing ones finish.
636 graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
637 /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
638 /// requesting that all running reconciliations be aborted.
639 /// However, note that they *will* keep running until their next yield point (`.await`),
640 /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
641 forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
642 dyntype: K::DynamicType,
643 reader: Store<K>,
644 config: Config,
645}
646
647impl<K> Controller<K>
648where
649 K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
650 K::DynamicType: Eq + Hash + Clone,
651{
652 /// Create a Controller for a resource `K`
653 ///
654 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
655 ///
    /// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
657 /// and receive reconcile events for.
658 /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
659 #[must_use]
660 pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
661 where
662 K::DynamicType: Default,
663 {
664 Self::new_with(main_api, wc, Default::default())
665 }
666
667 /// Create a Controller for a resource `K`
668 ///
669 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
670 ///
671 /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
672 /// to watch - in the Api's configured scope - and receive reconcile events for.
673 /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
674 ///
675 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
676 ///
677 /// [`watcher::Config`]: crate::watcher::Config
678 /// [`Api`]: kube_client::Api
679 /// [`dynamic`]: kube_client::core::dynamic
680 /// [`Config::default`]: crate::watcher::Config::default
681 pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
682 let writer = Writer::<K>::new(dyntype.clone());
683 let reader = writer.as_reader();
684 let mut trigger_selector = stream::SelectAll::new();
685 let self_watcher = trigger_self(
686 reflector(writer, watcher(main_api, wc)).applied_objects(),
687 dyntype.clone(),
688 )
689 .boxed();
690 trigger_selector.push(self_watcher);
691 Self {
692 trigger_selector,
693 trigger_backoff: Box::<DefaultBackoff>::default(),
694 graceful_shutdown_selector: vec![
695 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
696 future::pending().boxed(),
697 ],
698 forceful_shutdown_selector: vec![
699 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
700 future::pending().boxed(),
701 ],
702 dyntype,
703 reader,
704 config: Default::default(),
705 }
706 }
707
708 /// Create a Controller for a resource `K` from a stream of `K` objects
709 ///
710 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
711 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
712 /// as well as sharing input streams between multiple controllers.
713 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
715 ///
716 /// # Example:
717 ///
718 /// ```no_run
719 /// # use futures::StreamExt;
720 /// # use k8s_openapi::api::apps::v1::Deployment;
721 /// # use kube::runtime::controller::{Action, Controller};
722 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
723 /// # use kube::{Api, Client, Error, ResourceExt};
724 /// # use std::sync::Arc;
725 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
726 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
727 /// # async fn doc(client: kube::Client) {
728 /// let api: Api<Deployment> = Api::default_namespaced(client);
729 /// let (reader, writer) = reflector::store();
730 /// let deploys = watcher(api, watcher::Config::default())
731 /// .default_backoff()
732 /// .reflect(writer)
733 /// .applied_objects()
734 /// .predicate_filter(predicates::generation);
735 ///
736 /// Controller::for_stream(deploys, reader)
737 /// .run(reconcile, error_policy, Arc::new(()))
738 /// .for_each(|_| std::future::ready(()))
739 /// .await;
740 /// # }
741 /// ```
742 ///
743 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
744 #[cfg(feature = "unstable-runtime-stream-control")]
745 pub fn for_stream(
746 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
747 reader: Store<K>,
748 ) -> Self
749 where
750 K::DynamicType: Default,
751 {
752 Self::for_stream_with(trigger, reader, Default::default())
753 }
754
755 /// Create a Controller for a resource `K` from a stream of `K` objects
756 ///
757 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
758 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
759 /// as well as sharing input streams between multiple controllers.
760 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
762 ///
763 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
764 ///
765 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
766 ///
767 /// [`dynamic`]: kube_client::core::dynamic
768 #[cfg(feature = "unstable-runtime-stream-control")]
769 pub fn for_stream_with(
770 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
771 reader: Store<K>,
772 dyntype: K::DynamicType,
773 ) -> Self {
774 let mut trigger_selector = stream::SelectAll::new();
775 let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
776 trigger_selector.push(self_watcher);
777 Self {
778 trigger_selector,
779 trigger_backoff: Box::<DefaultBackoff>::default(),
780 graceful_shutdown_selector: vec![
781 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
782 future::pending().boxed(),
783 ],
784 forceful_shutdown_selector: vec![
785 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
786 future::pending().boxed(),
787 ],
788 dyntype,
789 reader,
790 config: Default::default(),
791 }
792 }
793
    /// Create a Controller for a resource `K` from a shared stream of `K` objects
    ///
    /// Similar to [`Controller::for_stream`], but a shared stream of resources is used
    /// instead of a watcher stream. Shared streams can be created out-of-band by
    /// subscribing on a store `Writer`. Through this interface, multiple controllers
    /// can use the same root (shared) input stream of resources to keep memory
    /// overheads smaller.
799 ///
800 /// **N.B**: This constructor requires an
801 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
802 /// feature.
803 ///
804 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
805 /// need to share the stream.
806 ///
807 /// ## Warning:
808 ///
809 /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
810 /// is driven to readiness independently of this controller to ensure the
811 /// watcher never deadlocks.
812 ///
813 /// # Example:
814 ///
815 /// ```no_run
816 /// # use futures::StreamExt;
817 /// # use k8s_openapi::api::apps::v1::Deployment;
818 /// # use kube::runtime::controller::{Action, Controller};
819 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
820 /// # use kube::{Api, Client, Error, ResourceExt};
821 /// # use std::sync::Arc;
822 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
823 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
824 /// # async fn doc(client: kube::Client) {
825 /// let api: Api<Deployment> = Api::default_namespaced(client);
826 /// let (reader, writer) = reflector::store_shared(128);
827 /// let subscriber = writer
828 /// .subscribe()
829 /// .expect("subscribers can only be created from shared stores");
830 /// let deploys = watcher(api, watcher::Config::default())
831 /// .default_backoff()
832 /// .reflect(writer)
833 /// .applied_objects()
834 /// .for_each(|ev| async move {
835 /// match ev {
836 /// Ok(obj) => tracing::info!("got obj {obj:?}"),
837 /// Err(error) => tracing::error!(%error, "received error")
838 /// }
839 /// });
840 ///
841 /// let controller = Controller::for_shared_stream(subscriber, reader)
842 /// .run(reconcile, error_policy, Arc::new(()))
843 /// .for_each(|ev| async move {
844 /// tracing::info!("reconciled {ev:?}")
845 /// });
846 ///
847 /// // Drive streams using a select statement
848 /// tokio::select! {
849 /// _ = deploys => {},
850 /// _ = controller => {},
851 /// }
    /// # }
    /// ```
853 #[cfg(feature = "unstable-runtime-subscribe")]
854 pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
855 where
856 K::DynamicType: Default,
857 {
858 Self::for_shared_stream_with(trigger, reader, Default::default())
859 }
860
    /// Create a Controller for a resource `K` from a shared stream of `K` objects
    ///
    /// Similar to [`Controller::for_stream_with`], but a shared stream of resources is
    /// used instead of a watcher stream. Shared streams can be created out-of-band by
    /// subscribing on a store `Writer`. Through this interface, multiple controllers
    /// can use the same root (shared) input stream of resources to keep memory
    /// overheads smaller.
866 ///
867 /// **N.B**: This constructor requires an
868 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
869 /// feature.
870 ///
871 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
872 /// need to share the stream.
873 ///
874 /// This variant constructor is used for [`dynamic`] types found through
875 /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
876 /// known at compile time).
877 ///
878 /// [`dynamic`]: kube_client::core::dynamic
879 #[cfg(feature = "unstable-runtime-subscribe")]
880 pub fn for_shared_stream_with(
881 trigger: impl Stream<Item = Arc<K>> + Send + 'static,
882 reader: Store<K>,
883 dyntype: K::DynamicType,
884 ) -> Self {
885 let mut trigger_selector = stream::SelectAll::new();
886 let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
887 trigger_selector.push(self_watcher);
888 Self {
889 trigger_selector,
890 trigger_backoff: Box::<DefaultBackoff>::default(),
891 graceful_shutdown_selector: vec![
892 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
893 future::pending().boxed(),
894 ],
895 forceful_shutdown_selector: vec![
896 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
897 future::pending().boxed(),
898 ],
899 dyntype,
900 reader,
901 config: Default::default(),
902 }
903 }
904
905 /// Specify the configuration for the controller's behavior.
906 #[must_use]
907 pub fn with_config(mut self, config: Config) -> Self {
908 self.config = config;
909 self
910 }
911
912 /// Specify the backoff policy for "trigger" watches
913 ///
    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
915 ///
916 /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
917 /// but can be overridden by calling this method.
918 #[must_use]
919 pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
920 self.trigger_backoff = Box::new(backoff);
921 self
922 }
923
924 /// Retrieve a copy of the reader before starting the controller
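    ///
    /// This is useful, for example, for exposing the cache to a web server, or for handing
    /// it to the reconciler's context (a hedged sketch; the names are illustrative):
    ///
    /// ```no_run
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use kube::{Api, runtime::{watcher, Controller}};
    /// # fn doc(client: kube::Client) {
    /// let controller = Controller::new(Api::<ConfigMap>::all(client), watcher::Config::default());
    /// // A clone of the reflector cache; it is populated once the controller's watch is running.
    /// let cache = controller.store();
    /// # drop((controller, cache));
    /// # }
    /// ```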
925 pub fn store(&self) -> Store<K> {
926 self.reader.clone()
927 }
928
929 /// Specify `Child` objects which `K` owns and should be watched
930 ///
931 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
932 /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
933 ///
934 /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
935 /// to watch - in the Api's configured scope - and receive reconcile events for.
936 /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
937 ///
938 /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
939 #[must_use]
940 pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
941 self,
942 api: Api<Child>,
943 wc: watcher::Config,
944 ) -> Self {
945 self.owns_with(api, (), wc)
946 }
947
948 /// Specify `Child` objects which `K` owns and should be watched
949 ///
950 /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
951 #[must_use]
952 pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
953 mut self,
954 api: Api<Child>,
955 dyntype: Child::DynamicType,
956 wc: watcher::Config,
957 ) -> Self
958 where
959 Child::DynamicType: Debug + Eq + Hash + Clone,
960 {
961 // TODO: call owns_stream_with when it's stable
962 let child_watcher = trigger_owners(
963 metadata_watcher(api, wc).touched_objects(),
964 self.dyntype.clone(),
965 dyntype,
966 );
967 self.trigger_selector.push(child_watcher.boxed());
968 self
969 }
970
971 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
972 ///
973 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
974 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
975 /// as well as sharing input streams between multiple controllers.
976 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
978 ///
979 /// Watcher streams passed in here should be filtered first through `touched_objects`.
980 ///
981 /// # Example:
982 ///
983 /// ```no_run
984 /// # use futures::StreamExt;
985 /// # use k8s_openapi::api::core::v1::ConfigMap;
986 /// # use k8s_openapi::api::apps::v1::StatefulSet;
987 /// # use kube::runtime::controller::Action;
988 /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
989 /// # use kube::{Api, Client, Error, ResourceExt};
990 /// # use std::sync::Arc;
991 /// # type CustomResource = ConfigMap;
992 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
993 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
994 /// # async fn doc(client: kube::Client) {
995 /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
996 /// .touched_objects()
997 /// .predicate_filter(predicates::generation);
998 ///
999 /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
1000 /// .owns_stream(sts_stream)
1001 /// .run(reconcile, error_policy, Arc::new(()))
1002 /// .for_each(|_| std::future::ready(()))
1003 /// .await;
1004 /// # }
1005 /// ```
1006 #[cfg(feature = "unstable-runtime-stream-control")]
1007 #[must_use]
1008 pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1009 self,
1010 trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1011 ) -> Self {
1012 self.owns_stream_with(trigger, ())
1013 }
1014
1015 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1016 ///
1017 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1018 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1019 /// as well as sharing input streams between multiple controllers.
1020 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1022 ///
1023 /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1024 #[cfg(feature = "unstable-runtime-stream-control")]
1025 #[must_use]
1026 pub fn owns_stream_with<Child: Resource + Send + 'static>(
1027 mut self,
1028 trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
1029 dyntype: Child::DynamicType,
1030 ) -> Self
1031 where
1032 Child::DynamicType: Debug + Eq + Hash + Clone,
1033 {
1034 let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
1035 self.trigger_selector.push(child_watcher.boxed());
1036 self
1037 }
1038
    /// Trigger the reconciliation process for a shared stream of `Child`
    /// objects of the owner `K`
    ///
    /// Conceptually the same as [`Controller::owns`], but a stream is used
    /// instead of an `Api`. This interface behaves similarly to its non-shared
    /// counterpart [`Controller::owns_stream`]. Shared streams can be created
    /// out-of-band by subscribing on a store `Writer`, which lets multiple
    /// controllers use the same root (shared) input stream of resources to
    /// keep memory overheads smaller.
    ///
    /// **NB**: This constructor requires an
    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
    /// feature.
    ///
    /// Prefer [`Controller::owns`] or [`Controller::owns_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
1067 ///
1068 /// # Example:
1069 ///
1070 /// ```no_run
1071 /// # use futures::StreamExt;
1072 /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
1073 /// # use kube::runtime::controller::{Action, Controller};
1074 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
1075 /// # use kube::{Api, Client, Error, ResourceExt};
1076 /// # use std::sync::Arc;
1077 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1078 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1079 /// # async fn doc(client: kube::Client) {
1080 /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1081 /// let pod_api: Api<Pod> = Api::default_namespaced(client);
1082 ///
1083 /// let (reader, writer) = reflector::store_shared(128);
1084 /// let subscriber = writer
1085 /// .subscribe()
1086 /// .expect("subscribers can only be created from shared stores");
1087 /// let pods = watcher(pod_api, watcher::Config::default())
1088 /// .default_backoff()
1089 /// .reflect(writer)
1090 /// .applied_objects()
1091 /// .for_each(|ev| async move {
1092 /// match ev {
1093 /// Ok(obj) => tracing::info!("got obj {obj:?}"),
1094 /// Err(error) => tracing::error!(%error, "received error")
1095 /// }
1096 /// });
1097 ///
1098 /// let controller = Controller::new(deploys, Default::default())
1099 /// .owns_shared_stream(subscriber)
1100 /// .run(reconcile, error_policy, Arc::new(()))
1101 /// .for_each(|ev| async move {
1102 /// tracing::info!("reconciled {ev:?}")
1103 /// });
1104 ///
1105 /// // Drive streams using a select statement
1106 /// tokio::select! {
1107 /// _ = pods => {},
1108 /// _ = controller => {},
1109 /// }
    /// # }
    /// ```
1111 #[cfg(feature = "unstable-runtime-subscribe")]
1112 #[must_use]
1113 pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
1114 self,
1115 trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1116 ) -> Self {
1117 self.owns_shared_stream_with(trigger, ())
1118 }
1119
1120 /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
1121 ///
1122 /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
1123 /// The source stream can be shared between multiple controllers, optimising
1124 /// resource usage.
1125 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1127 ///
1128 /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1129 #[cfg(feature = "unstable-runtime-subscribe")]
1130 #[must_use]
1131 pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
1132 mut self,
1133 trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
1134 dyntype: Child::DynamicType,
1135 ) -> Self
1136 where
1137 Child::DynamicType: Debug + Eq + Hash + Clone,
1138 {
1139 let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
1140 self.trigger_selector.push(child_watcher.boxed());
1141 self
1142 }
1143
1144 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1145 ///
1146 /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1147 /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1148 ///
1149 /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1150 ///
1151 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1152 ///
1153 /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1154 /// to watch - in the Api's configured scope - and run through the custom mapper.
    /// To watch the full set of `Watched` objects in the given `Api` scope, you can use [`watcher::Config::default`].
1156 ///
1157 /// # Example
1158 ///
    /// Tracking cross-cluster references using the [Operator-SDK] annotations.
1160 ///
1161 /// ```
1162 /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1163 /// # use kube::{Api, ResourceExt};
1164 /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1165 /// # use futures::StreamExt;
1166 /// # use std::sync::Arc;
1167 /// # type WatchedResource = Namespace;
1168 /// # struct Context;
1169 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1170 /// # Ok(Action::await_change())
1171 /// # };
1172 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1173 /// # Action::await_change()
1174 /// # }
1175 /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1176 /// # let memcached = Api::<ConfigMap>::all(client.clone());
1177 /// # let context = Arc::new(Context);
1178 /// Controller::new(memcached, watcher::Config::default())
1179 /// .watches(
1180 /// Api::<WatchedResource>::all(client.clone()),
1181 /// watcher::Config::default(),
1182 /// |ar| {
1183 /// let prt = ar
1184 /// .annotations()
1185 /// .get("operator-sdk/primary-resource-type")
1186 /// .map(String::as_str);
1187 ///
1188 /// if prt != Some("Memcached.cache.example.com") {
1189 /// return None;
1190 /// }
1191 ///
1192 /// let (namespace, name) = ar
1193 /// .annotations()
1194 /// .get("operator-sdk/primary-resource")?
1195 /// .split_once('/')?;
1196 ///
1197 /// Some(ObjectRef::new(name).within(namespace))
1198 /// }
1199 /// )
1200 /// .run(reconcile, error_policy, context)
1201 /// .for_each(|_| futures::future::ready(()))
1202 /// .await;
1203 /// # Ok(())
1204 /// # }
1205 /// ```
1206 ///
1207 /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1208 #[must_use]
1209 pub fn watches<Other, I>(
1210 self,
1211 api: Api<Other>,
1212 wc: watcher::Config,
1213 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1214 ) -> Self
1215 where
1216 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1217 Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1218 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1219 I::IntoIter: Send,
1220 {
1221 self.watches_with(api, Default::default(), wc, mapper)
1222 }
1223
1224 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1225 ///
1226 /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1227 #[must_use]
1228 pub fn watches_with<Other, I>(
1229 mut self,
1230 api: Api<Other>,
1231 dyntype: Other::DynamicType,
1232 wc: watcher::Config,
1233 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1234 ) -> Self
1235 where
1236 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1237 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1238 I::IntoIter: Send,
1239 Other::DynamicType: Debug + Clone + Eq + Hash,
1240 {
1241 let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1242 self.trigger_selector.push(other_watcher.boxed());
1243 self
1244 }
1245
1246 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1247 ///
1248 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1249 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1250 /// as well as sharing input streams between multiple controllers.
1251 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1253 ///
1254 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1255 ///
1256 /// # Example:
1257 ///
1258 /// ```no_run
1259 /// # use futures::StreamExt;
1260 /// # use k8s_openapi::api::core::v1::ConfigMap;
1261 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1262 /// # use kube::runtime::controller::Action;
1263 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1264 /// # use kube::{Api, Client, Error, ResourceExt};
1265 /// # use std::sync::Arc;
1266 /// # type CustomResource = ConfigMap;
1267 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1268 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1269 /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1270 /// # async fn doc(client: kube::Client) {
1271 /// let api: Api<DaemonSet> = Api::all(client.clone());
1272 /// let cr: Api<CustomResource> = Api::all(client.clone());
1273 /// let daemons = watcher(api, watcher::Config::default())
1274 /// .touched_objects()
1275 /// .predicate_filter(predicates::generation);
1276 ///
1277 /// Controller::new(cr, watcher::Config::default())
1278 /// .watches_stream(daemons, mapper)
1279 /// .run(reconcile, error_policy, Arc::new(()))
1280 /// .for_each(|_| std::future::ready(()))
1281 /// .await;
1282 /// # }
1283 /// ```
1284 #[cfg(feature = "unstable-runtime-stream-control")]
1285 #[must_use]
1286 pub fn watches_stream<Other, I>(
1287 self,
1288 trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1289 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1290 ) -> Self
1291 where
1292 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1293 Other::DynamicType: Default + Debug + Clone,
1294 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1295 I::IntoIter: Send,
1296 {
1297 self.watches_stream_with(trigger, mapper, Default::default())
1298 }
1299
1300 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1301 ///
1302 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1303 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1304 /// as well as sharing input streams between multiple controllers.
1305 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1307 ///
1308 /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
1309 #[cfg(feature = "unstable-runtime-stream-control")]
1310 #[must_use]
1311 pub fn watches_stream_with<Other, I>(
1312 mut self,
1313 trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
1314 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1315 dyntype: Other::DynamicType,
1316 ) -> Self
1317 where
1318 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1319 Other::DynamicType: Debug + Clone,
1320 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1321 I::IntoIter: Send,
1322 {
1323 let other_watcher = trigger_others(trigger, mapper, dyntype);
1324 self.trigger_selector.push(other_watcher.boxed());
1325 self
1326 }
1327
1328 /// Trigger the reconciliation process for a shared stream of `Other`
1329 /// objects related to a `K`
1330 ///
1331 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1332 /// stream of resources is used. This allows for sharing input streams
1333 /// between multiple controllers.
1334 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1336 ///
1337 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1338 ///
1339 /// # Example:
1340 ///
1341 /// ```no_run
1342 /// # use futures::StreamExt;
1343 /// # use k8s_openapi::api::core::v1::ConfigMap;
1344 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1345 /// # use kube::runtime::controller::Action;
1346 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1347 /// # use kube::{Api, Client, Error, ResourceExt};
1348 /// # use std::sync::Arc;
1349 /// # type CustomResource = ConfigMap;
1350 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1351 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1352 /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1353 /// # async fn doc(client: kube::Client) {
1354 /// let api: Api<DaemonSet> = Api::all(client.clone());
1355 /// let cr: Api<CustomResource> = Api::all(client.clone());
1356 /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1357 /// let subscriber = writer
1358 /// .subscribe()
1359 /// .expect("subscribers can only be created from shared stores");
1360 /// let daemons = watcher(api, watcher::Config::default())
1361 /// .reflect(writer)
1362 /// .touched_objects()
1363 /// .for_each(|ev| async move {
1364 /// match ev {
1365 /// Ok(obj) => {},
1366 /// Err(error) => tracing::error!(%error, "received err")
1367 /// }
1368 /// });
1369 ///
1370 /// let controller = Controller::new(cr, watcher::Config::default())
1371 /// .watches_shared_stream(subscriber, mapper)
1372 /// .run(reconcile, error_policy, Arc::new(()))
1373 /// .for_each(|_| std::future::ready(()));
1374 ///
1375 /// // Drive streams using a select statement
1376 /// tokio::select! {
1377 /// _ = daemons => {},
1378 /// _ = controller => {},
1379 /// }
1380 /// # }
1381 /// ```
1382 #[cfg(feature = "unstable-runtime-subscribe")]
1383 #[must_use]
1384 pub fn watches_shared_stream<Other, I>(
1385 self,
1386 trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
1387 mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
1388 ) -> Self
1389 where
1390 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1391 Other::DynamicType: Default + Debug + Clone,
1392 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1393 I::IntoIter: Send,
1394 {
1395 self.watches_shared_stream_with(trigger, mapper, Default::default())
1396 }
1397
1398 /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1399 ///
1400 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1401 /// stream of resources is used. This allows for sharing of streams between
1402 /// multiple controllers.
1403 ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1405 ///
1406 /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn watches_shared_stream_with<Other, I>(
        mut self,
        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
        dyntype: Other::DynamicType,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Debug + Clone,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
        self.trigger_selector.push(other_watcher.boxed());
        self
    }

    /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
    ///
    /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
    ///
    /// To reconcile all objects when a new line is entered:
    ///
    /// ```
    /// # async {
    /// use futures::stream::StreamExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{
    ///     Client,
    ///     api::{Api, ResourceExt},
    ///     runtime::{
    ///         controller::{Controller, Action},
    ///         watcher,
    ///     },
    /// };
    /// use std::{convert::Infallible, io::BufRead, sync::Arc};
    /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
    /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
    /// // and its worker prevents the Tokio runtime from shutting down.
    /// std::thread::spawn(move || {
    ///     for _ in std::io::BufReader::new(std::io::stdin()).lines() {
    ///         let _ = reload_tx.try_send(());
    ///     }
    /// });
    /// Controller::new(
    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ///     watcher::Config::default(),
    /// )
    /// .reconcile_all_on(reload_rx.map(|_| ()))
    /// .run(
    ///     |o, _| async move {
    ///         println!("Reconciling {}", o.name_any());
    ///         Ok(Action::await_change())
    ///     },
    ///     |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
    ///     Arc::new(()),
    /// );
    /// # };
    /// ```
    ///
    /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
    ///
    /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
    #[must_use]
    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
        let store = self.store();
        let dyntype = self.dyntype.clone();
        self.trigger_selector.push(
            trigger
                .flat_map(move |()| {
                    let dyntype = dyntype.clone();
                    stream::iter(store.state().into_iter().map(move |obj| {
                        Ok(ReconcileRequest {
                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
                            reason: ReconcileReason::BulkReconcile,
                        })
                    }))
                })
                .boxed(),
        );
        self
    }

    /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
    ///
    /// This can be used to inject reconciliations for specific objects from an external resource.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # async {
    /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
    /// # use k8s_openapi::api::core::v1::{ConfigMap};
    /// # use kube::api::Api;
    /// # use kube::runtime::controller::Action;
    /// # use kube::runtime::reflector::{ObjectRef, Store};
    /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
    /// # use kube::runtime::watcher::Config;
    /// # use kube::{Client, Error, ResourceExt};
    /// # use std::future;
    /// # use std::sync::Arc;
    /// #
    /// # let client: Client = todo!();
    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
    /// # let ns = "controller-ns".to_string();
    /// struct ExternalObject {
    ///     name: String,
    /// }
    /// let external_stream = watch_external_objects().map(|ext| {
    ///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
    /// });
    ///
    /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
    ///     .reconcile_on(external_stream)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| future::ready(()))
    ///     .await;
    /// # };
    /// ```
    #[cfg(feature = "unstable-runtime-reconcile-on")]
    #[must_use]
    pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
        self.trigger_selector.push(
            trigger
                .map(move |obj| {
                    Ok(ReconcileRequest {
                        obj_ref: obj,
                        reason: ReconcileReason::Unknown,
                    })
                })
                .boxed(),
        );
        self
    }

    /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
    ///
    /// - No new reconciliations are started from the scheduler
    /// - The underlying Kubernetes watch is terminated
    /// - All running reconciliations are allowed to finish
    /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
    ///
    /// For example, to stop the reconciler whenever the user presses Ctrl+C:
    ///
    /// ```rust
    /// # async {
    /// use futures::future::FutureExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{Api, Client, ResourceExt};
    /// use kube_runtime::{
    ///     controller::{Controller, Action},
    ///     watcher,
    /// };
    /// use std::{convert::Infallible, sync::Arc};
    /// Controller::new(
    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ///     watcher::Config::default(),
    /// )
    /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
    /// .run(
    ///     |o, _| async move {
    ///         println!("Reconciling {}", o.name_any());
    ///         Ok(Action::await_change())
    ///     },
    ///     |_, err: &Infallible, _| Err(err).unwrap(),
    ///     Arc::new(()),
    /// );
    /// # };
    /// ```
    ///
    /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
    /// as soon as *any* [`Future`] resolves.
    #[must_use]
    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
        self.graceful_shutdown_selector.push(trigger.boxed());
        self
    }

    /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
    ///
    /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
    /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
    ///
    /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
    /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
    /// all other tasks either terminate when the [`Controller`] does, have their own signal handlers,
    /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
    ///
    /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
    /// [`Controller::graceful_shutdown_on`].
    ///
    /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
    /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
    /// unless you run [`std::process::exit`] afterwards.
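    ///
    /// # Example:
    ///
    /// A minimal usage sketch, mirroring the [`Controller::graceful_shutdown_on`] example above:
    ///
    /// ```no_run
    /// # async {
    /// use futures::stream::StreamExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{Api, Client, ResourceExt};
    /// use kube_runtime::{
    ///     controller::{Controller, Action},
    ///     watcher,
    /// };
    /// use std::{convert::Infallible, sync::Arc};
    /// Controller::new(
    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ///     watcher::Config::default(),
    /// )
    /// .shutdown_on_signal()
    /// .run(
    ///     |o, _| async move {
    ///         println!("Reconciling {}", o.name_any());
    ///         Ok(Action::await_change())
    ///     },
    ///     |_, err: &Infallible, _| Err(err).unwrap(),
    ///     Arc::new(()),
    /// )
    /// // Draining the stream keeps the controller (and its signal handling) running
    /// .for_each(|_| std::future::ready(()))
    /// .await;
    /// # };
    /// ```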
    #[must_use]
    pub fn shutdown_on_signal(mut self) -> Self {
        async fn shutdown_signal() {
            futures::future::select(
                tokio::signal::ctrl_c().map(|_| ()).boxed(),
                #[cfg(unix)]
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                    .unwrap()
                    .recv()
                    .map(|_| ())
                    .boxed(),
                // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
                #[cfg(not(unix))]
                futures::future::pending::<()>(),
            )
            .await;
        }

        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
        self.graceful_shutdown_selector
            .push(graceful_rx.map(|_| ()).boxed());
        self.forceful_shutdown_selector.push(
            async {
                tracing::info!("press ctrl+c to shut down gracefully");
                shutdown_signal().await;
                if let Ok(()) = graceful_tx.send(()) {
                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
                } else {
                    tracing::info!(
                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
                    );
                }
                shutdown_signal().await;
                tracing::info!("forced shutdown requested");
            }
            .boxed(),
        );
        self
    }

    /// Consume all the parameters of the [`Controller`] and start the applier stream
    ///
    /// This creates a stream from all builder calls and starts an applier with
    /// the specified `reconciler` and `error_policy` callbacks. Each of these will be called
    /// with a configurable `context`.
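    ///
    /// # Example:
    ///
    /// A minimal sketch passing a non-trivial context: the `Data` struct and its field are
    /// hypothetical, illustrating how shared state can be handed to both callbacks.
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use kube::runtime::{controller::{Action, Controller}, watcher};
    /// # use kube::{Api, Client};
    /// # use std::sync::Arc;
    /// // Hypothetical state shared with every `reconcile` and `error_policy` call
    /// struct Data {
    ///     client: Client,
    /// }
    ///
    /// async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<Data>) -> Result<Action, kube::Error> {
    ///     Ok(Action::await_change())
    /// }
    ///
    /// fn error_policy(_obj: Arc<ConfigMap>, _err: &kube::Error, _ctx: Arc<Data>) -> Action {
    ///     Action::await_change()
    /// }
    ///
    /// # async fn doc() -> Result<(), kube::Error> {
    /// let client = Client::try_default().await?;
    /// let context = Arc::new(Data { client: client.clone() });
    /// Controller::new(Api::<ConfigMap>::all(client), watcher::Config::default())
    ///     .run(reconcile, error_policy, context)
    ///     .for_each(|res| async move {
    ///         match res {
    ///             Ok((obj, _action)) => tracing::info!(?obj, "reconciled"),
    ///             Err(error) => tracing::warn!(%error, "reconcile failed"),
    ///         }
    ///     })
    ///     .await;
    /// # Ok(())
    /// # }
    /// ```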
    pub fn run<ReconcilerFut, Ctx>(
        self,
        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
        context: Arc<Ctx>,
    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
    where
        K::DynamicType: Debug + Unpin,
        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
        ReconcilerFut::Error: std::error::Error + Send + 'static,
    {
        applier(
            move |obj, ctx| {
                CancelableJoinHandle::spawn(
                    reconciler(obj, ctx).into_future().in_current_span(),
                    &Handle::current(),
                )
            },
            error_policy,
            context,
            self.reader,
            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                .take_until(future::select_all(self.graceful_shutdown_selector)),
            self.config,
        )
1676 }
1677}

#[cfg(test)]
mod tests {
    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

    use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
    use crate::{
        applier,
        reflector::{self, ObjectRef},
        watcher::{self, metadata_watcher, watcher, Event},
        Config, Controller,
    };
    use futures::{Stream, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{core::ObjectMeta, Api, Resource};
    use serde::de::DeserializeOwned;
    use tokio::time::timeout;

    fn assert_send<T: Send>(x: T) -> T {
        x
    }

    // Used to typecheck that a type `T` implements `Stream` and yields watcher events
    // over a generic resource `K`
    fn assert_stream<T, K>(x: T) -> T
    where
        T: Stream<Item = watcher::Result<Event<K>>> + Send,
        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
    {
        x
    }

    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        assert_send(
            Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
                |_, _| async { Ok(mock_type::<Action>()) },
                |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
                Arc::new(()),
            ),
        );
    }

    // not #[test] because we don't want to actually run it, we just want to
    // assert that it typechecks
    //
    // checks that the return types of `watcher` and `metadata_watcher` do not drift
    // for an arbitrary `K` that implements `Resource` (e.g. `ConfigMap`)
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
        assert_stream(metadata_watcher(
            mock_type::<Api<ConfigMap>>(),
            Default::default(),
        ));
    }

    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
        // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926

        // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 50x the number of objects "in rotation"
        // On @nightkr's 3900X this can be triggered reliably with 10x, but let's keep some safety margin to avoid false negatives
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Assume that everything's OK if we can reconcile every object 3 times on average
        let reconciles = items * 3;

        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let mut applier = pin!(applier(
            |_obj, _| {
                Box::pin(async move {
                    // Try to flood the rescheduling buffer by just putting the object back in the queue immediately
                    //println!("reconciling {:?}", obj.metadata.name);
                    Ok(Action::requeue(Duration::ZERO))
                })
            },
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
            Config::default(),
        ));
        store_tx.apply_watcher_event(&watcher::Event::InitDone);
        for i in 0..items {
            let obj = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(format!("cm-{i}")),
                    namespace: Some("default".to_string()),
                    ..Default::default()
                },
                ..Default::default()
            };
            store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
        }

        timeout(
            Duration::from_secs(10),
            applier
                .as_mut()
                .take(reconciles)
                .try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("test timeout expired, applier likely deadlocked")
        .unwrap();

        // Do an orderly shutdown to ensure that no individual reconcilers are stuck
        drop(queue_tx);
        timeout(
            Duration::from_secs(10),
            applier.try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
        .unwrap();
    }
}