kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
/// Errors returned by [`await_condition`].
#[derive(Debug, Error)]
pub enum Error {
    /// The watch stream backing [`await_condition`] yielded an error before the condition was
    /// observed to hold; the underlying watcher error is kept as the source.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
17
18/// Watch an object, and wait for some condition `cond` to return `true`.
19///
20/// `cond` is passed `Some` if the object is found, otherwise `None`.
21///
22/// The object is returned when the condition is fulfilled.
23///
24/// # Caveats
25///
26/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
27/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
28///
29/// # Errors
30///
31/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
32/// permission to `watch` and `list` it.
33///
34/// Does *not* fail if the object is not found.
35///
36/// # Usage
37///
38/// ```
39/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40/// use kube::{Api, runtime::wait::{await_condition, conditions}};
41/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
42/// # let client: kube::Client = todo!();
43///
44/// let crds: Api<CustomResourceDefinition> = Api::all(client);
45/// // .. create or apply a crd here ..
46/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
47/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
48/// # Ok(())
49/// # }
50/// ```
51#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
52pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
53where
54    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
55{
56    // Skip updates until the condition is satisfied.
57    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
58        let matches = cond.matches_object(obj.as_ref());
59        future::ready(Ok(!matches))
60    }));
61
62    // Then take the first update that satisfies the condition.
63    let obj = stream
64        .try_next()
65        .await
66        .map_err(Error::ProbeFailed)?
67        .expect("stream must not terminate");
68    Ok(obj)
69}
70
71/// A trait for condition functions to be used by [`await_condition`]
72///
73/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
74///
75/// # Usage
76///
77/// ```
78/// use kube::runtime::wait::Condition;
79/// use k8s_openapi::api::core::v1::Pod;
80/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
81///     move |obj: Option<&Pod>| {
82///         if let Some(pod) = &obj {
83///             if let Some(status) = &pod.status {
84///                 if let Some(conds) = &status.conditions {
85///                     if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
86///                         return pcond.status == "True";
87///                     }
88///                 }
89///             }
90///         }
91///         false
92///     }
93/// }
94/// ```
95pub trait Condition<K> {
96    fn matches_object(&self, obj: Option<&K>) -> bool;
97
98    /// Returns a `Condition` that holds if `self` does not
99    ///
100    /// # Usage
101    ///
102    /// ```
103    /// # use kube_runtime::wait::Condition;
104    /// let condition: fn(Option<&()>) -> bool = |_| true;
105    /// assert!(condition.matches_object(None));
106    /// assert!(!condition.not().matches_object(None));
107    /// ```
108    fn not(self) -> conditions::Not<Self>
109    where
110        Self: Sized,
111    {
112        conditions::Not(self)
113    }
114
115    /// Returns a `Condition` that holds if `self` and `other` both do
116    ///
117    /// # Usage
118    ///
119    /// ```
120    /// # use kube_runtime::wait::Condition;
121    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
122    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
123    /// assert!(!cond_false.and(cond_false).matches_object(None));
124    /// assert!(!cond_false.and(cond_true).matches_object(None));
125    /// assert!(!cond_true.and(cond_false).matches_object(None));
126    /// assert!(cond_true.and(cond_true).matches_object(None));
127    /// ```
128    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
129    where
130        Self: Sized,
131    {
132        conditions::And(self, other)
133    }
134
135    /// Returns a `Condition` that holds if either `self` or `other` does
136    ///
137    /// # Usage
138    ///
139    /// ```
140    /// # use kube_runtime::wait::Condition;
141    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
142    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
143    /// assert!(!cond_false.or(cond_false).matches_object(None));
144    /// assert!(cond_false.or(cond_true).matches_object(None));
145    /// assert!(cond_true.or(cond_false).matches_object(None));
146    /// assert!(cond_true.or(cond_true).matches_object(None));
147    /// ```
148    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
149    where
150        Self: Sized,
151    {
152        conditions::Or(self, other)
153    }
154}
155
156impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
157    fn matches_object(&self, obj: Option<&K>) -> bool {
158        (self)(obj)
159    }
160}
161
162/// Common conditions to wait for
163pub mod conditions {
164    pub use super::Condition;
165    use k8s_openapi::{
166        api::{
167            apps::v1::Deployment,
168            batch::v1::Job,
169            core::v1::{Pod, Service},
170            networking::v1::Ingress,
171        },
172        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
173    };
174    use kube_client::Resource;
175
176    /// An await condition that returns `true` once the object has been deleted.
177    ///
178    /// An object is considered to be deleted if the object can no longer be found, or if its
179    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
180    /// the deletion event and the object is recreated in the meantime.
181    #[must_use]
182    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
183        move |obj: Option<&K>| {
184            obj.map_or(
185                // Object is not found, success!
186                true,
187                // Object is found, but a changed uid would mean that it was deleted and recreated
188                |obj| obj.meta().uid.as_deref() != Some(uid),
189            )
190        }
191    }
192
193    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
194    ///
195    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
196    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
197    #[must_use]
198    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
199        |obj: Option<&CustomResourceDefinition>| {
200            if let Some(o) = obj {
201                if let Some(s) = &o.status {
202                    if let Some(conds) = &s.conditions {
203                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
204                            return pcond.status == "True";
205                        }
206                    }
207                }
208            }
209            false
210        }
211    }
212
213    /// An await condition for `Pod` that returns `true` once it is running
214    #[must_use]
215    pub fn is_pod_running() -> impl Condition<Pod> {
216        |obj: Option<&Pod>| {
217            if let Some(pod) = &obj {
218                if let Some(status) = &pod.status {
219                    if let Some(phase) = &status.phase {
220                        return phase == "Running";
221                    }
222                }
223            }
224            false
225        }
226    }
227
228    /// An await condition for `Job` that returns `true` once it is completed
229    #[must_use]
230    pub fn is_job_completed() -> impl Condition<Job> {
231        |obj: Option<&Job>| {
232            if let Some(job) = &obj {
233                if let Some(s) = &job.status {
234                    if let Some(conds) = &s.conditions {
235                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
236                            return pcond.status == "True";
237                        }
238                    }
239                }
240            }
241            false
242        }
243    }
244
245    /// An await condition for `Deployment` that returns `true` once the latest deployment has completed
246    ///
247    /// This looks for the condition that Kubernetes sets for completed deployments:
248    /// <https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment>
249    #[must_use]
250    pub fn is_deployment_completed() -> impl Condition<Deployment> {
251        |obj: Option<&Deployment>| {
252            if let Some(depl) = &obj {
253                if let Some(s) = &depl.status {
254                    if let Some(conds) = &s.conditions {
255                        if let Some(dcond) = conds.iter().find(|c| {
256                            c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
257                        }) {
258                            return dcond.status == "True";
259                        }
260                    }
261                }
262            }
263            false
264        }
265    }
266
267    /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
268    #[must_use]
269    pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
270        |obj: Option<&Service>| {
271            if let Some(svc) = &obj {
272                // ignore services that are not type LoadBalancer (return true immediately)
273                if let Some(spec) = &svc.spec {
274                    if spec.type_ != Some("LoadBalancer".to_string()) {
275                        return true;
276                    }
277                    // carry on if this is a LoadBalancer service
278                    if let Some(s) = &svc.status {
279                        if let Some(lbs) = &s.load_balancer {
280                            if let Some(ings) = &lbs.ingress {
281                                return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
282                            }
283                        }
284                    }
285                }
286            }
287            false
288        }
289    }
290
291    /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
292    #[must_use]
293    pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
294        |obj: Option<&Ingress>| {
295            if let Some(ing) = &obj {
296                if let Some(s) = &ing.status {
297                    if let Some(lbs) = &s.load_balancer {
298                        if let Some(ings) = &lbs.ingress {
299                            return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
300                        }
301                    }
302                }
303            }
304            false
305        }
306    }
307
308    /// See [`Condition::not`]
309    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
310    pub struct Not<A>(pub(super) A);
311    impl<A: Condition<K>, K> Condition<K> for Not<A> {
312        fn matches_object(&self, obj: Option<&K>) -> bool {
313            !self.0.matches_object(obj)
314        }
315    }
316
317    /// See [`Condition::and`]
318    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
319    pub struct And<A, B>(pub(super) A, pub(super) B);
320    impl<A, B, K> Condition<K> for And<A, B>
321    where
322        A: Condition<K>,
323        B: Condition<K>,
324    {
325        fn matches_object(&self, obj: Option<&K>) -> bool {
326            self.0.matches_object(obj) && self.1.matches_object(obj)
327        }
328    }
329
330    /// See [`Condition::or`]
331    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
332    pub struct Or<A, B>(pub(super) A, pub(super) B);
333    impl<A, B, K> Condition<K> for Or<A, B>
334    where
335        A: Condition<K>,
336        B: Condition<K>,
337    {
338        fn matches_object(&self, obj: Option<&K>) -> bool {
339            self.0.matches_object(obj) || self.1.matches_object(obj)
340        }
341    }
342
343    mod tests {
344        #[test]
345        /// pass when CRD is established
346        fn crd_established_ok() {
347            use super::{is_crd_established, Condition};
348
349            let crd = r#"
350                apiVersion: apiextensions.k8s.io/v1
351                kind: CustomResourceDefinition
352                metadata:
353                  name: testthings.kube.rs
354                spec:
355                  group: kube.rs
356                  names:
357                    categories: []
358                    kind: TestThing
359                    plural: testthings
360                    shortNames: []
361                    singular: testthing
362                  scope: Namespaced
363                  versions:
364                    - additionalPrinterColumns: []
365                      name: v1
366                      schema:
367                        openAPIV3Schema:
368                          type: object
369                          x-kubernetes-preserve-unknown-fields: true
370                      served: true
371                      storage: true
372                status:
373                  acceptedNames:
374                    kind: TestThing
375                    listKind: TestThingList
376                    plural: testthings
377                    singular: testthing
378                  conditions:
379                    - lastTransitionTime: "2025-03-06T03:10:03Z"
380                      message: no conflicts found
381                      reason: NoConflicts
382                      status: "True"
383                      type: NamesAccepted
384                    - lastTransitionTime: "2025-03-06T03:10:03Z"
385                      message: the initial names have been accepted
386                      reason: InitialNamesAccepted
387                      status: "True"
388                      type: Established
389                storedVersions:
390                  - v1
391            "#;
392
393            let c = serde_yaml::from_str(crd).unwrap();
394            assert!(is_crd_established().matches_object(Some(&c)))
395        }
396
397        #[test]
398        /// fail when CRD is not yet ready
399        fn crd_established_fail() {
400            use super::{is_crd_established, Condition};
401
402            let crd = r#"
403                apiVersion: apiextensions.k8s.io/v1
404                kind: CustomResourceDefinition
405                metadata:
406                  name: testthings.kube.rs
407                spec:
408                  group: kube.rs
409                  names:
410                    categories: []
411                    kind: TestThing
412                    plural: testthings
413                    shortNames: []
414                    singular: testthing
415                  scope: Namespaced
416                  versions:
417                    - additionalPrinterColumns: []
418                      name: v1
419                      schema:
420                        openAPIV3Schema:
421                          type: object
422                          x-kubernetes-preserve-unknown-fields: true
423                      served: true
424                      storage: true
425                status:
426                  acceptedNames:
427                    kind: TestThing
428                    listKind: TestThingList
429                    plural: testthings
430                    singular: testthing
431                  conditions:
432                    - lastTransitionTime: "2025-03-06T03:10:03Z"
433                      message: no conflicts found
434                      reason: NoConflicts
435                      status: "True"
436                      type: NamesAccepted
437                    - lastTransitionTime: "2025-03-06T03:10:03Z"
438                      message: the initial names have been accepted
439                      reason: InitialNamesAccepted
440                      status: "False"
441                      type: Established
442                storedVersions:
443                  - v1
444            "#;
445
446            let c = serde_yaml::from_str(crd).unwrap();
447            assert!(!is_crd_established().matches_object(Some(&c)))
448        }
449
450        #[test]
451        /// fail when CRD does not exist
452        fn crd_established_missing() {
453            use super::{is_crd_established, Condition};
454
455            assert!(!is_crd_established().matches_object(None))
456        }
457
458        #[test]
459        /// pass when pod is running
460        fn pod_running_ok() {
461            use super::{is_pod_running, Condition};
462
463            let pod = r#"
464                apiVersion: v1
465                kind: Pod
466                metadata:
467                  namespace: default
468                  name: testpod
469                spec:
470                  containers:
471                    - name: testcontainer
472                      image: alpine
473                      command: [ sleep ]
474                      args: [ "100000" ]
475                status:
476                  conditions:
477                    - lastProbeTime: null
478                      lastTransitionTime: "2025-03-06T03:53:07Z"
479                      status: "True"
480                      type: PodReadyToStartContainers
481                    - lastProbeTime: null
482                      lastTransitionTime: "2025-03-06T03:52:58Z"
483                      status: "True"
484                      type: Initialized
485                    - lastProbeTime: null
486                      lastTransitionTime: "2025-03-06T03:53:24Z"
487                      status: "True"
488                      type: Ready
489                    - lastProbeTime: null
490                      lastTransitionTime: "2025-03-06T03:53:24Z"
491                      status: "True"
492                      type: ContainersReady
493                    - lastProbeTime: null
494                      lastTransitionTime: "2025-03-06T03:52:58Z"
495                      status: "True"
496                      type: PodScheduled
497                  containerStatuses:
498                    - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
499                      image: docker.io/library/alpine:latest
500                      imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
501                      lastState: {}
502                      name: testcontainer
503                      ready: true
504                      restartCount: 0
505                      started: true
506                      state:
507                        running:
508                          startedAt: "2025-03-06T03:59:20Z"
509                  phase: Running
510                  qosClass: Burstable
511            "#;
512
513            let p = serde_yaml::from_str(pod).unwrap();
514            assert!(is_pod_running().matches_object(Some(&p)))
515        }
516
517        #[test]
518        /// fail if pod is unschedulable
519        fn pod_running_unschedulable() {
520            use super::{is_pod_running, Condition};
521
522            let pod = r#"
523                apiVersion: v1
524                kind: Pod
525                metadata:
526                  namespace: default
527                  name: testpod
528                spec:
529                  containers:
530                    - name: testcontainer
531                      image: alpine
532                      command: [ sleep ]
533                      args: [ "100000" ]
534                status:
535                  conditions:
536                    - lastProbeTime: null
537                      lastTransitionTime: "2025-03-06T03:52:25Z"
538                      message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1
539                      nodes are available: 1 Preemption is not helpful for scheduling.'
540                      reason: Unschedulable
541                      status: "False"
542                      type: PodScheduled
543                  phase: Pending
544                  qosClass: Burstable
545            "#;
546
547            let p = serde_yaml::from_str(pod).unwrap();
548            assert!(!is_pod_running().matches_object(Some(&p)))
549        }
550
551        #[test]
552        /// fail if pod does not exist
553        fn pod_running_missing() {
554            use super::{is_pod_running, Condition};
555
556            assert!(!is_pod_running().matches_object(None))
557        }
558
559        #[test]
560        /// pass if job completed
561        fn job_completed_ok() {
562            use super::{is_job_completed, Condition};
563
564            let job = r#"
565                apiVersion: batch/v1
566                kind: Job
567                metadata:
568                  name: pi
569                  namespace: default
570                spec:
571                  template:
572                    spec:
573                      containers:
574                      - name: pi
575                        command:
576                        - perl
577                        - -Mbignum=bpi
578                        - -wle
579                        - print bpi(2000)
580                        image: perl:5.34.0
581                        imagePullPolicy: IfNotPresent
582                status:
583                  completionTime: "2025-03-06T05:27:56Z"
584                  conditions:
585                  - lastProbeTime: "2025-03-06T05:27:56Z"
586                    lastTransitionTime: "2025-03-06T05:27:56Z"
587                    message: Reached expected number of succeeded pods
588                    reason: CompletionsReached
589                    status: "True"
590                    type: SuccessCriteriaMet
591                  - lastProbeTime: "2025-03-06T05:27:56Z"
592                    lastTransitionTime: "2025-03-06T05:27:56Z"
593                    message: Reached expected number of succeeded pods
594                    reason: CompletionsReached
595                    status: "True"
596                    type: Complete
597                  ready: 0
598                  startTime: "2025-03-06T05:27:27Z"
599                  succeeded: 1
600                  terminating: 0
601                  uncountedTerminatedPods: {}
602            "#;
603
604            let j = serde_yaml::from_str(job).unwrap();
605            assert!(is_job_completed().matches_object(Some(&j)))
606        }
607
608        #[test]
609        /// fail if job is still in progress
610        fn job_completed_running() {
611            use super::{is_job_completed, Condition};
612
613            let job = r#"
614                apiVersion: batch/v1
615                kind: Job
616                metadata:
617                  name: pi
618                  namespace: default
619                spec:
620                  backoffLimit: 4
621                  completionMode: NonIndexed
622                  completions: 1
623                  manualSelector: false
624                  parallelism: 1
625                  template:
626                    spec:
627                      containers:
628                      - name: pi
629                        command:
630                        - perl
631                        - -Mbignum=bpi
632                        - -wle
633                        - print bpi(2000)
634                        image: perl:5.34.0
635                        imagePullPolicy: IfNotPresent
636                status:
637                  active: 1
638                  ready: 0
639                  startTime: "2025-03-06T05:27:27Z"
640                  terminating: 0
641                  uncountedTerminatedPods: {}
642            "#;
643
644            let j = serde_yaml::from_str(job).unwrap();
645            assert!(!is_job_completed().matches_object(Some(&j)))
646        }
647
648        #[test]
649        /// fail if job does not exist
650        fn job_completed_missing() {
651            use super::{is_job_completed, Condition};
652
653            assert!(!is_job_completed().matches_object(None))
654        }
655
656        #[test]
657        /// pass when deployment has been fully rolled out
658        fn deployment_completed_ok() {
659            use super::{is_deployment_completed, Condition};
660
661            let depl = r#"
662                apiVersion: apps/v1
663                kind: Deployment
664                metadata:
665                  name: testapp
666                  namespace: default
667                spec:
668                  progressDeadlineSeconds: 600
669                  replicas: 3
670                  revisionHistoryLimit: 10
671                  selector:
672                    matchLabels:
673                      app: test
674                  strategy:
675                    rollingUpdate:
676                      maxSurge: 25%
677                      maxUnavailable: 25%
678                    type: RollingUpdate
679                  template:
680                    metadata:
681                      creationTimestamp: null
682                      labels:
683                        app: test
684                    spec:
685                      containers:
686                      - image: postgres
687                        imagePullPolicy: Always
688                        name: postgres
689                        ports:
690                        - containerPort: 5432
691                          protocol: TCP
692                        env:
693                        - name: POSTGRES_PASSWORD
694                          value: foobar
695                status:
696                  availableReplicas: 3
697                  conditions:
698                  - lastTransitionTime: "2025-03-06T06:06:57Z"
699                    lastUpdateTime: "2025-03-06T06:06:57Z"
700                    message: Deployment has minimum availability.
701                    reason: MinimumReplicasAvailable
702                    status: "True"
703                    type: Available
704                  - lastTransitionTime: "2025-03-06T06:03:20Z"
705                    lastUpdateTime: "2025-03-06T06:06:57Z"
706                    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
707                    reason: NewReplicaSetAvailable
708                    status: "True"
709                    type: Progressing
710                  observedGeneration: 2
711                  readyReplicas: 3
712                  replicas: 3
713                  updatedReplicas: 3
714            "#;
715
716            let d = serde_yaml::from_str(depl).unwrap();
717            assert!(is_deployment_completed().matches_object(Some(&d)))
718        }
719
720        #[test]
721        /// fail if deployment update is still rolling out
722        fn deployment_completed_pending() {
723            use super::{is_deployment_completed, Condition};
724
725            let depl = r#"
726                apiVersion: apps/v1
727                kind: Deployment
728                metadata:
729                  name: testapp
730                  namespace: default
731                spec:
732                  progressDeadlineSeconds: 600
733                  replicas: 3
734                  revisionHistoryLimit: 10
735                  selector:
736                    matchLabels:
737                      app: test
738                  strategy:
739                    rollingUpdate:
740                      maxSurge: 25%
741                      maxUnavailable: 25%
742                    type: RollingUpdate
743                  template:
744                    metadata:
745                      creationTimestamp: null
746                      labels:
747                        app: test
748                    spec:
749                      containers:
750                      - image: postgres
751                        imagePullPolicy: Always
752                        name: postgres
753                        ports:
754                        - containerPort: 5432
755                          protocol: TCP
756                        env:
757                        - name: POSTGRES_PASSWORD
758                          value: foobar
759                status:
760                  conditions:
761                  - lastTransitionTime: "2025-03-06T06:03:20Z"
762                    lastUpdateTime: "2025-03-06T06:03:20Z"
763                    message: Deployment does not have minimum availability.
764                    reason: MinimumReplicasUnavailable
765                    status: "False"
766                    type: Available
767                  - lastTransitionTime: "2025-03-06T06:03:20Z"
768                    lastUpdateTime: "2025-03-06T06:03:20Z"
769                    message: ReplicaSet "testapp-77789cd7d4" is progressing.
770                    reason: ReplicaSetUpdated
771                    status: "True"
772                    type: Progressing
773                  observedGeneration: 1
774                  replicas: 3
775                  unavailableReplicas: 3
776                  updatedReplicas: 3
777            "#;
778
779            let d = serde_yaml::from_str(depl).unwrap();
780            assert!(!is_deployment_completed().matches_object(Some(&d)))
781        }
782
783        #[test]
784        /// fail if deployment does not exist
785        fn deployment_completed_missing() {
786            use super::{is_deployment_completed, Condition};
787
788            assert!(!is_deployment_completed().matches_object(None))
789        }
790
791        #[test]
792        /// pass if loadbalancer service has recieved a loadbalancer IP
793        fn service_lb_provisioned_ok_ip() {
794            use super::{is_service_loadbalancer_provisioned, Condition};
795
796            let service = r"
797                apiVersion: v1
798                kind: Service
799                metadata:
800                  name: test
801                spec:
802                  selector:
803                    app.kubernetes.io/name: test
804                  type: LoadBalancer
805                  ports:
806                    - protocol: TCP
807                      port: 80
808                      targetPort: 9376
809                  clusterIP: 10.0.171.239
810                status:
811                  loadBalancer:
812                    ingress:
813                      - ip: 192.0.2.127
814            ";
815
816            let s = serde_yaml::from_str(service).unwrap();
817            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
818        }
819
820        #[test]
821        /// pass if loadbalancer service has recieved a loadbalancer hostname
822        fn service_lb_provisioned_ok_hostname() {
823            use super::{is_service_loadbalancer_provisioned, Condition};
824
825            let service = r"
826                apiVersion: v1
827                kind: Service
828                metadata:
829                  name: test
830                spec:
831                  selector:
832                    app.kubernetes.io/name: test
833                  type: LoadBalancer
834                  ports:
835                    - protocol: TCP
836                      port: 80
837                      targetPort: 9376
838                  clusterIP: 10.0.171.239
839                status:
840                  loadBalancer:
841                    ingress:
842                      - hostname: example.exposed.service
843            ";
844
845            let s = serde_yaml::from_str(service).unwrap();
846            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
847        }
848
849        #[test]
850        /// fail if loadbalancer service is still waiting for a LB
851        fn service_lb_provisioned_pending() {
852            use super::{is_service_loadbalancer_provisioned, Condition};
853
854            let service = r"
855                apiVersion: v1
856                kind: Service
857                metadata:
858                  name: test
859                spec:
860                  selector:
861                    app.kubernetes.io/name: test
862                  type: LoadBalancer
863                  ports:
864                    - protocol: TCP
865                      port: 80
866                      targetPort: 9376
867                  clusterIP: 10.0.171.239
868                status:
869                  loadBalancer: {}
870            ";
871
872            let s = serde_yaml::from_str(service).unwrap();
873            assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
874        }
875
876        #[test]
877        /// pass if service is not a loadbalancer
878        fn service_lb_provisioned_not_loadbalancer() {
879            use super::{is_service_loadbalancer_provisioned, Condition};
880
881            let service = r"
882                apiVersion: v1
883                kind: Service
884                metadata:
885                  name: test
886                spec:
887                  selector:
888                    app.kubernetes.io/name: test
889                  type: ClusterIP
890                  ports:
891                    - protocol: TCP
892                      port: 80
893                      targetPort: 9376
894                status:
895                  loadBalancer: {}
896            ";
897
898            let s = serde_yaml::from_str(service).unwrap();
899            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
900        }
901
902        #[test]
903        /// fail if service does not exist
904        fn service_lb_provisioned_missing() {
905            use super::{is_service_loadbalancer_provisioned, Condition};
906
907            assert!(!is_service_loadbalancer_provisioned().matches_object(None))
908        }
909
910        #[test]
911        /// pass when ingress has recieved a loadbalancer IP
912        fn ingress_provisioned_ok_ip() {
913            use super::{is_ingress_provisioned, Condition};
914
915            let ingress = r#"
916                apiVersion: networking.k8s.io/v1
917                kind: Ingress
918                metadata:
919                  name: test
920                  namespace: default
921                  resourceVersion: "1401"
922                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
923                spec:
924                  ingressClassName: nginx
925                  rules:
926                  - host: httpbin.local
927                    http:
928                      paths:
929                      - path: /
930                        backend:
931                          service:
932                            name: httpbin
933                            port:
934                              number: 80
935                status:
936                  loadBalancer:
937                    ingress:
938                      - ip: 10.89.7.3
939            "#;
940
941            let i = serde_yaml::from_str(ingress).unwrap();
942            assert!(is_ingress_provisioned().matches_object(Some(&i)))
943        }
944
945        #[test]
946        /// pass when ingress has recieved a loadbalancer hostname
947        fn ingress_provisioned_ok_hostname() {
948            use super::{is_ingress_provisioned, Condition};
949
950            let ingress = r#"
951                apiVersion: networking.k8s.io/v1
952                kind: Ingress
953                metadata:
954                  name: test
955                  namespace: default
956                  resourceVersion: "1401"
957                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
958                spec:
959                  ingressClassName: nginx
960                  rules:
961                  - host: httpbin.local
962                    http:
963                      paths:
964                      - path: /
965                        backend:
966                          service:
967                            name: httpbin
968                            port:
969                              number: 80
970                status:
971                  loadBalancer:
972                    ingress:
973                      - hostname: example.exposed.service
974            "#;
975
976            let i = serde_yaml::from_str(ingress).unwrap();
977            assert!(is_ingress_provisioned().matches_object(Some(&i)))
978        }
979
980        #[test]
981        /// fail if ingress is still waiting for a LB
982        fn ingress_provisioned_pending() {
983            use super::{is_ingress_provisioned, Condition};
984
985            let ingress = r#"
986                apiVersion: networking.k8s.io/v1
987                kind: Ingress
988                metadata:
989                  name: test
990                  namespace: default
991                  resourceVersion: "1401"
992                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
993                spec:
994                  ingressClassName: nginx
995                  rules:
996                  - host: httpbin.local
997                    http:
998                      paths:
999                      - path: /
1000                        backend:
1001                          service:
1002                            name: httpbin
1003                            port:
1004                              number: 80
1005                status:
1006                  loadBalancer: {}
1007            "#;
1008
1009            let i = serde_yaml::from_str(ingress).unwrap();
1010            assert!(!is_ingress_provisioned().matches_object(Some(&i)))
1011        }
1012
1013        #[test]
1014        /// fail if ingress does not exist
1015        fn ingress_provisioned_missing() {
1016            use super::{is_ingress_provisioned, Condition};
1017
1018            assert!(!is_ingress_provisioned().matches_object(None))
1019        }
1020    }
1021}
1022
1023/// Utilities for deleting objects
1024pub mod delete {
1025    use super::{await_condition, conditions};
1026    use kube_client::{api::DeleteParams, Api, Resource};
1027    use serde::de::DeserializeOwned;
1028    use std::fmt::Debug;
1029    use thiserror::Error;
1030
1031    #[derive(Debug, Error)]
1032    pub enum Error {
1033        #[error("deleted object has no UID to wait for")]
1034        NoUid,
1035        #[error("failed to delete object: {0}")]
1036        Delete(#[source] kube_client::Error),
1037        #[error("failed to wait for object to be deleted: {0}")]
1038        Await(#[source] super::Error),
1039    }
1040
1041    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
1046    #[allow(clippy::module_name_repetitions)]
1047    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1048        api: Api<K>,
1049        name: &str,
1050        delete_params: &DeleteParams,
1051    ) -> Result<(), Error> {
1052        let deleted_obj_uid = api
1053            .delete(name, delete_params)
1054            .await
1055            .map_err(Error::Delete)?
1056            .either(
1057                |mut obj| obj.meta_mut().uid.take(),
1058                |status| status.details.map(|details| details.uid),
1059            )
1060            .ok_or(Error::NoUid)?;
1061        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1062            .await
1063            .map_err(Error::Await)?;
1064        Ok(())
1065    }
1066}