Struct kube_runtime::controller::Controller
source · [−]pub struct Controller<K> where
K: Clone + Resource + Debug + 'static,
K::DynamicType: Eq + Hash, { /* private fields */ }
Expand description
Controller
A controller is made up of:
- 1 reflector (for the core object)
- N watcher objects (for each child object)
- user-defined reconcile + error_policy callbacks
- a generated input stream considering all sources
All reconcile requests go through an internal scheduler.
Pieces:
use kube::{
Client, CustomResource,
api::{Api, ListParams},
runtime::controller::{Controller, Action}
};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use schemars::JsonSchema;
use std::sync::Arc;
use thiserror::Error;
/// Error type returned by `reconcile` and passed to `error_policy`.
///
/// It has no variants, so no value of this type can ever be constructed;
/// the reconciler in this example therefore never actually fails.
#[derive(Debug, Error)]
enum Error {}
/// A custom resource
///
/// The `#[kube(...)]` attribute names the resource kind `ConfigMapGenerator` in group
/// `nullable.se`, version `v1`, and marks it as namespaced. The rest of this example
/// refers to the resource through the `ConfigMapGenerator` type (presumably generated
/// by the `CustomResource` derive - confirm against the kube-derive docs).
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
// NOTE(review): presumably the data used to build the child ConfigMap - the example never reads it.
content: String,
}
/// Reconciler invoked by the controller whenever either watched object changes.
///
/// This example performs no work and always asks to be requeued after 300 seconds.
async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
    // A real reconciler would use an `Api` here to reconcile a child ConfigMap
    // carrying owner references; see the configmapgen_controller example for full info.
    let requeue_after = Duration::from_secs(300);
    Ok(Action::requeue(requeue_after))
}
/// Error handler invoked when the reconciler fails; requests a requeue after 60 seconds.
fn error_policy(_error: &Error, _ctx: Arc<()>) -> Action {
    let retry_after = Duration::from_secs(60);
    Action::requeue(retry_after)
}
/// something to drive the controller
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
let context = Arc::new(()); // bad empty context - put client in here
let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
let cms = Api::<ConfigMap>::all(client.clone());
Controller::new(cmgs, ListParams::default())
.owns(cms, ListParams::default())
.run(reconcile, error_policy, context)
.for_each(|res| async move {
match res {
Ok(o) => println!("reconciled {:?}", o),
Err(e) => println!("reconcile failed: {:?}", e),
}
})
.await; // controller does nothing unless polled
Ok(())
}
Implementations
sourceimpl<K> Controller<K> where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
impl<K> Controller<K> where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
sourcepub fn new(owned_api: Api<K>, lp: ListParams) -> Self where
K::DynamicType: Default,
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self where
K::DynamicType: Default,
Create a Controller on a type K
Takes an Api object that determines how the Controller listens for changes to the K.
The ListParams controls the possible subset of objects of K that you want to manage and receive reconcile events for.
For the full set of objects K in the given Api scope, you can use ListParams::default.
sourcepub fn new_with(
owned_api: Api<K>,
lp: ListParams,
dyntype: K::DynamicType
) -> Self
pub fn new_with(
owned_api: Api<K>,
lp: ListParams,
dyntype: K::DynamicType
) -> Self
Create a Controller on a type K
Takes an Api
object that determines how the Controller
listens for changes to the K
.
The ListParams
lets you define a possible subset of objects of K
that you want the Api
to watch - in the Api’s configured scope - and receive reconcile events for.
For the full set of objects K
in the given Api
scope, you can use ListParams::default
.
This variant constructor is for dynamic
types found through discovery. Prefer Controller::new
for static types.
sourcepub fn trigger_backoff(self, backoff: impl Backoff + Send + 'static) -> Self
pub fn trigger_backoff(self, backoff: impl Backoff + Send + 'static) -> Self
Specify the backoff policy for “trigger” watches
This includes the core watch, as well as auxiliary watches introduced by Self::owns and Self::watches.
The default_backoff follows client-go conventions, but can be overridden by calling this method.
sourcepub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
lp: ListParams
) -> Self
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
lp: ListParams
) -> Self
Specify Child objects which K owns and should be watched
Takes an Api object that determines how the Controller listens for changes to the Child.
All owned Child objects must contain an OwnerReference pointing back to a K.
The ListParams refer to the possible subset of Child objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for.
To watch the full set of Child objects in the given Api scope, you can use ListParams::default.
sourcepub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
dyntype: Child::DynamicType,
lp: ListParams
) -> Self where
Child::DynamicType: Debug + Eq + Hash + Clone,
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
dyntype: Child::DynamicType,
lp: ListParams
) -> Self where
Child::DynamicType: Debug + Eq + Hash + Clone,
Specify Child
objects which K
owns and should be watched
Same as Controller::owns
, but accepts a DynamicType
so it can be used with dynamic resources.
sourcepub fn watches<Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
pub fn watches<Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Specify Watched object (the Other type parameter in the signature) which K has a custom relation to and should be watched
To define the Watched relation with K, you must define a custom relation mapper, which, when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.
If the relation K has to Watched is that K owns Watched, consider using Controller::owns.
Takes an Api object that determines how the Controller listens for changes to the Watched.
The ListParams refer to the possible subset of Watched objects that you want the Api to watch - in the Api’s configured scope - and run through the custom mapper.
To watch the full set of Watched objects in the given Api scope, you can use ListParams::default.
sourcepub fn watches_with<Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
dyntype: Other::DynamicType,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Other::DynamicType: Clone,
pub fn watches_with<Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, I: 'static + IntoIterator<Item = ObjectRef<K>>>(
self,
api: Api<Other>,
dyntype: Other::DynamicType,
lp: ListParams,
mapper: impl Fn(Other) -> I + Sync + Send + 'static
) -> Self where
I::IntoIter: Send,
Other::DynamicType: Clone,
Specify Watched
object which K
has a custom relation to and should be watched
Same as Controller::watches
, but accepts a DynamicType
so it can be used with dynamic resources.
sourcepub fn reconcile_all_on(
self,
trigger: impl Stream<Item = ()> + Send + Sync + 'static
) -> Self
pub fn reconcile_all_on(
self,
trigger: impl Stream<Item = ()> + Send + Sync + 'static
) -> Self
Trigger a reconciliation for all managed objects whenever trigger
emits a value
For example, this can be used to reconcile all objects whenever the controller’s configuration changes.
To reconcile all objects when a new line is entered:
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
Client,
api::{ListParams, Api, ResourceExt},
runtime::{controller::{Controller, Action}},
};
use std::{convert::Infallible, io::BufRead, sync::Arc};
// Channel used as the reload trigger: the background thread below sends one `()` per
// line read from stdin, and the receiving half is handed to `reconcile_all_on`.
let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
for _ in std::io::BufReader::new(std::io::stdin()).lines() {
// The result of `try_send` is deliberately discarded.
// NOTE(review): presumably a failed send means a reload is already pending - confirm.
let _ = reload_tx.try_send(());
}
});
Controller::new(
Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
ListParams::default(),
)
// Every item emitted by the receiver triggers a reconciliation of all managed objects.
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
// Reconciler: prints the object's name and returns `Action::await_change()`.
|o, _| async move {
println!("Reconciling {}", o.name());
Ok(Action::await_change())
},
// The error type is `Infallible`, which has no values, so this error policy
// can never actually be invoked.
|err: &Infallible, _| Err(err).unwrap(),
Arc::new(()),
);
// NOTE: the stream returned by `run` is discarded here; the controller does nothing
// unless that stream is polled (e.g. with `for_each(...).await`).
This can be called multiple times, in which case they are additive; reconciles are scheduled whenever any Stream emits a new item.
If a Stream is terminated (by emitting None) then the Controller keeps running, but the Stream stops being polled.
sourcepub fn graceful_shutdown_on(
self,
trigger: impl Future<Output = ()> + Send + Sync + 'static
) -> Self
pub fn graceful_shutdown_on(
self,
trigger: impl Future<Output = ()> + Send + Sync + 'static
) -> Self
Start a graceful shutdown when trigger
resolves. Once a graceful shutdown has been initiated:
- No new reconciliations are started from the scheduler
- The underlying Kubernetes watch is terminated
- All running reconciliations are allowed to finish
Controller::run’s Stream terminates once all running reconciliations are done.
For example, to stop the reconciler whenever the user presses Ctrl+C:
use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{api::ListParams, Api, Client, ResourceExt};
use kube_runtime::controller::{Controller, Action};
use std::{convert::Infallible, sync::Arc};
Controller::new(
Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
ListParams::default(),
)
// The Ctrl+C future's output is mapped to `()` so it can serve as the shutdown trigger.
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
// Reconciler: prints the object's name and returns `Action::await_change()`.
|o, _| async move {
println!("Reconciling {}", o.name());
Ok(Action::await_change())
},
// The error type is `Infallible`, which has no values, so this error policy
// can never actually be invoked.
|err: &Infallible, _| Err(err).unwrap(),
Arc::new(()),
);
// NOTE: the stream returned by `run` is discarded here; the controller does nothing
// unless that stream is polled (e.g. with `for_each(...).await`).
This can be called multiple times, in which case they are additive; the Controller
starts to terminate
as soon as any Future
resolves.
sourcepub fn shutdown_on_signal(self) -> Self
pub fn shutdown_on_signal(self) -> Self
Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller
has
terminated. If you run this in a process containing more tasks than just the Controller
, ensure that
all other tasks either terminate when the Controller
does, that they have their own signal handlers,
or use Controller::graceful_shutdown_on
to manage your own shutdown strategy.
NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
Controller::graceful_shutdown_on
.
NOTE: Controller::run
terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
in the background while they terminate. This will block tokio::runtime::Runtime
termination until they actually terminate,
unless you run std::process::exit
afterwards.
sourcepub fn run<ReconcilerFut, Ctx>(
self,
reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, Error>>> where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: Error + Send + 'static,
pub fn run<ReconcilerFut, Ctx>(
self,
reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, Error>>> where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: Error + Send + 'static,
Consume all the parameters of the Controller and start the applier stream
This creates a stream from all builder calls and starts an applier with
a specified reconciler
and error_policy
callbacks. Each of these will be called
with a configurable context
.
Auto Trait Implementations
impl<K> !RefUnwindSafe for Controller<K>
impl<K> Send for Controller<K> where
K: Send + Sync,
impl<K> !Sync for Controller<K>
impl<K> Unpin for Controller<K> where
<K as Resource>::DynamicType: Unpin,
impl<K> !UnwindSafe for Controller<K>
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<V, T> VZip<V> for T where
V: MultiLane<T>,
impl<V, T> VZip<V> for T where
V: MultiLane<T>,
fn vzip(self) -> V
sourceimpl<T> WithSubscriber for T
impl<T> WithSubscriber for T
sourcefn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
Attaches the provided Subscriber
to this type, returning a
WithDispatch
wrapper. Read more
sourcefn with_current_subscriber(self) -> WithDispatch<Self>
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default Subscriber
to this type, returning a
WithDispatch
wrapper. Read more