pub trait WatchStreamExt: Stream {
// Provided methods
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
where Self: TryStream + Sized { ... }
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
where B: Backoff,
Self: TryStream + Sized { ... }
fn applied_objects<K>(self) -> EventDecode<Self>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn touched_objects<K>(self) -> EventDecode<Self>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized,
F: FnMut(&mut K) { ... }
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
where Self: Stream<Item = Result<K, Error>> + Sized,
K: Resource + 'static,
P: Predicate<K> + 'static { ... }
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
where Self: Stream<Item = Result<Event<K>>> + Sized,
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone { ... }
fn reflect_shared<K>(
self,
writer: Writer<K>,
) -> impl Stream<Item = Self::Item>
where Self: Stream<Item = Result<Event<K>>> + Sized,
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone { ... }
}
Provided Methods§
sourcefn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
Apply the DefaultBackoff
watcher Backoff
policy
This is recommended for controllers that want to play nicely with the apiserver.
sourcefn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
Apply a specific Backoff
policy to a Stream
using StreamBackoff
sourcefn applied_objects<K>(self) -> EventDecode<Self>
fn applied_objects<K>(self) -> EventDecode<Self>
Decode a watcher()
stream into a stream of applied objects
All Added/Modified events are passed through, and critical errors bubble up.
sourcefn touched_objects<K>(self) -> EventDecode<Self>
fn touched_objects<K>(self) -> EventDecode<Self>
Decode a watcher()
stream into a stream of touched objects
All Added/Modified/Deleted events are passed through, and critical errors bubble up.
sourcefn modify<F, K>(self, f: F) -> EventModify<Self, F>
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
Modify elements of a watcher()
stream.
Calls watcher::Event::modify()
on every element.
Stream shorthand for stream.map_ok(|event| { event.modify(f) })
.
let deploys: Api<Deployment> = Api::all(client);
let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
.modify(|deploy| {
deploy.managed_fields_mut().clear();
deploy.status = None;
})
.applied_objects());
while let Some(d) = truncated_deploy_stream.try_next().await? {
println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
}
sourcefn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
Filter a stream based on on predicates
.
This will filter out repeat calls where the predicate returns the same result.
Common use case for this is to avoid repeat events for status updates
by filtering on predicates::generation
.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, predicates};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
.applied_objects()
.predicate_filter(predicates::generation));
while let Some(d) = changed_deploys.try_next().await? {
println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
}
sourcefn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
Reflect a watcher()
stream into a Store
through a Writer
Returns the stream unmodified, but passes every watcher::Event
through a Writer
.
This populates a Store
as the stream is polled.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, reflector};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store::<Deployment>();
tokio::spawn(async move {
// start polling the store once the reader is ready
reader.wait_until_ready().await.unwrap();
loop {
let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
info!("Current {} deploys: {:?}", names.len(), names);
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
// configure the watcher stream and populate the store while polling
watcher(deploys, watcher::Config::default())
.reflect(writer)
.applied_objects()
.for_each(|res| async move {
match res {
Ok(o) => info!("saw {}", o.name_any()),
Err(e) => warn!("watcher error: {}", e),
}
})
.await;
Reflect a shared watcher()
stream into a Store
through a Writer
Returns the stream unmodified, but passes every watcher::Event
through a Writer
. This populates a Store
as the stream is
polled. When the watcher::Event
is not an error or a
[watcher::Event::Deleted
] then its inner object will also be
propagated to subscribers.
Subscribers can be created by calling subscribe()
on a Writer
.
This will return a ReflectHandle
stream that should be polled
independently. When the root stream is dropped, or it ends, all ReflectHandle
s
subscribed to the stream will also terminate after all events yielded by
the root stream have been observed. This means ReflectHandle
streams
can still be polled after the root stream has been dropped.
NB: This adapter requires an
unstable
feature
§Warning
If the root Stream
is not polled, ReflectHandle
streams will
never receive any events. This will cause the streams to deadlock since
the root stream will apply backpressure when downstream readers are not
consuming events.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, reflector};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let subscriber_buf_sz = 100;
let (reader, writer) = reflector::store_shared::<Deployment>(subscriber_buf_sz);
let subscriber = &writer.subscribe().unwrap();
tokio::spawn(async move {
// start polling the store once the reader is ready
reader.wait_until_ready().await.unwrap();
loop {
let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
info!("Current {} deploys: {:?}", names.len(), names);
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
// configure the watcher stream and populate the store while polling
watcher(deploys, watcher::Config::default())
.reflect_shared(writer)
.applied_objects()
.for_each(|res| async move {
match res {
Ok(o) => info!("saw in root stream {}", o.name_any()),
Err(e) => warn!("watcher error in root stream: {}", e),
}
})
.await;
// subscriber can be used to receive applied_objects
subscriber.for_each(|obj| async move {
info!("saw in subscriber {}", &obj.name_any())
}).await;