Struct kube_runtime::controller::Controller
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{ /* private fields */ }
Controller
A controller is made up of:
- 1 reflector (for the core object)
- N watcher objects, one for each kind of child object
- user-defined reconcile + error_policy callbacks
- a generated input stream considering all sources
All reconcile requests pass through an internal scheduler.
Pieces:
use kube::{
    Client, CustomResource,
    api::{Api, ListParams},
    runtime::controller::{Controller, Action}
};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use schemars::JsonSchema;
use std::sync::Arc;
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {}

/// A custom resource
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
    content: String,
}

/// The reconciler that will be called when either object changes
async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
    // .. use api here to reconcile a child ConfigMap with ownerreferences
    // see configmapgen_controller example for full info
    Ok(Action::requeue(Duration::from_secs(300)))
}

/// An error handler that will be called when the reconciler fails, with access to both the
/// object that caused the failure and the actual error
fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

/// Something to drive the controller
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let context = Arc::new(()); // bad empty context - put client in here
    let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cmgs, ListParams::default())
        .owns(cms, ListParams::default())
        .run(reconcile, error_policy, context)
        .for_each(|res| async move {
            match res {
                Ok(o) => println!("reconciled {:?}", o),
                Err(e) => println!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
    Ok(())
}
Implementations
impl<K> Controller<K>
where
    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
    K::DynamicType: Eq + Hash + Clone,
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self
where
    K::DynamicType: Default,
Create a Controller on a type K.
Takes an Api object that determines how the Controller listens for changes to K.
The ListParams controls the subset of K objects that you want to manage and receive reconcile events for. For the full set of K objects in the given Api scope, you can use ListParams::default.
pub fn new_with(
    owned_api: Api<K>,
    lp: ListParams,
    dyntype: K::DynamicType
) -> Self
Create a Controller on a type K.
Takes an Api object that determines how the Controller listens for changes to K.
The ListParams lets you define a possible subset of K objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. For the full set of K objects in the given Api scope, you can use ListParams::default.
This variant constructor is for dynamic types found through discovery. Prefer Controller::new for static types.
pub fn trigger_backoff(self, backoff: impl Backoff + Send + 'static) -> Self
Specify the backoff policy for “trigger” watches.
This includes the core watch, as well as auxiliary watches introduced by Self::owns and Self::watches.
The default_backoff follows client-go conventions, but can be overridden by calling this method.
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
    self,
    api: Api<Child>,
    lp: ListParams
) -> Self
Specify Child objects which K owns and should be watched.
Takes an Api object that determines how the Controller listens for changes to the Child.
All owned Child objects must contain an OwnerReference pointing back to a K.
The ListParams refers to the possible subset of Child objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. To watch the full set of Child objects in the given Api scope, you can use ListParams::default.
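As a rough illustration of the OwnerReference requirement above, the following std-only sketch shows how a change to a child could be traced back to its owners of type K by filtering the child's owner references. The OwnerRef struct and the owners_of_kind function are hypothetical stand-ins, not kube types, and matching on apiVersion and kind is an assumption about what "pointing back to a K" involves.

```rust
/// Simplified, hypothetical stand-in for the Kubernetes OwnerReference
/// fields relevant to this sketch (not the k8s_openapi type).
#[derive(Debug, Clone, PartialEq)]
struct OwnerRef {
    api_version: String,
    kind: String,
    name: String,
}

/// Returns the names of all owners with the given apiVersion and kind,
/// i.e. the objects that would need reconciling when this child changes.
fn owners_of_kind<'a>(owner_refs: &'a [OwnerRef], api_version: &str, kind: &str) -> Vec<&'a str> {
    owner_refs
        .iter()
        .filter(|r| r.api_version == api_version && r.kind == kind)
        .map(|r| r.name.as_str())
        .collect()
}

fn main() {
    // Owner references as they might appear on a child ConfigMap.
    let refs = vec![
        OwnerRef {
            api_version: "nullable.se/v1".to_string(),
            kind: "ConfigMapGenerator".to_string(),
            name: "gen-a".to_string(),
        },
        OwnerRef {
            api_version: "apps/v1".to_string(),
            kind: "Deployment".to_string(),
            name: "web".to_string(),
        },
    ];
    // Only the ConfigMapGenerator owner is relevant to this controller.
    let owners = owners_of_kind(&refs, "nullable.se/v1", "ConfigMapGenerator");
    println!("{:?}", owners);
}
```

A child without a matching owner reference yields an empty list in this sketch, which corresponds to no reconcile being requested for it.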
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
    self,
    api: Api<Child>,
    dyntype: Child::DynamicType,
    lp: ListParams
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
Specify Child objects which K owns and should be watched.
Same as Controller::owns, but accepts a DynamicType so it can be used with dynamic resources.
pub fn watches<Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
    self,
    api: Api<Other>,
    lp: ListParams,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self
where
    I::IntoIter: Send,
Specify a Watched object which K has a custom relation to and should be watched.
To define the Watched relation with K, you must define a custom relation mapper, which, when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.
If the relation K has to Watched is that K owns Watched, consider using Controller::owns.
Takes an Api object that determines how the Controller listens for changes to the Watched.
The ListParams refers to the possible subset of Watched objects that you want the Api to watch - in the Api’s configured scope - and run through the custom mapper. To watch the full set of Watched objects in the given Api scope, you can use ListParams::default.
Example
Tracking cross cluster references using the Operator-SDK annotations.
Controller::new(memcached, ListParams::default())
    .watches(
        Api::<WatchedResource>::all(client.clone()),
        ListParams::default(),
        |ar| {
            let prt = ar
                .annotations()
                .get("operator-sdk/primary-resource-type")
                .map(String::as_str);
            if prt != Some("Memcached.cache.example.com") {
                return None;
            }
            let (namespace, name) = ar
                .annotations()
                .get("operator-sdk/primary-resource")?
                .split_once('/')?;
            Some(ObjectRef::new(name).within(namespace))
        }
    )
    .run(reconcile, error_policy, context)
    .for_each(|_| futures::future::ready(()))
    .await;
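The mapper's annotation logic can be exercised without a cluster. The sketch below pulls the same lookup into a std-only function; primary_resource is a hypothetical helper name, and it assumes the annotations are available as a BTreeMap<String, String>. It returns the (namespace, name) pair that the closure above would turn into an ObjectRef.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper mirroring the closure passed to `watches` above:
/// returns the (namespace, name) of the primary resource, or None when the
/// annotations are missing, malformed, or refer to a different type.
fn primary_resource<'a>(
    annotations: &'a BTreeMap<String, String>,
    expected_type: &str,
) -> Option<(&'a str, &'a str)> {
    let prt = annotations
        .get("operator-sdk/primary-resource-type")
        .map(String::as_str);
    if prt != Some(expected_type) {
        return None;
    }
    // The annotation value is expected to look like "namespace/name".
    annotations
        .get("operator-sdk/primary-resource")?
        .split_once('/')
}

fn main() {
    let mut annotations = BTreeMap::new();
    annotations.insert(
        "operator-sdk/primary-resource-type".to_string(),
        "Memcached.cache.example.com".to_string(),
    );
    annotations.insert(
        "operator-sdk/primary-resource".to_string(),
        "default/my-cache".to_string(),
    );
    let target = primary_resource(&annotations, "Memcached.cache.example.com");
    println!("{:?}", target);
}
```

Returning None for any mismatch means unrelated objects trigger no reconciles, which is why the mapper may return an option rather than always producing an ObjectRef.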
pub fn watches_with<Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
    self,
    api: Api<Other>,
    dyntype: Other::DynamicType,
    lp: ListParams,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self
where
    I::IntoIter: Send,
    Other::DynamicType: Clone,
Specify a Watched object which K has a custom relation to and should be watched.
Same as Controller::watches, but accepts a DynamicType so it can be used with dynamic resources.
pub fn reconcile_all_on(
    self,
    trigger: impl Stream<Item = ()> + Send + Sync + 'static
) -> Self
Trigger a reconciliation for all managed objects whenever trigger emits a value.
For example, this can be used to reconcile all objects whenever the controller’s configuration changes.
To reconcile all objects when a new line is entered:
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    Client,
    api::{ListParams, Api, ResourceExt},
    runtime::{controller::{Controller, Action}},
};
use std::{convert::Infallible, io::BufRead, sync::Arc};

let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
    for _ in std::io::BufReader::new(std::io::stdin()).lines() {
        let _ = reload_tx.try_send(());
    }
});
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
); // the returned stream still has to be polled for the controller to do anything
This can be called multiple times, in which case the triggers are additive; reconciles are scheduled whenever any Stream emits a new item.
If a Stream is terminated (by emitting None) then the Controller keeps running, but the Stream stops being polled.
pub fn graceful_shutdown_on(
    self,
    trigger: impl Future<Output = ()> + Send + Sync + 'static
) -> Self
Start a graceful shutdown when trigger resolves. Once a graceful shutdown has been initiated:
- No new reconciliations are started from the scheduler
- The underlying Kubernetes watch is terminated
- All running reconciliations are allowed to finish
Controller::run’s Stream terminates once all running reconciliations are done.
For example, to stop the reconciler whenever the user presses Ctrl+C:
use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube_runtime::controller::{Controller, Action};
use std::{convert::Infallible, sync::Arc};

Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ListParams::default(),
)
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
); // the returned stream still has to be polled for the controller to do anything
This can be called multiple times, in which case the triggers are additive; the Controller starts to terminate as soon as any Future resolves.
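The shutdown rules listed for this method (no new reconciliations start, running ones finish) can be pictured with a toy, single-threaded model. This is only a sketch of the described behaviour under simplifying assumptions, not how the Controller is implemented; run_until_shutdown and the use of an AtomicBool flag as the trigger are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Toy scheduler loop: runs queued jobs until `shutdown` is set.
/// A job that has already started always runs to completion; once the
/// flag is observed, no further job is taken from the queue.
fn run_until_shutdown<F>(
    mut queue: VecDeque<&'static str>,
    shutdown: &AtomicBool,
    mut job: F,
) -> Vec<&'static str>
where
    F: FnMut(&str),
{
    let mut completed = Vec::new();
    while !shutdown.load(Ordering::SeqCst) {
        let name = match queue.pop_front() {
            Some(name) => name,
            None => break, // nothing left to reconcile
        };
        job(name); // the running job is never interrupted
        completed.push(name);
    }
    completed
}

fn main() {
    let shutdown = AtomicBool::new(false);
    let queue = VecDeque::from(vec!["a", "b", "c"]);
    // The shutdown trigger fires while "b" is being reconciled.
    let completed = run_until_shutdown(queue, &shutdown, |name| {
        if name == "b" {
            shutdown.store(true, Ordering::SeqCst);
        }
    });
    println!("{:?}", completed);
}
```

In this model "b" still completes because it was already running when the flag was set, while "c" is never started.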
pub fn shutdown_on_signal(self) -> Self
Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller has terminated. If you run this in a process containing more tasks than just the Controller, ensure that all other tasks either terminate when the Controller does or have their own signal handlers, or use Controller::graceful_shutdown_on to manage your own shutdown strategy.
NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into Controller::graceful_shutdown_on.
NOTE: Controller::run terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running in the background while they terminate. This will block tokio::runtime::Runtime termination until they actually terminate, unless you run std::process::exit afterwards.
pub fn run<ReconcilerFut, Ctx>(
    self,
    reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, Error>>>
where
    K::DynamicType: Debug + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
    ReconcilerFut::Error: Error + Send + 'static,
Consume all the parameters of the Controller and start the applier stream.
This creates a stream from all builder calls and starts an applier with the specified reconciler and error_policy callbacks. Each of these will be called with a configurable context.
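To show what a shared context looks like in practice, here is a synchronous, std-only sketch of the calling pattern: both callbacks receive a clone of the same Arc<Ctx>. The drive function and the Context struct are hypothetical stand-ins for illustration; the real run is asynchronous and returns a Stream, and a real context would more likely hold the Client than counters.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shared state handed to every callback invocation.
struct Context {
    reconciles: AtomicUsize,
    failures: AtomicUsize,
}

/// Synchronous stand-in for the shape of `run`: every object goes to
/// `reconciler`, failures go to `error_policy`, and each call gets its own
/// clone of the same `Arc<Ctx>`.
fn drive<Ctx, E>(
    objects: Vec<Arc<String>>,
    mut reconciler: impl FnMut(Arc<String>, Arc<Ctx>) -> Result<(), E>,
    error_policy: impl Fn(Arc<String>, &E, Arc<Ctx>),
    context: Arc<Ctx>,
) {
    for obj in objects {
        if let Err(err) = reconciler(obj.clone(), context.clone()) {
            error_policy(obj, &err, context.clone());
        }
    }
}

fn reconcile(obj: Arc<String>, ctx: Arc<Context>) -> Result<(), String> {
    ctx.reconciles.fetch_add(1, Ordering::SeqCst);
    if obj.is_empty() {
        return Err("object has no name".to_string());
    }
    Ok(())
}

fn error_policy(_obj: Arc<String>, _err: &String, ctx: Arc<Context>) {
    ctx.failures.fetch_add(1, Ordering::SeqCst);
}

fn main() {
    let ctx = Arc::new(Context {
        reconciles: AtomicUsize::new(0),
        failures: AtomicUsize::new(0),
    });
    let objects = vec![Arc::new("a".to_string()), Arc::new(String::new())];
    drive(objects, reconcile, error_policy, ctx.clone());
    println!(
        "reconciles={} failures={}",
        ctx.reconciles.load(Ordering::SeqCst),
        ctx.failures.load(Ordering::SeqCst)
    );
}
```

Because the context sits behind an Arc, any mutable state in it needs interior mutability (atomics here), since every callback invocation only gets shared access.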