1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
//! Watches a Kubernetes Resource for changes, with error recovery
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
Api,
};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
use snafu::{Backtrace, ResultExt, Snafu};
use std::{clone::Clone, fmt::Debug};
/// Errors yielded by the [`watcher`] and [`watch_object`] streams.
///
/// None of these terminate the stream by themselves: the watcher attempts recovery on the
/// next poll after yielding one of them.
#[derive(Snafu, Debug)]
pub enum Error {
    /// The LIST request used to (re)populate the watcher failed.
    #[snafu(display("failed to perform initial object list: {}", source))]
    InitialListFailed {
        source: kube::Error,
        backtrace: Backtrace,
    },
    /// The WATCH request could not be started.
    #[snafu(display("failed to start watching object: {}", source))]
    WatchStartFailed {
        source: kube::Error,
        backtrace: Backtrace,
    },
    /// The apiserver sent an error object as an event on an open watch stream.
    ///
    /// A code of 410 (Gone) makes the watcher re-list from scratch; other codes keep the
    /// current stream.
    #[snafu(display("error returned by apiserver during watch: {}", source))]
    WatchError {
        source: kube::error::ErrorResponse,
        backtrace: Backtrace,
    },
    /// The watch stream itself yielded an error (as opposed to an apiserver error event).
    #[snafu(display("watch stream failed: {}", source))]
    WatchFailed {
        source: kube::Error,
        backtrace: Backtrace,
    },
    /// [`watch_object`] received more than one object for a single name.
    #[snafu(display("too many objects matched search criteria"))]
    TooManyObjects { backtrace: Backtrace },
}
/// Result type used throughout this module, with [`Error`] as the default error type.
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
/// Watch events returned from the [`watcher`]
///
/// Compared to a raw [`WatchEvent`], bookmarks are consumed internally (they only advance the
/// resume point) and apiserver error events are surfaced as [`Error`]s. As a result, every
/// `Event` carries object data.
pub enum Event<K> {
    /// An object was added or modified
    Applied(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Deleted(K),
    /// The watch stream was restarted, so `Deleted` events may have been missed
    ///
    /// Emitted after every successful LIST of all matching objects, including the very first one.
    ///
    /// Should be used as a signal to replace the store contents atomically.
    ///
    /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in this event
    /// should be assumed to have been [`Deleted`](Event::Deleted).
    Restarted(Vec<K>),
}
impl<K> Event<K> {
/// Flattens out all objects that were added or modified in the event.
///
/// `Deleted` objects are ignored, all objects mentioned by `Restarted` events are
/// emitted individually.
pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) => SmallVec::from_buf([obj]),
Event::Deleted(_) => SmallVec::new(),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
/// Flattens out all objects that were added, modified, or deleted in the event.
///
/// Note that `Deleted` events may be missed when restarting the stream. Use finalizers
/// or owner references instead if you care about cleaning up external resources after
/// deleted objects.
pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) | Event::Deleted(obj) => SmallVec::from_buf([obj]),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
}
#[derive(Derivative)]
#[derivative(Debug)]
/// The internal finite state machine driving the [`watcher`]
///
/// Each step of the watcher consumes the current `State` by value and produces the next one.
enum State<K: Resource + Clone> {
    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
    Empty,
    /// The initial LIST was successful, so we should move on to starting the actual watch.
    ///
    /// `resource_version` is the version the WATCH is started (or restarted) from.
    InitListed { resource_version: String },
    /// The watch is in progress, from this point we just return events from the server.
    ///
    /// If the stream yields an error (`kube::Error`), we propagate it and keep polling the same stream.
    /// Once the stream ends, we try to restart the watch by returning to the `InitListed` state.
    /// If we fall out of the K8s watch window (an error event with code 410, Gone), we propagate the
    /// error and fall back to doing a re-list with `Empty`.
    Watching {
        /// Latest resource version observed (from the LIST, objects or bookmarks); the watch
        /// resumes from it after the stream ends.
        resource_version: String,
        // Excluded from `Debug`: a boxed `dyn Stream` does not implement `Debug`.
        #[derivative(Debug = "ignore")]
        stream: BoxStream<'static, kube::Result<WatchEvent<K>>>,
    },
}
/// Progresses the watcher a single step, returning `(event, state)`.
///
/// This function should be trampolined: if the returned event is `None`, the function should be
/// called again with the returned state until it yields `Some` (this is what [`step`] does).
///
/// Transitions:
/// - `Empty`: LIST all objects.
///   - On success, emit [`Event::Restarted`] and move to `InitListed`.
///   - On failure, emit [`Error::InitialListFailed`] and stay `Empty`.
/// - `InitListed`: start a WATCH from the stored resource version.
///   - On success, move to `Watching` without emitting anything.
///   - On failure, emit [`Error::WatchStartFailed`] and stay `InitListed`.
/// - `Watching`: translate the next watch event and remember the latest resource version seen.
///   - Bookmarks only update that version.
///   - An apiserver error event with code 410 (Gone) falls back to `Empty` (re-list); other
///     errors keep the current stream.
///   - When the stream ends, the watch is restarted from the last seen version. If no version is
///     known, the watcher re-lists instead.
///
/// Resource versions coming from the apiserver are never `unwrap`ped, so a response that lacks
/// one degrades gracefully instead of panicking the caller's task.
async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: &Api<K>,
    list_params: &ListParams,
    state: State<K>,
) -> (Option<Result<Event<K>>>, State<K>) {
    match state {
        State::Empty => match api.list(list_params).await {
            Ok(list) => {
                // `metadata.resourceVersion` is not guaranteed to be present on every list
                // response (NOTE(review): some aggregated API servers appear to omit it), so
                // don't panic on it.
                // An empty version makes the next WATCH "start at most recent". Per the
                // Kubernetes API concepts, that replays existing objects as `Added` events,
                // which arrive as `Applied` after this `Restarted` and should be harmless for a store.
                let resource_version = list.metadata.resource_version.unwrap_or_default();
                (
                    Some(Ok(Event::Restarted(list.items))),
                    State::InitListed { resource_version },
                )
            }
            Err(err) => (Some(Err(err).context(InitialListFailed)), State::Empty),
        },
        State::InitListed { resource_version } => {
            match api.watch(list_params, &resource_version).await {
                Ok(stream) => (
                    None,
                    State::Watching {
                        resource_version,
                        stream: stream.boxed(),
                    },
                ),
                Err(err) => (
                    Some(Err(err).context(WatchStartFailed)),
                    State::InitListed { resource_version },
                ),
            }
        }
        State::Watching {
            resource_version,
            mut stream,
        } => match stream.next().await {
            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
                // Keep resuming from the last known version if the object lacks one.
                let resource_version = obj.resource_version().unwrap_or(resource_version);
                (
                    Some(Ok(Event::Applied(obj))),
                    State::Watching {
                        resource_version,
                        stream,
                    },
                )
            }
            Some(Ok(WatchEvent::Deleted(obj))) => {
                // Same fallback as above: never panic on server-provided metadata.
                let resource_version = obj.resource_version().unwrap_or(resource_version);
                (
                    Some(Ok(Event::Deleted(obj))),
                    State::Watching {
                        resource_version,
                        stream,
                    },
                )
            }
            Some(Ok(WatchEvent::Bookmark(bm))) => (
                None,
                State::Watching {
                    resource_version: bm.metadata.resource_version,
                    stream,
                },
            ),
            Some(Ok(WatchEvent::Error(err))) => {
                // HTTP GONE, means we have desynced and need to start over and re-list :(
                let new_state = if err.code == 410 {
                    State::Empty
                } else {
                    State::Watching {
                        resource_version,
                        stream,
                    }
                };
                (Some(Err(err).context(WatchError)), new_state)
            }
            Some(Err(err)) => (
                Some(Err(err).context(WatchFailed)),
                State::Watching {
                    resource_version,
                    stream,
                },
            ),
            // Without a version to resume from, re-watching would silently skip any deletions
            // that happened in between. Re-list so consumers get a fresh `Restarted` instead.
            None if resource_version.is_empty() => (None, State::Empty),
            None => (None, State::InitListed { resource_version }),
        },
    }
}
/// Repeatedly advances the state machine via `step_trampolined` until a step produces a result.
///
/// Steps that only move the internal state forward are retried immediately, for example:
/// - starting a watch;
/// - consuming a bookmark;
/// - noticing that the stream ended.
///
/// The first real result is returned together with the state to resume from.
async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: &Api<K>,
    list_params: &ListParams,
    mut state: State<K>,
) -> (Result<Event<K>>, State<K>) {
    loop {
        let (maybe_event, next_state) = step_trampolined(api, list_params, state).await;
        if let Some(event) = maybe_event {
            break (event, next_state);
        }
        state = next_state;
    }
}
/// Watches a Kubernetes Resource for changes continuously
///
/// Unlike [`Api::watch`], this stream tries to recover on its own when something goes wrong.
///
/// Errors from the underlying list/watch calls are yielded as they occur. The next poll after an
/// error puts the stream into recovery mode, so you can back off by simply not polling the stream
/// for a while after an error.
/// Be aware that some [`TryStream`](futures::TryStream) combinators stop at the first [`Err`]
/// they receive, such as [`try_for_each`](futures::TryStreamExt::try_for_each) and
/// [`try_concat`](futures::TryStreamExt::try_concat).
///
/// The stream is meant to be a safe, atomic input for a state store such as a [`reflector`].
/// Callers consuming it directly will usually want to flatten composite events with
/// [`try_flatten_applied`]:
///
/// ```no_run
/// use kube::{api::{Api, ListParams, ResourceExt}, Client};
/// use kube_runtime::{utils::try_flatten_applied, watcher};
/// use k8s_openapi::api::core::v1::Pod;
/// use futures::{StreamExt, TryStreamExt};
/// #[tokio::main]
/// async fn main() -> Result<(), kube_runtime::watcher::Error> {
///     let client = Client::try_default().await.unwrap();
///     let pods: Api<Pod> = Api::namespaced(client, "apps");
///     let watcher = watcher(pods, ListParams::default());
///     try_flatten_applied(watcher)
///         .try_for_each(|p| async move {
///             println!("Applied: {}", p.name());
///             Ok(())
///         })
///         .await?;
///     Ok(())
/// }
/// ```
/// [`try_flatten_applied`]: super::utils::try_flatten_applied
/// [`reflector`]: super::reflector::reflector
/// [`Api::watch`]: kube::Api::watch
///
/// # Recovery
///
/// (How recovery works is an implementation detail that may change without notice; it is
/// described here only for reference.)
///
/// When the watch connection is interrupted, we try to restart the watch from the most recent
/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
/// observed on the stream, so that it continues exactly where it stopped. If that version is no
/// longer valid, we begin again with a fresh stream, announced by an [`Event::Restarted`].
pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    list_params: ListParams,
) -> impl Stream<Item = Result<Event<K>>> + Send {
    let seed = (api, list_params, State::Empty);
    futures::stream::unfold(seed, |(api, list_params, state)| async move {
        let (event, next_state) = step(&api, &list_params, state).await;
        // Every step yields an item, so the stream never ends on its own.
        Some((event, (api, list_params, next_state)))
    })
}
/// Watch a single named object for updates
///
/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
///
/// Compared to [`watcher`], `watch_object` does not return [`Event`], since there is no need for an atomic
/// [`Event::Restarted`] when only one object is covered anyway.
///
/// # Errors
///
/// Yields every error produced by the underlying [`watcher`]. It also yields
/// [`Error::TooManyObjects`] if a (re-)list returns more than one object for `name`.
pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    name: &str,
) -> impl Stream<Item = Result<Option<K>>> + Send {
    // Restrict the list/watch to the single object with this name.
    let list_params = ListParams {
        field_selector: Some(format!("metadata.name={}", name)),
        ..Default::default()
    };
    watcher(api, list_params).map(|event| match event? {
        Event::Deleted(_) => Ok(None),
        // We're filtering by object name, so getting more than one object means that either:
        // 1. The apiserver is accepting multiple objects with the same name, or
        // 2. The apiserver is ignoring our query
        // In either case, the K8s apiserver is broken and our API will return invalid data, so
        // we had better bail out ASAP.
        Event::Restarted(objs) if objs.len() > 1 => TooManyObjects.fail(),
        // Zero objects (not found) maps to `None`, exactly one maps to `Some`.
        Event::Restarted(mut objs) => Ok(objs.pop()),
        Event::Applied(obj) => Ok(Some(obj)),
    })
}