kube_client/api/
entry.rs

1//! API helpers for get-or-create and get-and-modify patterns
2//!
3//! [`Api::entry`] is the primary entry point for this API.
4
5// Import used in docs
6#[allow(unused_imports)] use std::collections::HashMap;
7use std::fmt::Debug;
8
9use crate::{Api, Error, Result};
10use kube_core::{params::PostParams, Resource};
11use serde::{de::DeserializeOwned, Serialize};
12
13impl<K: Resource + Clone + DeserializeOwned + Debug> Api<K> {
14    /// Gets a given object's "slot" on the Kubernetes API, designed for "get-or-create" and "get-and-modify" patterns
15    ///
16    /// This is similar to [`HashMap::entry`], but the [`Entry`] must be [`OccupiedEntry::commit`]ed for changes to be persisted.
17    ///
18    /// # Usage
19    ///
20    /// ```rust,no_run
21    /// # use std::collections::BTreeMap;
22    /// # use k8s_openapi::api::core::v1::ConfigMap;
23    /// # async fn wrapper() -> Result <(), Box<dyn std::error::Error>> {
24    /// let kube = kube::Client::try_default().await?;
25    /// let cms = kube::Api::<ConfigMap>::namespaced(kube, "default");
26    /// cms
27    ///     // Try to get `entry-example` if it exists
28    ///     .entry("entry-example").await?
29    ///     // Modify object if it already exists
30    ///     .and_modify(|cm| {
31    ///         cm.data
32    ///             .get_or_insert_with(BTreeMap::default)
33    ///             .insert("already-exists".to_string(), "true".to_string());
34    ///     })
35    ///     // Provide a default object if it does not exist
36    ///     .or_insert(|| ConfigMap::default())
37    ///     // Modify the object unconditionally now that we have provided a default value
38    ///     .and_modify(|cm| {
39    ///         cm.data
40    ///             .get_or_insert_with(BTreeMap::default)
41    ///             .insert("modified".to_string(), "true".to_string());
42    ///     })
43    ///     // Save changes
44    ///     .commit(&kube::api::PostParams::default()).await?;
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub async fn entry<'a>(&'a self, name: &'a str) -> Result<Entry<'a, K>> {
49        Ok(match self.get_opt(name).await? {
50            Some(object) => Entry::Occupied(OccupiedEntry {
51                api: self,
52                dirtiness: Dirtiness::Clean,
53                name,
54                object,
55            }),
56            None => Entry::Vacant(VacantEntry { api: self, name }),
57        })
58    }
59}
60
61#[derive(Debug)]
62/// A view into a single object, with enough context to create or update it
63///
64/// See [`Api::entry`] for more information.
65pub enum Entry<'a, K> {
66    /// An object that either exists on the server, or has been created locally (and is awaiting synchronization)
67    Occupied(OccupiedEntry<'a, K>),
68    /// An object that does not exist
69    Vacant(VacantEntry<'a, K>),
70}
71
72impl<'a, K> Entry<'a, K> {
73    /// Borrow the object, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
74    pub fn get(&self) -> Option<&K> {
75        match self {
76            Entry::Occupied(entry) => Some(entry.get()),
77            Entry::Vacant(_) => None,
78        }
79    }
80
81    /// Borrow the object mutably, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
82    ///
83    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
84    pub fn get_mut(&mut self) -> Option<&mut K> {
85        match self {
86            Entry::Occupied(entry) => Some(entry.get_mut()),
87            Entry::Vacant(_) => None,
88        }
89    }
90
91    /// Let `f` modify the object, if it exists (on the API, or queued for creation using [`Entry::or_insert`])
92    ///
93    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
94    pub fn and_modify(self, f: impl FnOnce(&mut K)) -> Self {
95        match self {
96            Entry::Occupied(entry) => Entry::Occupied(entry.and_modify(f)),
97            entry @ Entry::Vacant(_) => entry,
98        }
99    }
100
101    /// Create a new object if it does not already exist
102    ///
103    /// [`OccupiedEntry::commit`] must be called afterwards for the change to be persisted.
104    pub fn or_insert(self, default: impl FnOnce() -> K) -> OccupiedEntry<'a, K>
105    where
106        K: Resource,
107    {
108        match self {
109            Entry::Occupied(entry) => entry,
110            Entry::Vacant(entry) => entry.insert(default()),
111        }
112    }
113}
114
115/// A view into a single object that exists
116///
117/// The object may exist because it existed at the time of call to [`Api::entry`],
118/// or because it was created by [`Entry::or_insert`].
119#[derive(Debug)]
120pub struct OccupiedEntry<'a, K> {
121    api: &'a Api<K>,
122    dirtiness: Dirtiness,
123    name: &'a str,
124    object: K,
125}
126
/// Tracks how the local copy of an object relates to the last API operation
#[derive(Debug)]
enum Dirtiness {
    /// No local modification has happened since the last API operation
    Clean,
    /// The object is present in the API, and the local copy has been modified
    Dirty,
    /// The object was created locally and is not yet present in the API
    New,
}
136
137impl<K> OccupiedEntry<'_, K> {
138    /// Borrow the object
139    pub fn get(&self) -> &K {
140        &self.object
141    }
142
143    /// Borrow the object mutably
144    ///
145    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
146    pub fn get_mut(&mut self) -> &mut K {
147        self.dirtiness = match self.dirtiness {
148            Dirtiness::Clean => Dirtiness::Dirty,
149            Dirtiness::Dirty => Dirtiness::Dirty,
150            Dirtiness::New => Dirtiness::New,
151        };
152        &mut self.object
153    }
154
155    /// Let `f` modify the object
156    ///
157    /// [`OccupiedEntry::commit`] must be called afterwards for any changes to be persisted.
158    pub fn and_modify(mut self, f: impl FnOnce(&mut K)) -> Self {
159        f(self.get_mut());
160        self
161    }
162
163    /// Take ownership over the object
164    pub fn into_object(self) -> K {
165        self.object
166    }
167
168    /// Save the object to the Kubernetes API, if any changes have been made
169    ///
170    /// The [`OccupiedEntry`] is updated with the new object (including changes made by the API server, such as
171    /// `.metadata.resource_version`).
172    ///
173    /// # Errors
174    ///
175    /// This function can fail due to transient errors, or due to write conflicts (for example: if another client
176    /// created the object between the calls to [`Api::entry`] and `OccupiedEntry::commit`, or because another
177    /// client modified the object in the meantime).
178    ///
179    /// Any retries should be coarse-grained enough to also include the call to [`Api::entry`], so that the latest
180    /// state can be fetched.
181    #[tracing::instrument(skip(self))]
182    pub async fn commit(&mut self, pp: &PostParams) -> Result<(), CommitError>
183    where
184        K: Resource + DeserializeOwned + Serialize + Clone + Debug,
185    {
186        self.prepare_for_commit()?;
187        match self.dirtiness {
188            Dirtiness::New => {
189                self.object = self
190                    .api
191                    .create(pp, &self.object)
192                    .await
193                    .map_err(CommitError::Save)?
194            }
195            Dirtiness::Dirty => {
196                self.object = self
197                    .api
198                    .replace(self.name, pp, &self.object)
199                    .await
200                    .map_err(CommitError::Save)?;
201            }
202            Dirtiness::Clean => (),
203        };
204        if !pp.dry_run {
205            self.dirtiness = Dirtiness::Clean;
206        }
207        Ok(())
208    }
209
210    /// Validate that [`Self::object`] is valid, and refers to the same object as the original [`Api::entry`] call
211    ///
212    /// Defaults `ObjectMeta::name` and `ObjectMeta::namespace` if unset.
213    fn prepare_for_commit(&mut self) -> Result<(), CommitValidationError>
214    where
215        K: Resource,
216    {
217        // Access `Self::object` directly rather than using `Self::get_mut` to avoid flagging the object as dirty
218        let meta = self.object.meta_mut();
219        match &mut meta.name {
220            name @ None => *name = Some(self.name.to_string()),
221            Some(name) if name != self.name => {
222                return Err(CommitValidationError::NameMismatch {
223                    object_name: name.clone(),
224                    expected: self.name.to_string(),
225                });
226            }
227            Some(_) => (),
228        }
229        match &mut meta.namespace {
230            ns @ None => ns.clone_from(&self.api.namespace),
231            Some(ns) if Some(ns.as_str()) != self.api.namespace.as_deref() => {
232                return Err(CommitValidationError::NamespaceMismatch {
233                    object_namespace: Some(ns.clone()),
234                    expected: self.api.namespace.clone(),
235                });
236            }
237            Some(_) => (),
238        }
239        if let Some(generate_name) = &meta.generate_name {
240            return Err(CommitValidationError::GenerateName {
241                object_generate_name: generate_name.clone(),
242            });
243        }
244        Ok(())
245    }
246}
247
248#[derive(Debug, thiserror::Error)]
249/// Commit errors
250pub enum CommitError {
251    /// Pre-commit validation failed
252    #[error("failed to validate object for saving")]
253    Validate(#[from] CommitValidationError),
254    /// Failed to submit the new object to the Kubernetes API
255    #[error("failed to save object")]
256    Save(#[source] Error),
257}
258
259#[derive(Debug, thiserror::Error)]
260/// Pre-commit validation errors
261pub enum CommitValidationError {
262    /// `ObjectMeta::name` does not match the name passed to [`Api::entry`]
263    #[error(".metadata.name does not match the name passed to Api::entry (got: {object_name:?}, expected: {expected:?})")]
264    NameMismatch {
265        /// The name of the object (`ObjectMeta::name`)
266        object_name: String,
267        /// The name passed to [`Api::entry`]
268        expected: String,
269    },
270    /// `ObjectMeta::namespace` does not match the namespace of the [`Api`]
271    #[error(".metadata.namespace does not match the namespace of the Api (got: {object_namespace:?}, expected: {expected:?})")]
272    NamespaceMismatch {
273        /// The name of the object (`ObjectMeta::namespace`)
274        object_namespace: Option<String>,
275        /// The namespace of the [`Api`]
276        expected: Option<String>,
277    },
278    /// `ObjectMeta::generate_name` must not be set
279    #[error(".metadata.generate_name must not be set (got: {object_generate_name:?})")]
280    GenerateName {
281        /// The set name generation template of the object (`ObjectMeta::generate_name`)
282        object_generate_name: String,
283    },
284}
285
286/// A view of an object that does not yet exist
287///
288/// Created by [`Api::entry`], as a variant of [`Entry`]
289#[derive(Debug)]
290pub struct VacantEntry<'a, K> {
291    api: &'a Api<K>,
292    name: &'a str,
293}
294
295impl<'a, K> VacantEntry<'a, K> {
296    /// Create a new object
297    ///
298    /// [`OccupiedEntry::commit`] must be called afterwards for the change to be persisted.
299    #[tracing::instrument(skip(self, object))]
300    pub fn insert(self, object: K) -> OccupiedEntry<'a, K>
301    where
302        K: Resource,
303    {
304        OccupiedEntry {
305            api: self.api,
306            dirtiness: Dirtiness::New,
307            name: self.name,
308            object,
309        }
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use std::collections::BTreeMap;
316
317    use k8s_openapi::api::core::v1::ConfigMap;
318    use kube_core::{
319        params::{DeleteParams, PostParams},
320        ErrorResponse, ObjectMeta,
321    };
322
323    use crate::{
324        api::entry::{CommitError, Entry},
325        Api, Client, Error,
326    };
327
328    #[tokio::test]
329    #[ignore = "needs cluster (gets and writes cms)"]
330    async fn entry_create_missing_object() -> Result<(), Box<dyn std::error::Error>> {
331        let client = Client::try_default().await?;
332        let api = Api::<ConfigMap>::default_namespaced(client);
333
334        let object_name = "entry-missing-cm";
335        if api.get_opt(object_name).await?.is_some() {
336            api.delete(object_name, &DeleteParams::default()).await?;
337        }
338
339        let entry = api.entry(object_name).await?;
340        let entry2 = api.entry(object_name).await?;
341        assert_eq!(entry.get(), None);
342        assert_eq!(entry2.get(), None);
343
344        // Create object cleanly
345        let mut entry = entry.or_insert(|| ConfigMap {
346            data: Some([("key".to_string(), "value".to_string())].into()),
347            ..ConfigMap::default()
348        });
349        entry.commit(&PostParams::default()).await?;
350        assert_eq!(
351            entry
352                .get()
353                .data
354                .as_ref()
355                .and_then(|data| data.get("key"))
356                .map(String::as_str),
357            Some("value")
358        );
359        let fetched_obj = api.get(object_name).await?;
360        assert_eq!(
361            fetched_obj
362                .data
363                .as_ref()
364                .and_then(|data| data.get("key"))
365                .map(String::as_str),
366            Some("value")
367        );
368
369        // Update object
370        entry
371            .get_mut()
372            .data
373            .get_or_insert_with(BTreeMap::default)
374            .insert("key".to_string(), "value2".to_string());
375        entry.commit(&PostParams::default()).await?;
376        assert_eq!(
377            entry
378                .get()
379                .data
380                .as_ref()
381                .and_then(|data| data.get("key"))
382                .map(String::as_str),
383            Some("value2")
384        );
385        let fetched_obj = api.get(object_name).await?;
386        assert_eq!(
387            fetched_obj
388                .data
389                .as_ref()
390                .and_then(|data| data.get("key"))
391                .map(String::as_str),
392            Some("value2")
393        );
394
395        // Object was already created in parallel, fail with a conflict error
396        let mut entry2 = entry2.or_insert(|| ConfigMap {
397            data: Some([("key".to_string(), "value3".to_string())].into()),
398            ..ConfigMap::default()
399        });
400        assert!(
401            matches!(dbg!(entry2.commit(&PostParams::default()).await), Err(CommitError::Save(Error::Api(ErrorResponse { reason, .. }))) if reason == "AlreadyExists")
402        );
403
404        // Cleanup
405        api.delete(object_name, &DeleteParams::default()).await?;
406        Ok(())
407    }
408
409    #[tokio::test]
410    #[ignore = "needs cluster (gets and writes cms)"]
411    async fn entry_update_existing_object() -> Result<(), Box<dyn std::error::Error>> {
412        let client = Client::try_default().await?;
413        let api = Api::<ConfigMap>::default_namespaced(client);
414
415        let object_name = "entry-existing-cm";
416        if api.get_opt(object_name).await?.is_some() {
417            api.delete(object_name, &DeleteParams::default()).await?;
418        }
419        api.create(&PostParams::default(), &ConfigMap {
420            metadata: ObjectMeta {
421                namespace: api.namespace.clone(),
422                name: Some(object_name.to_string()),
423                ..ObjectMeta::default()
424            },
425            data: Some([("key".to_string(), "value".to_string())].into()),
426            ..ConfigMap::default()
427        })
428        .await?;
429
430        let mut entry = match api.entry(object_name).await? {
431            Entry::Occupied(entry) => entry,
432            entry => panic!("entry for existing object must be occupied: {entry:?}"),
433        };
434        let mut entry2 = match api.entry(object_name).await? {
435            Entry::Occupied(entry) => entry,
436            entry => panic!("entry for existing object must be occupied: {entry:?}"),
437        };
438
439        // Entry is up-to-date, modify cleanly
440        entry
441            .get_mut()
442            .data
443            .get_or_insert_with(BTreeMap::default)
444            .insert("key".to_string(), "value2".to_string());
445        entry.commit(&PostParams::default()).await?;
446        assert_eq!(
447            entry
448                .get()
449                .data
450                .as_ref()
451                .and_then(|data| data.get("key"))
452                .map(String::as_str),
453            Some("value2")
454        );
455        let fetched_obj = api.get(object_name).await?;
456        assert_eq!(
457            fetched_obj
458                .data
459                .as_ref()
460                .and_then(|data| data.get("key"))
461                .map(String::as_str),
462            Some("value2")
463        );
464
465        // Object was already updated in parallel, fail with a conflict error
466        entry2
467            .get_mut()
468            .data
469            .get_or_insert_with(BTreeMap::default)
470            .insert("key".to_string(), "value3".to_string());
471        assert!(
472            matches!(entry2.commit(&PostParams::default()).await, Err(CommitError::Save(Error::Api(ErrorResponse { reason, .. }))) if reason == "Conflict")
473        );
474
475        // Cleanup
476        api.delete(object_name, &DeleteParams::default()).await?;
477        Ok(())
478    }
479
480    #[tokio::test]
481    #[ignore = "needs cluster (gets and writes cms)"]
482    async fn entry_create_dry_run() -> Result<(), Box<dyn std::error::Error>> {
483        let client = Client::try_default().await?;
484        let api = Api::<ConfigMap>::default_namespaced(client);
485
486        let object_name = "entry-cm-dry";
487        if api.get_opt(object_name).await?.is_some() {
488            api.delete(object_name, &DeleteParams::default()).await?;
489        }
490
491        let pp_dry = PostParams {
492            dry_run: true,
493            ..Default::default()
494        };
495
496        let entry = api.entry(object_name).await?;
497        assert_eq!(entry.get(), None);
498
499        // Create object dry-run
500        let mut entry = entry.or_insert(|| ConfigMap {
501            data: Some([("key".to_string(), "value".to_string())].into()),
502            ..ConfigMap::default()
503        });
504        entry.commit(&pp_dry).await?;
505        assert_eq!(
506            entry
507                .get()
508                .data
509                .as_ref()
510                .and_then(|data| data.get("key"))
511                .map(String::as_str),
512            Some("value")
513        );
514        let fetched_obj = api.get_opt(object_name).await?;
515        assert_eq!(fetched_obj, None);
516
517        // Commit object creation properly
518        entry.commit(&PostParams::default()).await?;
519        assert_eq!(
520            entry
521                .get()
522                .data
523                .as_ref()
524                .and_then(|data| data.get("key"))
525                .map(String::as_str),
526            Some("value")
527        );
528        let fetched_obj = api.get(object_name).await?;
529        assert_eq!(
530            fetched_obj
531                .data
532                .as_ref()
533                .and_then(|data| data.get("key"))
534                .map(String::as_str),
535            Some("value")
536        );
537
538        // Update object dry-run
539        entry
540            .get_mut()
541            .data
542            .get_or_insert_with(BTreeMap::default)
543            .insert("key".to_string(), "value2".to_string());
544        entry.commit(&pp_dry).await?;
545        assert_eq!(
546            entry
547                .get()
548                .data
549                .as_ref()
550                .and_then(|data| data.get("key"))
551                .map(String::as_str),
552            Some("value2")
553        );
554        let fetched_obj = api.get(object_name).await?;
555        assert_eq!(
556            fetched_obj
557                .data
558                .as_ref()
559                .and_then(|data| data.get("key"))
560                .map(String::as_str),
561            Some("value")
562        );
563
564        // Commit object update properly
565        entry.commit(&PostParams::default()).await?;
566        assert_eq!(
567            entry
568                .get()
569                .data
570                .as_ref()
571                .and_then(|data| data.get("key"))
572                .map(String::as_str),
573            Some("value2")
574        );
575        let fetched_obj = api.get(object_name).await?;
576        assert_eq!(
577            fetched_obj
578                .data
579                .as_ref()
580                .and_then(|data| data.get("key"))
581                .map(String::as_str),
582            Some("value2")
583        );
584
585        // Cleanup
586        api.delete(object_name, &DeleteParams::default()).await?;
587        Ok(())
588    }
589}