1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use futures::TryStreamExt;
use kube::{Api, Resource};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as _, Snafu};
use std::fmt::Debug;
use crate::watcher::{self, watch_object};
/// Errors that can be returned by [`await_condition`].
#[derive(Snafu, Debug)]
pub enum Error {
    /// Watching the object failed before the condition was observed to hold.
    #[snafu(display("failed to probe for whether the condition is fulfilled yet: {}", source))]
    ProbeFailed {
        /// The underlying watcher error; its backtrace is reused for this error.
        #[snafu(backtrace)]
        source: watcher::Error,
    },
}
pub async fn await_condition<K>(
api: Api<K>,
name: &str,
mut cond: impl FnMut(Option<&K>) -> bool,
) -> Result<(), Error>
where
K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
{
watch_object(api, name)
.context(ProbeFailed)
.try_take_while(|obj| {
let result = !cond(obj.as_ref());
async move { Ok(result) }
})
.try_for_each(|_| async { Ok(()) })
.await
}
/// Ready-made predicates for use with `await_condition`.
pub mod conditions {
    use kube::Resource;

    /// Builds a predicate that reports whether the object with the given `uid` is gone.
    ///
    /// The returned closure yields `true` when it is handed `None`, or when the
    /// object it is handed carries a UID other than `uid` (including no UID at
    /// all). It yields `false` only while the object still has exactly `uid`.
    pub fn is_deleted<K: Resource>(uid: &str) -> impl Fn(Option<&K>) -> bool + '_ {
        move |obj: Option<&K>| match obj {
            // Nothing was observed under that name.
            None => true,
            // Something is there; it only counts as deleted if its UID differs
            // (presumably the name now belongs to a replacement object).
            Some(found) => found.meta().uid.as_deref() != Some(uid),
        }
    }
}
/// Deleting an object and waiting for the deletion to be observed.
pub mod delete {
    use super::{await_condition, conditions};
    use kube::{api::DeleteParams, Api, Resource};
    use serde::de::DeserializeOwned;
    use snafu::{OptionExt, ResultExt, Snafu};
    use std::fmt::Debug;

    /// Errors that can be returned by [`delete_and_finalize`].
    #[derive(Debug, Snafu)]
    pub enum Error {
        /// The delete response carried no UID, so there is nothing to wait on.
        #[snafu(display("deleted object has no UID to wait for"))]
        NoUid,
        /// The delete request itself failed.
        #[snafu(display("failed to delete object: {}", source))]
        Delete { source: kube::Error },
        /// Waiting for the deletion to be observed failed.
        #[snafu(display("failed to wait for object to be deleted: {}", source))]
        Await { source: super::Error },
    }

    /// Deletes the object called `name` and waits until it is observed to be gone.
    ///
    /// The UID to wait on is taken from the delete response: either from the
    /// returned object's metadata or from the details of the returned status.
    /// The function then waits on `conditions::is_deleted` for that UID via
    /// `await_condition`.
    ///
    /// # Errors
    ///
    /// * [`Error::Delete`] if the delete request fails.
    /// * [`Error::NoUid`] if the delete response does not contain a UID.
    /// * [`Error::Await`] if waiting for the deletion fails.
    // The function name intentionally repeats the name of the enclosing `delete` module.
    #[allow(clippy::module_name_repetitions)]
    pub async fn delete_and_finalize<K>(
        api: Api<K>,
        name: &str,
        delete_params: &DeleteParams,
    ) -> Result<(), Error>
    where
        K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
    {
        let response = api.delete(name, delete_params).await.context(Delete)?;

        // The response is either the object itself or a status; both may carry the UID.
        let maybe_uid = response.either(
            |mut object| object.meta_mut().uid.take(),
            |status| status.details.map(|details| details.uid),
        );
        let uid = maybe_uid.context(NoUid)?;

        let outcome = await_condition(api, name, conditions::is_deleted(&uid)).await;
        outcome.context(Await)
    }
}