Struct kube_runtime::controller::Controller
pub struct Controller<K> where
K: Clone + Resource + Debug + 'static,
K::DynamicType: Eq + Hash, { /* fields omitted */ }
Controller
A controller is made up of:
- 1 reflector (for the core object)
- N watcher objects, one per child object type
- user-defined reconcile + error_policy callbacks
- a generated input stream considering all sources
All reconcile requests pass through an internal scheduler.
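The page does not say how the internal scheduler behaves, so the following is only a std-only toy model, under the assumption that requests for the same object are deduplicated and the earliest requested time wins. `ObjKey`, `ToyScheduler`, and the logical clock in seconds are hypothetical stand-ins, not kube_runtime types.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a reference to one object (not kube_runtime's ObjectRef).
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ObjKey {
    pub namespace: String,
    pub name: String,
}

/// Toy scheduler: holds at most one pending reconcile request per object.
/// ASSUMPTION: deduplication with "earliest time wins" is a modelling choice.
#[derive(Default)]
pub struct ToyScheduler {
    // Object -> logical time (in seconds) at which it should be reconciled.
    pending: HashMap<ObjKey, u64>,
}

impl ToyScheduler {
    /// Request a reconcile at `run_at`; if one is already pending, keep the earlier time.
    pub fn schedule(&mut self, key: ObjKey, run_at: u64) {
        self.pending
            .entry(key)
            .and_modify(|existing| *existing = (*existing).min(run_at))
            .or_insert(run_at);
    }

    /// Remove and return every object whose scheduled time is at or before `now`.
    pub fn take_due(&mut self, now: u64) -> Vec<ObjKey> {
        let mut due: Vec<ObjKey> = self
            .pending
            .iter()
            .filter(|(_, run_at)| **run_at <= now)
            .map(|(key, _)| key.clone())
            .collect();
        for key in &due {
            self.pending.remove(key);
        }
        due.sort(); // deterministic order for the caller
        due
    }
}
```

In this model, scheduling the same object at 300 and then at 60 leaves a single pending request that becomes due at 60.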
Pieces:
use kube::{Client, api::{Api, ListParams}};
use kube_derive::CustomResource;
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
use futures::StreamExt;
use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use k8s_openapi::api::core::v1::ConfigMap;
use schemars::JsonSchema;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {}

/// A custom resource
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
    content: String,
}

/// The reconciler that will be called when either object changes
async fn reconcile(g: ConfigMapGenerator, _ctx: Context<()>) -> Result<ReconcilerAction, Error> {
    // .. use api here to reconcile a child ConfigMap with ownerreferences
    // see configmapgen_controller example for full info
    Ok(ReconcilerAction {
        requeue_after: Some(Duration::from_secs(300)),
    })
}

/// An error handler that will be called when the reconciler fails
fn error_policy(_error: &Error, _ctx: Context<()>) -> ReconcilerAction {
    ReconcilerAction {
        requeue_after: Some(Duration::from_secs(60)),
    }
}

/// Something to drive the controller
#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let context = Context::new(()); // bad empty context - put client in here
    let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cmgs, ListParams::default())
        .owns(cms, ListParams::default())
        .run(reconcile, error_policy, context)
        .for_each(|res| async move {
            match res {
                Ok(o) => println!("reconciled {:?}", o),
                Err(e) => println!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
    Ok(())
}
Implementations
Create a Controller on a type K
Takes an Api object that determines how the Controller listens for changes to the K.
The ListParams controls the possible subset of objects of K that you want to manage and receive reconcile events for.
For the full set of objects K in the given Api scope, you can use ListParams::default.
impl<K> Controller<K> where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
Create a Controller on a type K
Takes an Api object that determines how the Controller listens for changes to the K.
The ListParams lets you define a possible subset of objects of K that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for.
For the full set of objects K in the given Api scope, you can use ListParams::default.
This variant constructor is for dynamic types found through discovery. Prefer Controller::new for static types.
Specify Child objects which K owns and should be watched
Takes an Api object that determines how the Controller listens for changes to the Child.
All owned Child objects must contain an OwnerReference pointing back to a K.
The ListParams refer to the possible subset of Child objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for.
To watch the full set of Child objects in the given Api scope, you can use ListParams::default.
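To illustrate why every owned Child must carry an OwnerReference pointing back to a K, here is a std-only sketch of how a change to a child could be traced back to the owners that need reconciling. `OwnerRef`, `ChildMeta`, and `owners_to_reconcile` are hypothetical stand-ins (a real OwnerReference carries more fields, such as API version and UID), and this is not the library's implementation.

```rust
/// Hypothetical, reduced stand-in for a Kubernetes OwnerReference.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OwnerRef {
    pub kind: String,
    pub name: String,
}

/// Hypothetical stand-in for the metadata of a changed child object.
pub struct ChildMeta {
    pub namespace: String,
    pub owner_references: Vec<OwnerRef>,
}

/// Return (namespace, name) for each owner of kind `owner_kind`.
/// A namespaced owner lives in the same namespace as its dependent,
/// so the child's namespace is reused for the owner.
pub fn owners_to_reconcile(child: &ChildMeta, owner_kind: &str) -> Vec<(String, String)> {
    child
        .owner_references
        .iter()
        .filter(|owner| owner.kind == owner_kind)
        .map(|owner| (child.namespace.clone(), owner.name.clone()))
        .collect()
}
```

A child without a matching owner reference maps to an empty list here, which is the reason such a child would never trigger a reconcile of K in this model.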
Specify Child objects which K owns and should be watched
Same as Controller::owns, but accepts a DynamicType so it can be used with dynamic resources.
Specify Watched object which K has a custom relation to and should be watched
To define the Watched relation with K, you must define a custom relation mapper, which, when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.
If the relation K has to Watched is that K owns Watched, consider using Controller::owns.
Takes an Api object that determines how the Controller listens for changes to the Watched.
The ListParams refer to the possible subset of Watched objects that you want the Api to watch - in the Api’s configured scope - and run through the custom mapper.
To watch the full set of Watched objects in the given Api scope, you can use ListParams::default.
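A custom relation mapper is just a function from a Watched object to zero or more references to K. The std-only sketch below shows the shape of such a mapper, assuming the relation is recorded in a label. `WatchedMeta`, `KRef`, and the label key are hypothetical stand-ins for illustration; a real mapper would take the Watched object by value and return ObjectRef<K> values.

```rust
use std::collections::BTreeMap;

/// Hypothetical label key that records which K a Watched object relates to.
pub const RELATED_K_LABEL: &str = "example.invalid/related-to";

/// Hypothetical stand-in for the metadata of a Watched object.
pub struct WatchedMeta {
    pub namespace: String,
    pub labels: BTreeMap<String, String>,
}

/// Hypothetical stand-in for ObjectRef<K>.
#[derive(Debug, PartialEq, Eq)]
pub struct KRef {
    pub namespace: String,
    pub name: String,
}

/// Mapper: Some(reference) if the label is present, None otherwise.
/// Returning an Option matches the "option or iterator" contract described above.
pub fn map_watched_to_k(watched: &WatchedMeta) -> Option<KRef> {
    watched.labels.get(RELATED_K_LABEL).map(|name| KRef {
        namespace: watched.namespace.clone(),
        name: name.clone(),
    })
}
```

Returning None means a change to that Watched object triggers no reconcile at all.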
pub fn watches_with<Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
dyntype: Other::DynamicType,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Other::DynamicType: Clone,
Specify Watched object which K has a custom relation to and should be watched
Same as Controller::watches, but accepts a DynamicType so it can be used with dynamic resources.
pub fn reconcile_all_on(
self,
trigger: impl Stream<Item = ()> + Send + Sync + 'static
) -> Self
Trigger a reconciliation for all managed objects whenever trigger emits a value
For example, this can be used to reconcile all objects whenever the controller’s configuration changes.
To reconcile all objects when a new line is entered:
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use std::{convert::Infallible, io::BufRead};

let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
    for _ in std::io::BufReader::new(std::io::stdin()).lines() {
        let _ = reload_tx.try_send(());
    }
});
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name());
        Ok(ReconcilerAction { requeue_after: None })
    },
    |err: &Infallible, _| Err(err).unwrap(),
    Context::new(()),
);
This can be called multiple times, in which case they are additive; reconciles are scheduled whenever any Stream emits a new item.
If a Stream is terminated (by emitting None) then the Controller keeps running, but the Stream stops being polled.
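The additive behaviour can be modelled without any async machinery. The sketch below uses a std channel with several senders as a stand-in for several trigger streams: each received item enqueues every managed object, and dropping one sender does not stop the others. `enqueue_all_on_triggers` and the string object names are hypothetical; the real API works on Stream values, not channels.

```rust
use std::sync::mpsc::Receiver;

/// Drain all pending trigger items; for each one, enqueue every managed object.
/// Returns how many "reconcile everything" rounds were requested.
/// (Toy model: no deduplication of queued objects is attempted here.)
pub fn enqueue_all_on_triggers(
    triggers: &Receiver<()>,
    managed: &[String],
    queue: &mut Vec<String>,
) -> usize {
    let mut rounds = 0;
    // try_recv returns Err when nothing is pending, or when every sender is gone.
    while triggers.try_recv().is_ok() {
        queue.extend(managed.iter().cloned());
        rounds += 1;
    }
    rounds
}
```

Cloning the sender plays the role of calling reconcile_all_on more than once: any clone can request a full round, and the receiver keeps working as long as at least one clone is alive.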
Start a graceful shutdown when trigger resolves. Once a graceful shutdown has been initiated:
- No new reconciliations are started from the scheduler
- The underlying Kubernetes watch is terminated
- All running reconciliations are allowed to finish
Controller::run’s Stream terminates once all running reconciliations are done.
For example, to stop the reconciler whenever the user presses Ctrl+C:
use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube_runtime::controller::{Context, Controller, ReconcilerAction};
use std::convert::Infallible;

Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name());
        Ok(ReconcilerAction { requeue_after: None })
    },
    |err: &Infallible, _| Err(err).unwrap(),
    Context::new(()),
);
This can be called multiple times, in which case they are additive; the Controller starts to terminate as soon as any Future resolves.
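The graceful shutdown rules above amount to a small state machine: stop admitting work, let in-flight work drain, then terminate. The following std-only sketch models that under those stated rules. `ToyController` and `Phase` are hypothetical names, and the real Controller tracks this with futures and streams rather than a counter.

```rust
/// Lifecycle phases of the toy model.
#[derive(Debug, PartialEq, Eq)]
pub enum Phase {
    Running,
    Draining,   // shutdown requested, reconciliations still in flight
    Terminated, // shutdown requested and nothing left in flight
}

/// Toy model of graceful shutdown bookkeeping (not the library's implementation).
pub struct ToyController {
    phase: Phase,
    in_flight: usize,
}

impl ToyController {
    pub fn new() -> Self {
        Self { phase: Phase::Running, in_flight: 0 }
    }

    pub fn phase(&self) -> &Phase {
        &self.phase
    }

    /// Try to start a reconciliation; refused once shutdown has begun.
    pub fn try_start(&mut self) -> bool {
        if self.phase == Phase::Running {
            self.in_flight += 1;
            true
        } else {
            false
        }
    }

    /// Called when any shutdown trigger resolves; later calls are no-ops.
    pub fn trigger_shutdown(&mut self) {
        if self.phase == Phase::Running {
            self.phase = Phase::Draining;
            self.settle();
        }
    }

    /// Called when one running reconciliation finishes.
    pub fn finish_one(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
        self.settle();
    }

    // Move to Terminated once draining and nothing is in flight.
    fn settle(&mut self) {
        if self.phase == Phase::Draining && self.in_flight == 0 {
            self.phase = Phase::Terminated;
        }
    }
}
```

Because trigger_shutdown ignores repeated calls, several triggers behave additively in this model: the first one to fire starts the drain.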
Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller has terminated. If you run this in a process containing more tasks than just the Controller, ensure that all other tasks either terminate when the Controller does, that they have their own signal handlers, or use Controller::graceful_shutdown_on to manage your own shutdown strategy.
NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into Controller::graceful_shutdown_on.
NOTE: Controller::run terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running in the background while they terminate. This will block tokio::runtime::Runtime termination until they actually terminate, unless you run std::process::exit afterwards.
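The two-step escalation described above (first signal requests a graceful shutdown, a repeated signal requests a forceful one) can be written as a tiny transition function. This is a std-only illustration with hypothetical names (`Shutdown`, `on_signal`); it does not install any signal handlers and is not how the library is implemented.

```rust
/// Shutdown level requested so far (toy model).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Shutdown {
    Running,
    Graceful, // wait for all reconcilers to finish
    Forceful, // ask reconcilers to abort at their next yield point
}

/// Compute the next state when Ctrl+C or SIGTERM is received.
pub fn on_signal(state: Shutdown) -> Shutdown {
    match state {
        Shutdown::Running => Shutdown::Graceful,
        // A second signal escalates; further signals change nothing.
        Shutdown::Graceful | Shutdown::Forceful => Shutdown::Forceful,
    }
}
```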
pub fn run<ReconcilerFut, T>(
self,
reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, Error>>> where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: Error + Send + 'static,
Consume all the parameters of the Controller and start the applier stream
This creates a stream from all builder calls and starts an applier with a specified reconciler and error_policy callbacks. Each of these will be called with a configurable Context.
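How the two callbacks relate can be shown with a synchronous, std-only sketch: the reconciler runs first, and only if it fails is error_policy consulted for the follow-up action. `Action` and `reconcile_once` are hypothetical stand-ins for ReconcilerAction and the applier; the real callbacks are asynchronous and receive a Context<T>, whereas this model passes a plain reference.

```rust
use std::time::Duration;

/// Hypothetical stand-in for ReconcilerAction.
#[derive(Debug, PartialEq, Eq)]
pub struct Action {
    pub requeue_after: Option<Duration>,
}

/// Run one reconcile attempt and decide what to do next.
/// Returns the follow-up action plus the error, if the reconciler failed.
pub fn reconcile_once<K, T, E>(
    obj: K,
    ctx: &T,
    mut reconciler: impl FnMut(K, &T) -> Result<Action, E>,
    mut error_policy: impl FnMut(&E, &T) -> Action,
) -> (Action, Option<E>) {
    match reconciler(obj, ctx) {
        // Success: the reconciler itself chooses the requeue delay.
        Ok(action) => (action, None),
        // Failure: the error policy chooses the requeue delay instead.
        Err(err) => {
            let action = error_policy(&err, ctx);
            (action, Some(err))
        }
    }
}
```

With the delays used in the example at the top of this page, a successful attempt would requeue after 300 seconds and a failed one after 60 seconds.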
Auto Trait Implementations
impl<K> !RefUnwindSafe for Controller<K>
impl<K> Send for Controller<K> where
K: Send + Sync,
impl<K> !Sync for Controller<K>
impl<K> Unpin for Controller<K> where
<K as Resource>::DynamicType: Unpin,
impl<K> !UnwindSafe for Controller<K>