pub struct Controller<K>{ /* private fields */ }
Available on crate feature runtime only.
Controller for a Resource K
A controller is an infinite stream of objects to be reconciled.
Once run and continuously awaited, it calls out to the user-provided reconcile and error_policy callbacks whenever relevant changes are detected or reconcile returns an error.
Reconciles are generally requested for all changes on your root objects. Changes to managed child resources also trigger the reconciler for the managing object, either by traversing owner references (for Controller::owns) or by traversing a custom mapping (for Controller::watches).
This mapping mechanism ultimately hides the reason for the reconciliation request, and forces you to write an idempotent reconciler.
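Because the trigger reason is hidden, a reconciler is usually written level-based: it derives the desired state from the object it is handed and converges toward it, no matter which event fired. The sketch below models that idea with standard-library types only. `Generator`, `Cluster`, and this `reconcile` function are hypothetical stand-ins for illustration, not kube types or APIs.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a root object (not a kube type).
#[derive(Debug, Clone, PartialEq)]
struct Generator {
    name: String,
    content: String,
}

/// Hypothetical in-memory "cluster" holding child ConfigMap data by name.
#[derive(Debug, Default, PartialEq)]
struct Cluster {
    config_maps: BTreeMap<String, String>,
}

/// Level-based reconcile: derive the desired child from the root object and
/// write it only if the stored state differs. Returns true if a write happened.
fn reconcile(root: &Generator, cluster: &mut Cluster) -> bool {
    let desired = root.content.clone();
    match cluster.config_maps.get(&root.name) {
        // Already converged: running again changes nothing.
        Some(current) if *current == desired => false,
        // Missing or drifted child: bring it to the desired state.
        _ => {
            cluster.config_maps.insert(root.name.clone(), desired);
            true
        }
    }
}
```

Calling `reconcile` twice with the same `Generator` writes once and then becomes a no-op, which is the property an idempotent reconciler needs when it cannot tell why it was invoked.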
General setup:
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::{controller::{Action, Controller}, watcher};
use kube::{Api, Client, CustomResource};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {}

/// A custom resource
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
struct ConfigMapGeneratorSpec {
    content: String,
}

/// The reconciler that will be called when either object changes
async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
    // .. use api here to reconcile a child ConfigMap with ownerreferences
    // see configmapgen_controller example for full info
    Ok(Action::requeue(Duration::from_secs(300)))
}

/// An error handler that will be called when the reconciler fails, with access to both the
/// object that caused the failure and the actual error
fn error_policy(_obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

/// Something to drive the controller
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let context = Arc::new(()); // bad empty context - put client in here
    let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
    let cms = Api::<ConfigMap>::all(client.clone());
    Controller::new(cmgs, watcher::Config::default())
        .owns(cms, watcher::Config::default())
        .run(reconcile, error_policy, context)
        .for_each(|res| async move {
            match res {
                Ok(o) => println!("reconciled {:?}", o),
                Err(e) => println!("reconcile failed: {:?}", e),
            }
        })
        .await; // controller does nothing unless polled
    Ok(())
}
Implementations§
impl<K> Controller<K>
pub fn new(main_api: Api<K>, wc: Config) -> Controller<K>
Create a Controller for a resource K
Takes an Api object that determines how the Controller listens for changes to the K.
The watcher::Config controls the subset of K objects that you want to manage and receive reconcile events for. For the full set of K objects in the given Api scope, you can use watcher::Config::default.
pub fn new_with(
    main_api: Api<K>,
    wc: Config,
    dyntype: <K as Resource>::DynamicType,
) -> Controller<K>
Create a Controller for a resource K
Takes an Api object that determines how the Controller listens for changes to the K.
The watcher::Config lets you define a subset of K objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. For the full set of K objects in the given Api scope, you can use Config::default.
This variant constructor is for dynamic types found through discovery. Prefer Controller::new for static types.
pub fn for_stream(
    trigger: impl Stream<Item = Result<K, Error>> + Send + 'static,
    reader: Store<K>,
) -> Controller<K>
Create a Controller for a resource K from a stream of K objects
Same as Controller::new, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
§Example:
let api: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store();
let deploys = watcher(api, watcher::Config::default())
    .default_backoff()
    .reflect(writer)
    .applied_objects()
    .predicate_filter(predicates::generation);
Controller::for_stream(deploys, reader)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;
Prefer Controller::new if you do not need to share the stream, or do not need pre-filtering.
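To make the pre-filtering idea concrete, here is a minimal standard-library model of what a generation-based filter does to a trigger stream: it remembers the last generation seen per object and drops events where it is unchanged (typically status-only writes). `Event`, `GenerationFilter`, and `filter_events` are hypothetical names for this sketch; the actual `predicate_filter` implementation in kube may differ in its details.

```rust
use std::collections::HashMap;

/// Hypothetical event: an object's name plus its `metadata.generation`.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    name: String,
    generation: i64,
}

/// Remembers the last generation seen per object and drops repeats.
#[derive(Default)]
struct GenerationFilter {
    seen: HashMap<String, i64>,
}

impl GenerationFilter {
    /// Returns true if the event should be forwarded to the controller.
    fn should_forward(&mut self, ev: &Event) -> bool {
        // `insert` returns the previous value; forward when it is absent or different.
        self.seen.insert(ev.name.clone(), ev.generation) != Some(ev.generation)
    }
}

/// Apply the filter to a batch of events, keeping only those with a new generation.
fn filter_events(events: Vec<Event>) -> Vec<Event> {
    let mut filter = GenerationFilter::default();
    events
        .into_iter()
        .filter(|ev| filter.should_forward(ev))
        .collect()
}
```

With such a filter in front of the controller, repeated events for an object whose spec has not changed never reach the reconciler, which reduces reconcile churn at the cost of missing changes that do not bump the generation.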
pub fn for_stream_with(
    trigger: impl Stream<Item = Result<K, Error>> + Send + 'static,
    reader: Store<K>,
    dyntype: <K as Resource>::DynamicType,
) -> Controller<K>
Create a Controller for a resource K from a stream of K objects
Same as Controller::new, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Prefer Controller::new if you do not need to share the stream, or do not need pre-filtering.
This variant constructor is for dynamic types found through discovery. Prefer Controller::for_stream for static types.
This is the same as Controller::for_stream. Instead of taking an Api (e.g. Controller::new), a stream of resources is used. Shared streams can be created out-of-band by subscribing on a store Writer. Through this interface, multiple controllers can use the same root (shared) input stream of resources to keep memory overheads smaller.
N.B: This constructor requires an unstable feature.
Prefer Controller::new or Controller::for_stream if you do not need to share the stream.
§Warning:
You must ensure the root stream (i.e. stream created through a reflector()) is driven to readiness independently of this controller to ensure the watcher never deadlocks.
§Example:
let api: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store_shared(128);
let subscriber = writer
    .subscribe()
    .expect("subscribers can only be created from shared stores");
let deploys = watcher(api, watcher::Config::default())
    .default_backoff()
    .reflect(writer)
    .applied_objects()
    .for_each(|ev| async move {
        match ev {
            Ok(obj) => tracing::info!("got obj {obj:?}"),
            Err(error) => tracing::error!(%error, "received error")
        }
    });
let controller = Controller::for_shared_stream(subscriber, reader)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|ev| async move {
        tracing::info!("reconciled {ev:?}")
    });
// Drive streams using a select statement
tokio::select! {
    _ = deploys => {},
    _ = controller => {},
}
This is the same as Controller::for_stream. Instead of taking an Api (e.g. Controller::new), a stream of resources is used. Shared streams can be created out-of-band by subscribing on a store Writer. Through this interface, multiple controllers can use the same root (shared) input stream of resources to keep memory overheads smaller.
N.B: This constructor requires an unstable feature.
Prefer Controller::new or Controller::for_stream if you do not need to share the stream.
This variant constructor is used for dynamic types found through discovery. Prefer Controller::for_shared_stream for static types (i.e. known at compile time).
pub fn with_config(self, config: Config) -> Controller<K>
Specify the configuration for the controller’s behavior.
pub fn trigger_backoff(
    self,
    backoff: impl Backoff + Send + 'static,
) -> Controller<K>
Specify the backoff policy for “trigger” watches
This includes the core watch, as well as auxiliary watches introduced by Self::owns and Self::watches.
The default_backoff follows client-go conventions, but can be overridden by calling this method.
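As a rough illustration of what a backoff policy for a failing watch looks like, the sketch below implements a plain capped exponential backoff with the standard library only. `ExponentialBackoff` is a hypothetical type written for this example: it does not implement the `Backoff` trait expected by `trigger_backoff`, and its parameters are not the values used by `default_backoff`.

```rust
use std::time::Duration;

/// Hypothetical capped exponential backoff: doubles the delay up to `max`.
struct ExponentialBackoff {
    current: Duration,
    initial: Duration,
    max: Duration,
}

impl ExponentialBackoff {
    fn new(initial: Duration, max: Duration) -> Self {
        Self { current: initial, initial, max }
    }

    /// Delay to wait before the next retry of a failed watch.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        // Double for the next attempt, but never exceed the cap.
        self.current = (self.current * 2).min(self.max);
        delay
    }

    /// Call after the watch recovers to start over from `initial`.
    fn reset(&mut self) {
        self.current = self.initial;
    }
}
```

The cap keeps a long outage from pushing retries arbitrarily far apart, and the reset keeps one earlier outage from slowing down recovery from the next one.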
pub fn owns<Child>(self, api: Api<Child>, wc: Config) -> Controller<K>
Specify Child objects which K owns and should be watched
Takes an Api object that determines how the Controller listens for changes to the Child. All owned Child objects must contain an OwnerReference pointing back to a K.
The watcher::Config controls the subset of Child objects that you want the Api to watch - in the Api’s configured scope - and receive reconcile events for. To watch the full set of Child objects in the given Api scope, you can use watcher::Config::default.
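The owner-reference requirement exists because a change to a child has to be translated into a reconcile request for its owner. The following standard-library sketch models that translation. `OwnerRef`, `Child`, and `owners_to_reconcile` are hypothetical, trimmed-down stand-ins, and the matching here is by kind only, which is an assumption of the sketch rather than a description of how kube matches owners.

```rust
/// Hypothetical, trimmed-down owner reference (the Kubernetes type has more fields).
#[derive(Debug, Clone, PartialEq)]
struct OwnerRef {
    kind: String,
    name: String,
}

/// Hypothetical child object carrying its namespace and owner references.
#[derive(Debug, Clone)]
struct Child {
    namespace: String,
    owners: Vec<OwnerRef>,
}

/// Map a changed child to the (namespace, name) keys of its owners of `owner_kind`.
/// Children without a matching owner reference produce no reconcile requests.
fn owners_to_reconcile(child: &Child, owner_kind: &str) -> Vec<(String, String)> {
    child
        .owners
        .iter()
        .filter(|o| o.kind == owner_kind)
        .map(|o| (child.namespace.clone(), o.name.clone()))
        .collect()
}
```

A child that lacks the owner reference maps to an empty list, so changes to it would never reach the reconciler of the intended owner.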
pub fn owns_with<Child>(
    self,
    api: Api<Child>,
    dyntype: <Child as Resource>::DynamicType,
    wc: Config,
) -> Controller<K>
Specify Child objects which K owns and should be watched
Same as Controller::owns, but accepts a DynamicType so it can be used with dynamic resources.
pub fn owns_stream<Child>(
    self,
    trigger: impl Stream<Item = Result<Child, Error>> + Send + 'static,
) -> Controller<K>
Trigger the reconciliation process for a stream of Child objects of the owner K
Same as Controller::owns, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Watcher streams passed in here should be filtered first through touched_objects.
§Example:
let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
    .touched_objects()
    .predicate_filter(predicates::generation);
Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
    .owns_stream(sts_stream)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;
pub fn owns_stream_with<Child>(
    self,
    trigger: impl Stream<Item = Result<Child, Error>> + Send + 'static,
    dyntype: <Child as Resource>::DynamicType,
) -> Controller<K>
Trigger the reconciliation process for a stream of Child objects of the owner K
Same as Controller::owns, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Same as Controller::owns_stream, but accepts a DynamicType so it can be used with dynamic resources.
This is the same as Controller::for_stream. Instead of taking an Api (e.g. Controller::new), a stream of resources is used. Shared streams can be created out-of-band by subscribing on a store Writer. Through this interface, multiple controllers can use the same root (shared) input stream of resources to keep memory overheads smaller.
N.B: This constructor requires an unstable feature.
Prefer Controller::new or Controller::for_stream if you do not need to share the stream.
§Warning:
You must ensure the root stream (i.e. stream created through a reflector()) is driven to readiness independently of this controller to ensure the watcher never deadlocks.
Trigger the reconciliation process for a shared stream of Child objects of the owner K
Conceptually the same as Controller::owns, but a stream is used instead of an Api. This interface behaves similarly to its non-shared counterpart Controller::owns_stream.
NB: This constructor requires an unstable feature.
§Example:
let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
let pod_api: Api<Pod> = Api::default_namespaced(client);
let (reader, writer) = reflector::store_shared(128);
let subscriber = writer
    .subscribe()
    .expect("subscribers can only be created from shared stores");
let pods = watcher(pod_api, watcher::Config::default())
    .default_backoff()
    .reflect(writer)
    .applied_objects()
    .for_each(|ev| async move {
        match ev {
            Ok(obj) => tracing::info!("got obj {obj:?}"),
            Err(error) => tracing::error!(%error, "received error")
        }
    });
let controller = Controller::new(deploys, Default::default())
    .owns_shared_stream(subscriber)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|ev| async move {
        tracing::info!("reconciled {ev:?}")
    });
// Drive streams using a select statement
tokio::select! {
    _ = pods => {},
    _ = controller => {},
}
Trigger the reconciliation process for a shared stream of Child objects of the owner K
Same as Controller::owns, but instead of an Api, a shared stream of resources is used. The source stream can be shared between multiple controllers, optimising resource usage.
NB: This constructor requires an unstable feature.
Same as Controller::owns_shared_stream, but accepts a DynamicType so it can be used with dynamic resources.
pub fn watches<Other, I>(
    self,
    api: Api<Other>,
    wc: Config,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Controller<K>
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    <Other as Resource>::DynamicType: Default + Debug + Clone + Eq + Hash,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    <I as IntoIterator>::IntoIter: Send,
Specify Watched object which K has a custom relation to and should be watched
To define the Watched relation with K, you must define a custom relation mapper, which, when given a Watched object, returns an option or iterator of relevant ObjectRef<K> to reconcile.
If the relation K has to Watched is that K owns Watched, consider using Controller::owns.
Takes an Api object that determines how the Controller listens for changes to the Watched.
The watcher::Config controls the subset of Watched objects that you want the Api to watch - in the Api’s configured scope - and run through the custom mapper. To watch the full set of Watched objects in the given Api scope, you can use watcher::Config::default.
§Example
Tracking cross cluster references using the Operator-SDK annotations.
Controller::new(memcached, watcher::Config::default())
    .watches(
        Api::<WatchedResource>::all(client.clone()),
        watcher::Config::default(),
        |ar| {
            let prt = ar
                .annotations()
                .get("operator-sdk/primary-resource-type")
                .map(String::as_str);
            if prt != Some("Memcached.cache.example.com") {
                return None;
            }
            let (namespace, name) = ar
                .annotations()
                .get("operator-sdk/primary-resource")?
                .split_once('/')?;
            Some(ObjectRef::new(name).within(namespace))
        }
    )
    .run(reconcile, error_policy, context)
    .for_each(|_| futures::future::ready(()))
    .await;
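The mapper closure above is ordinary annotation parsing, so its logic can be pulled into a plain function and tested without a cluster. The sketch below does that with standard-library types. `primary_resource` is a hypothetical helper name, and returning a `(namespace, name)` tuple instead of an `ObjectRef` is a simplification made so the example has no dependencies.

```rust
use std::collections::BTreeMap;

/// Extract the `(namespace, name)` of the primary resource from Operator-SDK
/// style annotations, or `None` if the type does not match or the value is malformed.
fn primary_resource(
    annotations: &BTreeMap<String, String>,
    expected_type: &str,
) -> Option<(String, String)> {
    // Ignore objects that point at a different kind of primary resource.
    let prt = annotations.get("operator-sdk/primary-resource-type")?;
    if prt.as_str() != expected_type {
        return None;
    }
    // The reference is stored as "<namespace>/<name>".
    let (namespace, name) = annotations
        .get("operator-sdk/primary-resource")?
        .split_once('/')?;
    Some((namespace.to_string(), name.to_string()))
}
```

Returning `None` for anything unexpected means unrelated or malformed objects simply do not trigger a reconcile, which matches the `Option`-returning mapper in the example.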
pub fn watches_with<Other, I>(
    self,
    api: Api<Other>,
    dyntype: <Other as Resource>::DynamicType,
    wc: Config,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Controller<K>
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    <I as IntoIterator>::IntoIter: Send,
    <Other as Resource>::DynamicType: Debug + Clone + Eq + Hash,
Specify Watched object which K has a custom relation to and should be watched
Same as Controller::watches, but accepts a DynamicType so it can be used with dynamic resources.
pub fn watches_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Result<Other, Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Controller<K>
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    <Other as Resource>::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    <I as IntoIterator>::IntoIter: Send,
Trigger the reconciliation process for a stream of Other objects related to a K
Same as Controller::watches, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Watcher streams passed in here should be filtered first through touched_objects.
§Example:
fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
let api: Api<DaemonSet> = Api::all(client.clone());
let cr: Api<CustomResource> = Api::all(client.clone());
let daemons = watcher(api, watcher::Config::default())
    .touched_objects()
    .predicate_filter(predicates::generation);
Controller::new(cr, watcher::Config::default())
    .watches_stream(daemons, mapper)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()))
    .await;
pub fn watches_stream_with<Other, I>(
    self,
    trigger: impl Stream<Item = Result<Other, Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    dyntype: <Other as Resource>::DynamicType,
) -> Controller<K>
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    <Other as Resource>::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    <I as IntoIterator>::IntoIter: Send,
Trigger the reconciliation process for a stream of Other objects related to a K
Same as Controller::watches, but instead of an Api, a stream of resources is used. This allows for customized and pre-filtered watch streams to be used as a trigger, as well as sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Same as Controller::watches_stream, but accepts a DynamicType so it can be used with dynamic resources.
Trigger the reconciliation process for a shared stream of Other objects related to a K
Same as Controller::watches, but instead of an Api, a shared stream of resources is used. This allows for sharing input streams between multiple controllers.
NB: This constructor requires an unstable feature.
Watcher streams passed in here should be filtered first through touched_objects.
§Example:
fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
let api: Api<DaemonSet> = Api::all(client.clone());
let cr: Api<CustomResource> = Api::all(client.clone());
let (reader, writer) = kube_runtime::reflector::store_shared(128);
let subscriber = writer
    .subscribe()
    .expect("subscribers can only be created from shared stores");
let daemons = watcher(api, watcher::Config::default())
    .reflect(writer)
    .touched_objects()
    .for_each(|ev| async move {
        match ev {
            Ok(obj) => {},
            Err(error) => tracing::error!(%error, "received err")
        }
    });
let controller = Controller::new(cr, watcher::Config::default())
    .watches_shared_stream(subscriber, mapper)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| std::future::ready(()));
// Drive streams using a select statement
tokio::select! {
    _ = daemons => {},
    _ = controller => {},
}
Trigger the reconciliation process for a shared stream of Other objects related to a K
Same as Controller::watches, but instead of an Api, a shared stream of resources is used. This allows for sharing of streams between multiple controllers.
NB: This constructor requires an unstable feature.
Same as Controller::watches_shared_stream, but accepts a DynamicType so it can be used with dynamic resources.
pub fn reconcile_all_on(
    self,
    trigger: impl Stream<Item = ()> + Send + Sync + 'static,
) -> Controller<K>
Trigger a reconciliation for all managed objects whenever trigger emits a value
For example, this can be used to reconcile all objects whenever the controller’s configuration changes.
To reconcile all objects when a new line is entered:
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    Client,
    api::{Api, ResourceExt},
    runtime::{
        controller::{Controller, Action},
        watcher,
    },
};
use std::{convert::Infallible, io::BufRead, sync::Arc};
let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
    for _ in std::io::BufReader::new(std::io::stdin()).lines() {
        let _ = reload_tx.try_send(());
    }
});
// NB: the stream returned by run must still be polled for the controller to do anything
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    watcher::Config::default(),
)
.reconcile_all_on(reload_rx.map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
);
This can be called multiple times, in which case they are additive; reconciles are scheduled whenever any Stream emits a new item.
If a Stream is terminated (by emitting None) then the Controller keeps running, but the Stream stops being polled.
pub fn reconcile_on(
    self,
    trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static,
) -> Controller<K>
Trigger the reconciliation process for a managed object ObjectRef<K> whenever trigger emits a value
This can be used to inject reconciliations for specific objects from an external resource.
§Example:
struct ExternalObject {
    name: String,
}
let external_stream = watch_external_objects().map(|ext| {
    ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
});
Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
    .reconcile_on(external_stream)
    .run(reconcile, error_policy, Arc::new(()))
    .for_each(|_| future::ready(()))
    .await;
pub fn graceful_shutdown_on(
    self,
    trigger: impl Future<Output = ()> + Send + Sync + 'static,
) -> Controller<K>
Start a graceful shutdown when trigger resolves. Once a graceful shutdown has been initiated:
- No new reconciliations are started from the scheduler
- The underlying Kubernetes watch is terminated
- All running reconciliations are allowed to finish
Controller::run’s Stream terminates once all running reconciliations are done.
For example, to stop the reconciler whenever the user presses Ctrl+C:
use futures::future::FutureExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client, ResourceExt};
use kube_runtime::{
    controller::{Controller, Action},
    watcher,
};
use std::{convert::Infallible, sync::Arc};
// NB: the stream returned by run must still be polled for the controller to do anything
Controller::new(
    Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    watcher::Config::default(),
)
.graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
.run(
    |o, _| async move {
        println!("Reconciling {}", o.name_any());
        Ok(Action::await_change())
    },
    |_, err: &Infallible, _| Err(err).unwrap(),
    Arc::new(()),
);
This can be called multiple times, in which case they are additive; the Controller starts to terminate as soon as any Future resolves.
pub fn shutdown_on_signal(self) -> Controller<K>
Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the Controller has terminated. If you run this in a process containing more tasks than just the Controller, ensure that all other tasks either terminate when the Controller does, that they have their own signal handlers, or use Controller::graceful_shutdown_on to manage your own shutdown strategy.
NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into Controller::graceful_shutdown_on.
NOTE: Controller::run terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running in the background while they terminate. This will block tokio::runtime::Runtime termination until they actually terminate, unless you run std::process::exit afterwards.
pub fn run<ReconcilerFut, Ctx>(
    self,
    reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &<ReconcilerFut as TryFuture>::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<<ReconcilerFut as TryFuture>::Error, Error>>>
Consume all the parameters of the Controller and start the applier stream
This creates a stream from all builder calls and starts an applier with the specified reconciler and error_policy callbacks. Each of these will be called with a configurable context.
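The division of labour between the two callbacks can be modelled in a few lines: a successful reconcile chooses the follow-up action itself, while a failed one hands the error to the error policy, and both see the same shared context. The sketch below is a standard-library model only. Its `Action` enum and `next_action` function are hypothetical simplifications, not the kube types, and it leaves out scheduling and the stream output entirely.

```rust
use std::time::Duration;

/// Hypothetical, simplified follow-up action (not the kube `Action` type).
#[derive(Debug, PartialEq)]
enum Action {
    /// Run the reconciler again after the given delay.
    Requeue(Duration),
    /// Wait until the next relevant change.
    AwaitChange,
}

/// Decide what happens after one reconcile attempt:
/// on success use the reconciler's own action, on failure ask the error policy.
fn next_action<E, Ctx>(
    result: Result<Action, E>,
    error_policy: impl Fn(&E, &Ctx) -> Action,
    ctx: &Ctx,
) -> Action {
    match result {
        Ok(action) => action,
        Err(err) => error_policy(&err, ctx),
    }
}
```

Keeping the retry decision in a separate policy function means the reconciler can return errors freely with `?` and leave the retry timing to one place.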
Auto Trait Implementations§
impl<K> !Freeze for Controller<K>
impl<K> !RefUnwindSafe for Controller<K>
impl<K> Send for Controller<K>
impl<K> !Sync for Controller<K>
impl<K> Unpin for Controller<K>
impl<K> !UnwindSafe for Controller<K>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.