use crate::{
reflector::{
reflector,
store::{Store, Writer},
ErasedResource, ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via},
watcher::{self, watcher},
};
use derivative::Derivative;
use futures::{
channel, future,
stream::{self, SelectAll},
FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube::api::{Api, ListParams, Meta};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, OptionExt, ResultExt, Snafu};
use std::{pin::Pin, sync::Arc, time::Duration};
use tokio::time::Instant;
/// Errors yielded by the stream that [`applier`] returns.
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler future and
/// `QueueErr` is the error type of the input queue stream.
#[derive(Snafu, Debug)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
// NOTE(review): the variants below are annotated with plain `//` comments on purpose.
// snafu may pick up `///` doc comments on variants as their `Display` text, which
// would change the emitted error messages — confirm before converting them.

// The store had no entry for an object reference that came out of the scheduler.
ObjectNotFound {
obj_ref: ObjectRef<ErasedResource>,
backtrace: Backtrace,
},
// The reconciler future resolved to an error; `source` is that error.
ReconcilerFailed {
source: ReconcilerErr,
backtrace: Backtrace,
},
// The scheduler stream produced an error while handing out the next request.
SchedulerDequeueFailed {
#[snafu(backtrace)]
source: scheduler::Error,
},
// The input queue stream produced an error; `source` is that error.
QueueError {
source: QueueErr,
backtrace: Backtrace,
},
}
/// Outcome of a reconciliation attempt, returned by the reconciler on success
/// or by the error policy on failure.
#[derive(Debug, Clone)]
pub struct ReconcilerAction {
/// When `Some`, [`applier`] sends a new request for the same object that is due
/// this long after the current attempt finished. When `None`, [`applier`] sends
/// no follow-up request.
pub requeue_after: Option<Duration>,
}
/// Expands every successful item of `stream` into zero or more object references.
///
/// `mapper` is invoked once per `Ok` item and may return any iterable of
/// [`ObjectRef`]s; each reference is emitted as its own `Ok` element of the
/// resulting stream. Errors produced by `stream` are passed through as-is.
pub fn trigger_with<T, K, I, S>(
    stream: S,
    mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ObjectRef<K>, S::Error>>
where
    S: TryStream<Ok = T>,
    I: IntoIterator<Item = ObjectRef<K>>,
    K: Meta,
{
    stream
        .map_ok(move |item| {
            // Wrap every reference in `Ok` so the inner stream is itself a
            // try-stream and can be flattened into the outer one.
            let targets = mapper(item).into_iter().map(Ok);
            stream::iter(targets)
        })
        .try_flatten()
}
/// Maps every successful item of `stream` to a reference to that same object.
///
/// Exactly one [`ObjectRef`] is emitted per `Ok` item; errors are passed through.
pub fn trigger_self<S>(stream: S) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
where
    S: TryStream,
    S::Ok: Meta,
{
    trigger_with(stream, |obj| {
        let own_ref = ObjectRef::from_obj(&obj);
        // `Option` is iterable, so `Some(..)` acts as a one-element collection.
        Some(own_ref)
    })
}
/// Maps every successful item of `stream` to references to its owners of kind `KOwner`.
///
/// For each `Ok` object, every entry of its `owner_references` is passed to
/// [`ObjectRef::from_owner_ref`] together with the object's own namespace, and
/// each reference that call yields is emitted. Objects without owner references
/// emit nothing. Errors produced by `stream` are passed through.
pub fn trigger_owners<KOwner, S>(stream: S) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
where
    S: TryStream,
    S::Ok: Meta,
    KOwner: Meta,
{
    trigger_with(stream, |obj| {
        // Clone only the two fields that are needed; the returned iterator must
        // own its data because `obj` is dropped when this closure returns.
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        owners
            .into_iter()
            .flatten()
            .flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner))
    })
}
/// Shared state of type `T` that is handed to the reconciler and the error policy.
///
/// The state lives behind an [`Arc`]. The `Clone` derive is given an empty bound,
/// so cloning a `Context<T>` does not require `T: Clone`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Context<T>(Arc<T>);
impl<T> Context<T> {
pub fn new(state: T) -> Context<T> {
Context(Arc::new(state))
}
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}
pub fn into_inner(self) -> Arc<T> {
self.0
}
}
/// Turns a stream of object references into reconciler invocations and yields the outcome of each.
///
/// For every reference that comes out of the scheduler this function:
///
/// 1. looks the object up in `store`,
/// 2. calls `reconciler` with the object and a clone of `context`,
/// 3. picks a [`ReconcilerAction`]: the reconciler's own on success, or the one
///    returned by `error_policy` on failure,
/// 4. if that action has `requeue_after` set, sends a new request for the same
///    reference back into the scheduler's input, and
/// 5. yields `(reference, action)` on success.
///
/// # Errors
///
/// The returned stream yields
/// - `QueueError` when `queue` produces an error,
/// - `SchedulerDequeueFailed` when the scheduler stream produces an error,
/// - `ObjectNotFound` when `store` has no object for a scheduled reference,
/// - `ReconcilerFailed` when the reconciler future resolves to an error.
///
/// # Panics
///
/// Panics if a requeue request cannot be sent on the internal channel.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Meta + 'static,
ReconcilerFut: TryFuture<Ok = ReconcilerAction>,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream<Ok = ObjectRef<K>>,
QueueStream::Error: std::error::Error + 'static,
{
// `context` is moved into the reconciler stage below, so keep a second handle for the error policy stage.
let err_context = context.clone();
// Channel used to feed requeue requests back into the scheduler; the buffer size is 100.
let (scheduler_tx, scheduler_rx) = channel::mpsc::channel::<ScheduleRequest<ObjectRef<K>>>(100);
// NOTE(review): `trystream_try_via` is defined elsewhere; presumably it pipes the `Ok` items of the
// first argument through the adapter in the second while passing errors along — confirm.
trystream_try_via(
// Scheduler input: externally queued references merged with self-issued requeue requests.
Box::pin(stream::select(
// Queue errors become `QueueError`; each queued reference is requested to run 1ms from now.
queue.context(QueueError).map_ok(|obj_ref| ScheduleRequest {
message: obj_ref,
run_at: Instant::now() + Duration::from_millis(1),
}),
// Requeue requests carry no error of their own, so wrap them in `Ok` to match the queue side.
scheduler_rx.map(Ok),
)),
|s| scheduler(s).context(SchedulerDequeueFailed),
)
// Resolve the scheduled reference to the stored object, failing with `ObjectNotFound` if it is absent.
.and_then(move |obj_ref| {
future::ready(
store
.get(&obj_ref)
.context(ObjectNotFound {
obj_ref: obj_ref.clone(),
})
.map(|obj| (obj_ref, obj)),
)
})
// Run the reconciler. Its result is carried inside the tuple and the stage itself always
// resolves to `Ok`, so a failed reconciliation still reaches the next stage.
.and_then(move |(obj_ref, obj)| {
reconciler(obj, context.clone())
.into_future()
.map(|result| (obj_ref, result))
.map(Ok)
})
// Decide on a follow-up action, requeue if requested, then surface the reconciler result.
.and_then(move |(obj_ref, reconciler_result)| {
// On success use the reconciler's action; on failure ask `error_policy` for one.
let ReconcilerAction { requeue_after } = match &reconciler_result {
Ok(action) => action.clone(),
Err(err) => error_policy(err, err_context.clone()),
};
// The async block needs its own owned sender, so clone one per item.
let mut scheduler_tx = scheduler_tx.clone();
async move {
if let Some(delay) = requeue_after {
scheduler_tx
.send(ScheduleRequest {
message: obj_ref.clone(),
run_at: Instant::now() + delay,
})
.await
.expect("Message could not be sent to scheduler_rx");
}
// Only now is a reconciler error turned into `ReconcilerFailed`, after any requeue was sent.
reconciler_result
.map(|action| (obj_ref, action))
.context(ReconcilerFailed)
}
})
}
/// Collects the trigger streams for objects of kind `K` together with the store
/// those objects are looked up in, and runs a reconciler over them via `run`.
pub struct Controller<K>
where
K: Clone + Meta + 'static,
{
/// All registered trigger streams, merged; each yields references to `K`
/// objects (or a watcher error).
selector: SelectAll<Pin<Box<dyn Stream<Item = Result<ObjectRef<K>, watcher::Error>>>>>,
/// Read handle of the store created in `new`, passed to [`applier`] by `run`.
reader: Store<K>,
}
impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + 'static,
{
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
let writer = Writer::<K>::default();
let reader = writer.as_reader();
let mut selector = stream::SelectAll::new();
let self_watcher =
trigger_self(try_flatten_applied(reflector(writer, watcher(owned_api, lp)))).boxed_local();
selector.push(self_watcher);
Self { selector, reader }
}
pub fn store(&self) -> Store<K> {
self.reader.clone()
}
pub fn owns<Child: Clone + Meta + DeserializeOwned + 'static>(
mut self,
api: Api<Child>,
lp: ListParams,
) -> Self {
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)));
self.selector.push(Box::pin(child_watcher));
self
}
pub fn watches<
Other: Clone + Meta + DeserializeOwned + 'static,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
>(
mut self,
api: Api<Other>,
lp: ListParams,
mapper: impl Fn(Other) -> I + 'static,
) -> Self {
let other_watcher = trigger_with(try_flatten_touched(watcher(api, lp)), mapper);
self.selector.push(Box::pin(other_watcher));
self
}
pub fn run<ReconcilerFut, T>(
self,
reconciler: impl FnMut(K, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
ReconcilerFut: TryFuture<Ok = ReconcilerAction>,
ReconcilerFut::Error: std::error::Error + 'static,
{
applier(reconciler, error_policy, context, self.reader, self.selector)
}
}