Trait kube_runtime::utils::WatchStreamExt
source · pub trait WatchStreamExt: Stream {
// Provided methods
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
where Self: TryStream + Sized { ... }
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
where B: Backoff,
Self: TryStream + Sized { ... }
fn applied_objects<K>(self) -> EventFlatten<Self, K>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn touched_objects<K>(self) -> EventFlatten<Self, K>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized,
F: FnMut(&mut K) { ... }
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
where Self: Stream<Item = Result<K, Error>> + Sized,
K: Resource + 'static,
P: Predicate<K> + 'static { ... }
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where Self: Stream<Item = Result<Event<K>, Error>> + Send + Sized + 'static { ... }
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
where Self: Stream<Item = Result<Event<K>>> + Sized,
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone { ... }
}
Provided Methods§
sourcefn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>where
Self: TryStream + Sized,
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>where
Self: TryStream + Sized,
Apply the DefaultBackoff
watcher Backoff
policy
This is recommended for controllers that want to play nicely with the apiserver.
sourcefn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
Apply a specific Backoff
policy to a [Stream
] using StreamBackoff
sourcefn applied_objects<K>(self) -> EventFlatten<Self, K>
fn applied_objects<K>(self) -> EventFlatten<Self, K>
Flatten a watcher()
stream into a stream of applied objects
All Added/Modified events are passed through, and critical errors bubble up.
sourcefn touched_objects<K>(self) -> EventFlatten<Self, K>
fn touched_objects<K>(self) -> EventFlatten<Self, K>
Flatten a watcher()
stream into a stream of touched objects
All Added/Modified/Deleted events are passed through, and critical errors bubble up.
sourcefn modify<F, K>(self, f: F) -> EventModify<Self, F>
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
Modify elements of a watcher()
stream.
Calls watcher::Event::modify()
on every element.
Stream shorthand for stream.map_ok(|event| { event.modify(f) })
.
let deploys: Api<Deployment> = Api::all(client);
let truncated_deploy_stream = watcher(deploys, watcher::Config::default())
.modify(|deploy| {
deploy.managed_fields_mut().clear();
deploy.status = None;
})
.applied_objects();
pin_mut!(truncated_deploy_stream);
while let Some(d) = truncated_deploy_stream.try_next().await? {
println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
}
sourcefn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
Filter out a flattened stream on predicates
.
This will filter out repeat calls where the predicate returns the same result.
Common use case for this is to avoid repeat events for status updates
by filtering on predicates::generation
.
NB: This constructor requires an unstable
feature.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, predicates};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let changed_deploys = watcher(deploys, watcher::Config::default())
.applied_objects()
.predicate_filter(predicates::generation);
pin_mut!(changed_deploys);
while let Some(d) = changed_deploys.try_next().await? {
println!("saw Deployment '{}' with hitherto unseen generation", d.name_any());
}
sourcefn stream_subscribe<K>(self) -> StreamSubscribe<Self>
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
Create a StreamSubscribe
from a watcher()
stream.
The StreamSubscribe::subscribe()
method allows additional consumers
of events from a stream without consuming the stream itself.
If a subscriber begins to lag behind the stream, it will receive an Error::Lagged
error. The subscriber can then decide to abort its task or tolerate the lost events.
If the [Stream
] is dropped or ends, any StreamSubscribe::subscribe()
streams
will also end.
NB: This constructor requires an unstable
feature.
§Warning
If the primary [Stream
] is not polled, the StreamSubscribe::subscribe()
streams
will never receive any events.
§Usage
use futures::{Stream, StreamExt};
use std::{fmt::Debug, sync::Arc};
use kube_runtime::{watcher, WatchStreamExt};
fn explain_events<K, S>(
stream: S,
) -> (
impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
impl Stream<Item = String> + Send + Sized + 'static,
)
where
K: Debug + Send + Sync + 'static,
S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
{
// Create a stream that can be subscribed to
let stream_subscribe = stream.stream_subscribe();
// Create a subscription to that stream
let subscription = stream_subscribe.subscribe();
// Create a stream of descriptions of the events
let explain_stream = subscription.filter_map(|event| async move {
// We don't care about lagged events so we can throw that error away
match event.ok()?.as_ref() {
Ok(watcher::Event::Applied(event)) => {
Some(format!("An object was added or modified: {event:?}"))
}
Ok(_) => todo!("explain other events"),
// We don't care about watcher errors either
Err(_) => None,
}
});
// We now still have the original stream, and a secondary stream of explanations
(stream_subscribe, explain_stream)
}
sourcefn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
Reflect a watcher()
stream into a Store
through a Writer
Returns the stream unmodified, but passes every watcher::Event
through a Writer
.
This populates a Store
as the stream is polled.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, reflector};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store::<Deployment>();
tokio::spawn(async move {
// start polling the store once the reader is ready
reader.wait_until_ready().await.unwrap();
loop {
let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
info!("Current {} deploys: {:?}", names.len(), names);
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
// configure the watcher stream and populate the store while polling
watcher(deploys, watcher::Config::default())
.reflect(writer)
.applied_objects()
.for_each(|res| async move {
match res {
Ok(o) => info!("saw {}", o.name_any()),
Err(e) => warn!("watcher error: {}", e),
}
})
.await;