1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
watcher,
};
use backoff::backoff::Backoff;
use futures::{Stream, TryStream};
/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
pub trait WatchStreamExt: Stream {
/// Apply a [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
where
B: Backoff,
Self: TryStream + Sized,
{
StreamBackoff::new(self, b)
}
/// Flatten a [`watcher()`] stream into a stream of applied objects
///
/// All Added/Modified events are passed through, and critical errors bubble up.
fn applied_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
EventFlatten::new(self, false)
}
/// Flatten a [`watcher()`] stream into a stream of touched objects
///
/// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
fn touched_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
EventFlatten::new(self, true)
}
/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
/// of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
///
/// # Usage
///
/// ```
/// use futures::{Stream, StreamExt};
/// use std::{fmt::Debug, sync::Arc};
/// use kube_runtime::{watcher, WatchStreamExt};
///
/// fn explain_events<K, S>(
/// stream: S,
/// ) -> (
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
/// impl Stream<Item = String> + Send + Sized + 'static,
/// )
/// where
/// K: Debug + Send + Sync + 'static,
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
/// {
/// // Create a stream that can be subscribed to
/// let stream_subscribe = stream.stream_subscribe();
/// // Create a subscription to that stream
/// let subscription = stream_subscribe.subscribe();
///
/// // Create a stream of descriptions of the events
/// let explain_stream = subscription.filter_map(|event| async move {
/// // We don't care about lagged events so we can throw that error away
/// match event.ok()?.as_ref() {
/// Ok(watcher::Event::Applied(event)) => {
/// Some(format!("An object was added or modified: {event:?}"))
/// }
/// Ok(_) => todo!("explain other events"),
/// // We don't care about watcher errors either
/// Err(_) => None,
/// }
/// });
///
/// // We now still have the original stream, and a secondary stream of explanations
/// (stream_subscribe, explain_stream)
/// }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
{
StreamSubscribe::new(self)
}
}
impl<St: ?Sized> WatchStreamExt for St where St: Stream {}