kube_runtime::watcher

Function metadata_watcher

source
pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    watcher_config: Config,
) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send
Expand description

Watches a Kubernetes Resource for changes continuously and receives only the metadata

Compared to Api::watch_metadata, this automatically tries to recover the stream upon errors.

Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. You can apply your own backoff by not polling the stream for a duration after errors. Keep in mind that some TryStream combinators (such as try_for_each and try_concat) will terminate eagerly as soon as they receive an Err.

The events are intended to provide a safe input interface for a state store like a reflector. Direct users may want to use WatchStreamExt for higher-level constructs.

use kube::{
  api::{Api, ResourceExt}, Client,
  runtime::{watcher, metadata_watcher, WatchStreamExt}
};
use k8s_openapi::api::core::v1::Pod;
use futures::TryStreamExt;
#[tokio::main]
async fn main() -> Result<(), watcher::Error> {
    let client = Client::try_default().await.unwrap();
    let pods: Api<Pod> = Api::namespaced(client, "apps");

    metadata_watcher(pods, watcher::Config::default()).applied_objects()
        .try_for_each(|p| async move {
         println!("Applied: {}", p.name_any());
            Ok(())
        })
        .await?;
    Ok(())
}

ยงRecovery

The stream will attempt to be recovered on the next poll after an Err is returned. This will normally happen immediately, but you can use StreamBackoff to introduce an artificial delay. [default_backoff] returns a suitable default set of parameters.

If the watch connection is interrupted, then watcher will attempt to restart the watch using the last resource version that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. If this fails because the resource version is no longer valid then we start over with a new stream, starting with an Event::Init. The internals mechanics of recovery should be considered an implementation detail.