use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube::{
api::{ListParams, Meta, WatchEvent},
Api,
};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
use snafu::{Backtrace, ResultExt, Snafu};
use std::clone::Clone;
#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("failed to perform initial object list: {}", source))]
InitialListFailed {
source: kube::Error,
backtrace: Backtrace,
},
#[snafu(display("failed to start watching object: {}", source))]
WatchStartFailed {
source: kube::Error,
backtrace: Backtrace,
},
#[snafu(display("error returned by apiserver during watch: {}", source))]
WatchError {
source: kube::error::ErrorResponse,
backtrace: Backtrace,
},
#[snafu(display("watch stream failed: {}", source))]
WatchFailed {
source: kube::Error,
backtrace: Backtrace,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub enum Event<K> {
Applied(K),
Deleted(K),
Restarted(Vec<K>),
}
impl<K> Event<K> {
pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) => SmallVec::from_buf([obj]),
Event::Deleted(_) => SmallVec::new(),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
match self {
Event::Applied(obj) | Event::Deleted(obj) => SmallVec::from_buf([obj]),
Event::Restarted(objs) => SmallVec::from_vec(objs),
}
.into_iter()
}
}
#[derive(Derivative)]
#[derivative(Debug)]
enum State<K: Meta + Clone> {
Empty,
InitListed { resource_version: String },
Watching {
resource_version: String,
#[derivative(Debug = "ignore")]
stream: BoxStream<'static, kube::Result<WatchEvent<K>>>,
},
}
async fn step_trampolined<K: Meta + Clone + DeserializeOwned + Send + 'static>(
api: &Api<K>,
list_params: &ListParams,
state: State<K>,
) -> (Option<Result<Event<K>>>, State<K>) {
match state {
State::Empty => match api.list(&list_params).await {
Ok(list) => (Some(Ok(Event::Restarted(list.items))), State::InitListed {
resource_version: list.metadata.resource_version.unwrap(),
}),
Err(err) => (Some(Err(err).context(InitialListFailed)), State::Empty),
},
State::InitListed { resource_version } => match api.watch(&list_params, &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream: stream.boxed(),
}),
Err(err) => (Some(Err(err).context(WatchStartFailed)), State::InitListed {
resource_version,
}),
},
State::Watching {
resource_version,
mut stream,
} => match stream.next().await {
Some(Ok(WatchEvent::Added(obj))) | Some(Ok(WatchEvent::Modified(obj))) => {
let resource_version = obj.resource_ver().unwrap();
(Some(Ok(Event::Applied(obj))), State::Watching {
resource_version,
stream,
})
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_ver().unwrap();
(Some(Ok(Event::Deleted(obj))), State::Watching {
resource_version,
stream,
})
}
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version.clone(),
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
let new_state = if err.code == 410 {
State::Empty
} else {
State::Watching {
resource_version,
stream,
}
};
(Some(Err(err).context(WatchError)), new_state)
}
Some(Err(err)) => (Some(Err(err).context(WatchFailed)), State::Watching {
resource_version,
stream,
}),
None => (None, State::InitListed { resource_version }),
},
}
}
async fn step<K: Meta + Clone + DeserializeOwned + Send + 'static>(
api: &Api<K>,
list_params: &ListParams,
mut state: State<K>,
) -> (Result<Event<K>>, State<K>) {
loop {
match step_trampolined(&api, &list_params, state).await {
(Some(result), new_state) => return (result, new_state),
(None, new_state) => state = new_state,
}
}
}
pub fn watcher<K: Meta + Clone + DeserializeOwned + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<K>>> + Send {
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&api, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
}