kube_client/api/
subresource.rs

1use futures::AsyncBufRead;
2use serde::{de::DeserializeOwned, Serialize};
3use std::fmt::Debug;
4
5use crate::{
6    api::{Api, Patch, PatchParams, PostParams},
7    Error, Result,
8};
9
10use kube_core::response::Status;
11pub use kube_core::subresource::{EvictParams, LogParams};
12
13#[cfg(feature = "ws")]
14#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
15pub use kube_core::subresource::AttachParams;
16
17pub use k8s_openapi::api::autoscaling::v1::{Scale, ScaleSpec, ScaleStatus};
18
19#[cfg(feature = "ws")] use crate::api::portforward::Portforwarder;
20#[cfg(feature = "ws")] use crate::api::remote_command::AttachedProcess;
21
22/// Methods for [scale subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#scale-subresource).
23impl<K> Api<K>
24where
25    K: Clone + DeserializeOwned,
26{
27    /// Fetch the scale subresource
28    pub async fn get_scale(&self, name: &str) -> Result<Scale> {
29        let mut req = self
30            .request
31            .get_subresource("scale", name)
32            .map_err(Error::BuildRequest)?;
33        req.extensions_mut().insert("get_scale");
34        self.client.request::<Scale>(req).await
35    }
36
37    /// Update the scale subresource
38    pub async fn patch_scale<P: serde::Serialize + Debug>(
39        &self,
40        name: &str,
41        pp: &PatchParams,
42        patch: &Patch<P>,
43    ) -> Result<Scale> {
44        let mut req = self
45            .request
46            .patch_subresource("scale", name, pp, patch)
47            .map_err(Error::BuildRequest)?;
48        req.extensions_mut().insert("patch_scale");
49        self.client.request::<Scale>(req).await
50    }
51
52    /// Replace the scale subresource
53    pub async fn replace_scale(&self, name: &str, pp: &PostParams, data: Vec<u8>) -> Result<Scale> {
54        let mut req = self
55            .request
56            .replace_subresource("scale", name, pp, data)
57            .map_err(Error::BuildRequest)?;
58        req.extensions_mut().insert("replace_scale");
59        self.client.request::<Scale>(req).await
60    }
61}
62
63/// Arbitrary subresources
64impl<K> Api<K>
65where
66    K: Clone + DeserializeOwned + Debug,
67{
68    /// Display one or many sub-resources.
69    pub async fn get_subresource(&self, subresource_name: &str, name: &str) -> Result<K> {
70        let mut req = self
71            .request
72            .get_subresource(subresource_name, name)
73            .map_err(Error::BuildRequest)?;
74        req.extensions_mut().insert("get_subresource");
75        self.client.request::<K>(req).await
76    }
77
78    /// Create an instance of the subresource
79    pub async fn create_subresource<T>(
80        &self,
81        subresource_name: &str,
82        name: &str,
83        pp: &PostParams,
84        data: Vec<u8>,
85    ) -> Result<T>
86    where
87        T: DeserializeOwned,
88    {
89        let mut req = self
90            .request
91            .create_subresource(subresource_name, name, pp, data)
92            .map_err(Error::BuildRequest)?;
93        req.extensions_mut().insert("create_subresource");
94        self.client.request::<T>(req).await
95    }
96
97    /// Patch an instance of the subresource
98    pub async fn patch_subresource<P: serde::Serialize + Debug>(
99        &self,
100        subresource_name: &str,
101        name: &str,
102        pp: &PatchParams,
103        patch: &Patch<P>,
104    ) -> Result<K> {
105        let mut req = self
106            .request
107            .patch_subresource(subresource_name, name, pp, patch)
108            .map_err(Error::BuildRequest)?;
109        req.extensions_mut().insert("patch_subresource");
110        self.client.request::<K>(req).await
111    }
112
113    /// Replace an instance of the subresource
114    pub async fn replace_subresource(
115        &self,
116        subresource_name: &str,
117        name: &str,
118        pp: &PostParams,
119        data: Vec<u8>,
120    ) -> Result<K> {
121        let mut req = self
122            .request
123            .replace_subresource(subresource_name, name, pp, data)
124            .map_err(Error::BuildRequest)?;
125        req.extensions_mut().insert("replace_subresource");
126        self.client.request::<K>(req).await
127    }
128}
129
130// ----------------------------------------------------------------------------
131// Ephemeral containers
132// ----------------------------------------------------------------------------
133
134/// Marker trait for objects that support the ephemeral containers sub resource.
135///
136/// See [`Api::get_ephemeral_containers`] et al.
137pub trait Ephemeral {}
138
139impl Ephemeral for k8s_openapi::api::core::v1::Pod {}
140
141impl<K> Api<K>
142where
143    K: Clone + DeserializeOwned + Ephemeral,
144{
145    /// Replace the ephemeral containers sub resource entirely.
146    ///
147    /// This functions in the same way as [`Api::replace`] except only `.spec.ephemeralcontainers` is replaced, everything else is ignored.
148    ///
149    /// Note that ephemeral containers may **not** be changed or removed once attached to a pod.
150    ///
151    ///
152    /// You way want to patch the underlying resource to gain access to the main container process,
153    /// see the [documentation](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`.
154    ///
155    /// See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/#what-is-an-ephemeral-container) for more details.
156    ///
157    /// [`Api::patch_ephemeral_containers`] may be more ergonomic, as you can will avoid having to first fetch the
158    /// existing subresources with an approriate merge strategy, see the examples for more details.
159    ///
160    /// Example of using `replace_ephemeral_containers`:
161    ///
162    /// ```no_run
163    /// use k8s_openapi::api::core::v1::Pod;
164    /// use kube::{Api, api::PostParams};
165    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
166    /// # let client = kube::Client::try_default().await?;
167    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
168    /// let pp = PostParams::default();
169    ///
170    /// // Get pod object with ephemeral containers.
171    /// let mut mypod = pods.get_ephemeral_containers("mypod").await?;
172    ///
173    /// // If there were existing ephemeral containers, we would have to append
174    /// // new containers to the list before calling replace_ephemeral_containers.
175    /// assert_eq!(mypod.spec.as_mut().unwrap().ephemeral_containers, None);
176    ///
177    /// // Add an ephemeral container to the pod object.
178    /// mypod.spec.as_mut().unwrap().ephemeral_containers = Some(serde_json::from_value(serde_json::json!([
179    ///    {
180    ///        "name": "myephemeralcontainer",
181    ///        "image": "busybox:1.34.1",
182    ///        "command": ["sh", "-c", "sleep 20"],
183    ///    },
184    /// ]))?);
185    ///
186    /// pods.replace_ephemeral_containers("mypod", &pp, &mypod).await?;
187    ///
188    /// # Ok(())
189    /// # }
190    /// ```
191    pub async fn replace_ephemeral_containers(&self, name: &str, pp: &PostParams, data: &K) -> Result<K>
192    where
193        K: Serialize,
194    {
195        let mut req = self
196            .request
197            .replace_subresource(
198                "ephemeralcontainers",
199                name,
200                pp,
201                serde_json::to_vec(data).map_err(Error::SerdeError)?,
202            )
203            .map_err(Error::BuildRequest)?;
204        req.extensions_mut().insert("replace_ephemeralcontainers");
205        self.client.request::<K>(req).await
206    }
207
208    /// Patch the ephemeral containers sub resource
209    ///
210    /// Any partial object containing the ephemeral containers
211    /// sub resource is valid as long as the complete structure
212    /// for the object is present, as shown below.
213    ///
214    /// You way want to patch the underlying resource to gain access to the main container process,
215    /// see the [docs](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`.
216    ///
217    /// Ephemeral containers may **not** be changed or removed once attached to a pod.
218    /// Therefore if the chosen merge strategy overwrites the existing ephemeral containers,
219    /// you will have to fetch the existing ephemeral containers first.
220    /// In order to append your new ephemeral containers to the existing list before patching. See some examples and
221    /// discussion related to merge strategies in Kubernetes
222    /// [here](https://kubernetes.io/docs/tasks/manage-kubernetes-objects/update-api-object-kubectl-patch/#use-a-json-merge-patch-to-update-a-deployment). The example below uses a strategic merge patch which does not require
223    ///
224    /// See the `Kubernetes` [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/)
225    /// for more information about ephemeral containers.
226    ///
227    ///
228    /// Example of using `patch_ephemeral_containers`:
229    ///
230    /// ```no_run
231    /// use kube::api::{Api, PatchParams, Patch};
232    /// use k8s_openapi::api::core::v1::Pod;
233    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
234    /// # let client = kube::Client::try_default().await?;
235    /// let pods: Api<Pod> = Api::namespaced(client, "apps");
236    /// let pp = PatchParams::default(); // stratetgic merge patch
237    ///
238    /// // Note that the strategic merge patch will concatenate the
239    /// // lists of ephemeral containers so we avoid having to fetch the
240    /// // current list and append to it manually.
241    /// let patch = serde_json::json!({
242    ///    "spec":{
243    ///    "ephemeralContainers": [
244    ///    {
245    ///        "name": "myephemeralcontainer",
246    ///        "image": "busybox:1.34.1",
247    ///        "command": ["sh", "-c", "sleep 20"],
248    ///    },
249    ///    ]
250    /// }});
251    ///
252    /// pods.patch_ephemeral_containers("mypod", &pp, &Patch::Strategic(patch)).await?;
253    ///
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub async fn patch_ephemeral_containers<P: serde::Serialize>(
258        &self,
259        name: &str,
260        pp: &PatchParams,
261        patch: &Patch<P>,
262    ) -> Result<K> {
263        let mut req = self
264            .request
265            .patch_subresource("ephemeralcontainers", name, pp, patch)
266            .map_err(Error::BuildRequest)?;
267
268        req.extensions_mut().insert("patch_ephemeralcontainers");
269        self.client.request::<K>(req).await
270    }
271
272    /// Get the named resource with the ephemeral containers subresource.
273    ///
274    /// This returns the whole K, with metadata and spec.
275    pub async fn get_ephemeral_containers(&self, name: &str) -> Result<K> {
276        let mut req = self
277            .request
278            .get_subresource("ephemeralcontainers", name)
279            .map_err(Error::BuildRequest)?;
280
281        req.extensions_mut().insert("get_ephemeralcontainers");
282        self.client.request::<K>(req).await
283    }
284}
285
286// ----------------------------------------------------------------------------
287
288// TODO: Replace examples with owned custom resources. Bad practice to write to owned objects
289// These examples work, but the job controller will totally overwrite what we do.
290/// Methods for [status subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#status-subresource).
291impl<K> Api<K>
292where
293    K: DeserializeOwned,
294{
295    /// Get the named resource with a status subresource
296    ///
297    /// This actually returns the whole K, with metadata, and spec.
298    pub async fn get_status(&self, name: &str) -> Result<K> {
299        let mut req = self
300            .request
301            .get_subresource("status", name)
302            .map_err(Error::BuildRequest)?;
303        req.extensions_mut().insert("get_status");
304        self.client.request::<K>(req).await
305    }
306
307    /// Patch fields on the status object
308    ///
309    /// NB: Requires that the resource has a status subresource.
310    ///
311    /// ```no_run
312    /// use kube::api::{Api, PatchParams, Patch};
313    /// use k8s_openapi::api::batch::v1::Job;
314    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
315    /// # let client = kube::Client::try_default().await?;
316    /// let jobs: Api<Job> = Api::namespaced(client, "apps");
317    /// let mut j = jobs.get("baz").await?;
318    /// let pp = PatchParams::default(); // json merge patch
319    /// let data = serde_json::json!({
320    ///     "status": {
321    ///         "succeeded": 2
322    ///     }
323    /// });
324    /// let o = jobs.patch_status("baz", &pp, &Patch::Merge(data)).await?;
325    /// assert_eq!(o.status.unwrap().succeeded, Some(2));
326    /// # Ok(())
327    /// # }
328    /// ```
329    pub async fn patch_status<P: serde::Serialize + Debug>(
330        &self,
331        name: &str,
332        pp: &PatchParams,
333        patch: &Patch<P>,
334    ) -> Result<K> {
335        let mut req = self
336            .request
337            .patch_subresource("status", name, pp, patch)
338            .map_err(Error::BuildRequest)?;
339        req.extensions_mut().insert("patch_status");
340        self.client.request::<K>(req).await
341    }
342
343    /// Replace every field on the status object
344    ///
345    /// This works similarly to the [`Api::replace`] method, but `.spec` is ignored.
346    /// You can leave out the `.spec` entirely from the serialized output.
347    ///
348    /// ```no_run
349    /// use kube::api::{Api, PostParams};
350    /// use k8s_openapi::api::batch::v1::{Job, JobStatus};
351    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
352    /// #   let client = kube::Client::try_default().await?;
353    /// let jobs: Api<Job> = Api::namespaced(client, "apps");
354    /// let mut o = jobs.get_status("baz").await?; // retrieve partial object
355    /// o.status = Some(JobStatus::default()); // update the job part
356    /// let pp = PostParams::default();
357    /// let o = jobs.replace_status("baz", &pp, serde_json::to_vec(&o)?).await?;
358    /// #    Ok(())
359    /// # }
360    /// ```
361    pub async fn replace_status(&self, name: &str, pp: &PostParams, data: Vec<u8>) -> Result<K> {
362        let mut req = self
363            .request
364            .replace_subresource("status", name, pp, data)
365            .map_err(Error::BuildRequest)?;
366        req.extensions_mut().insert("replace_status");
367        self.client.request::<K>(req).await
368    }
369}
370
371// ----------------------------------------------------------------------------
372// Log subresource
373// ----------------------------------------------------------------------------
374
375#[test]
376fn log_path() {
377    use crate::api::{Request, Resource};
378    use k8s_openapi::api::core::v1 as corev1;
379    let lp = LogParams {
380        container: Some("blah".into()),
381        ..LogParams::default()
382    };
383    let url = corev1::Pod::url_path(&(), Some("ns"));
384    let req = Request::new(url).logs("foo", &lp).unwrap();
385    assert_eq!(req.uri(), "/api/v1/namespaces/ns/pods/foo/log?&container=blah");
386}
387
388/// Marker trait for objects that has logs
389///
390/// See [`Api::logs`] and [`Api::log_stream`] for usage.
391pub trait Log {}
392
393impl Log for k8s_openapi::api::core::v1::Pod {}
394
395impl<K> Api<K>
396where
397    K: DeserializeOwned + Log,
398{
399    /// Fetch logs as a string
400    pub async fn logs(&self, name: &str, lp: &LogParams) -> Result<String> {
401        let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
402        req.extensions_mut().insert("logs");
403        self.client.request_text(req).await
404    }
405
406    /// Stream the logs via [`AsyncBufRead`].
407    ///
408    /// Log stream can be processsed using [`AsyncReadExt`](futures::AsyncReadExt)
409    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
410    ///
411    /// # Example
412    ///
413    /// ```no_run
414    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
415    /// # use k8s_openapi::api::core::v1::Pod;
416    /// # use kube::{api::{Api, LogParams}, Client};
417    /// # let client: Client = todo!();
418    /// use futures::{AsyncBufReadExt, TryStreamExt};
419    ///
420    /// let pods: Api<Pod> = Api::default_namespaced(client);
421    /// let mut logs = pods
422    ///     .log_stream("my-pod", &LogParams::default()).await?
423    ///     .lines();
424    ///
425    /// while let Some(line) = logs.try_next().await? {
426    ///     println!("{}", line);
427    /// }
428    /// # Ok(())
429    /// # }
430    /// ```
431    pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl AsyncBufRead> {
432        let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
433        req.extensions_mut().insert("log_stream");
434        self.client.request_stream(req).await
435    }
436}
437
438// ----------------------------------------------------------------------------
439// Eviction subresource
440// ----------------------------------------------------------------------------
441
442#[test]
443fn evict_path() {
444    use crate::api::{Request, Resource};
445    use k8s_openapi::api::core::v1 as corev1;
446    let ep = EvictParams::default();
447    let url = corev1::Pod::url_path(&(), Some("ns"));
448    let req = Request::new(url).evict("foo", &ep).unwrap();
449    assert_eq!(req.uri(), "/api/v1/namespaces/ns/pods/foo/eviction?");
450}
451
452/// Marker trait for objects that can be evicted
453///
454/// See [`Api::evic`] for usage
455pub trait Evict {}
456
457impl Evict for k8s_openapi::api::core::v1::Pod {}
458
459impl<K> Api<K>
460where
461    K: DeserializeOwned + Evict,
462{
463    /// Create an eviction
464    pub async fn evict(&self, name: &str, ep: &EvictParams) -> Result<Status> {
465        let mut req = self.request.evict(name, ep).map_err(Error::BuildRequest)?;
466        req.extensions_mut().insert("evict");
467        self.client.request::<Status>(req).await
468    }
469}
470
471// ----------------------------------------------------------------------------
472// Attach subresource
473// ----------------------------------------------------------------------------
474
475#[cfg(feature = "ws")]
476#[test]
477fn attach_path() {
478    use crate::api::{Request, Resource};
479    use k8s_openapi::api::core::v1 as corev1;
480    let ap = AttachParams {
481        container: Some("blah".into()),
482        ..AttachParams::default()
483    };
484    let url = corev1::Pod::url_path(&(), Some("ns"));
485    let req = Request::new(url).attach("foo", &ap).unwrap();
486    assert_eq!(
487        req.uri(),
488        "/api/v1/namespaces/ns/pods/foo/attach?&stdout=true&stderr=true&container=blah"
489    );
490}
491
492/// Marker trait for objects that has attach
493///
494/// See [`Api::attach`] for usage
495#[cfg(feature = "ws")]
496#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
497pub trait Attach {}
498
499#[cfg(feature = "ws")]
500#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
501impl Attach for k8s_openapi::api::core::v1::Pod {}
502
503#[cfg(feature = "ws")]
504#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
505impl<K> Api<K>
506where
507    K: Clone + DeserializeOwned + Attach,
508{
509    /// Attach to pod
510    pub async fn attach(&self, name: &str, ap: &AttachParams) -> Result<AttachedProcess> {
511        let mut req = self.request.attach(name, ap).map_err(Error::BuildRequest)?;
512        req.extensions_mut().insert("attach");
513        let stream = self.client.connect(req).await?;
514        Ok(AttachedProcess::new(stream, ap))
515    }
516}
517
518// ----------------------------------------------------------------------------
519// Exec subresource
520// ----------------------------------------------------------------------------
521#[cfg(feature = "ws")]
522#[test]
523fn exec_path() {
524    use crate::api::{Request, Resource};
525    use k8s_openapi::api::core::v1 as corev1;
526    let ap = AttachParams {
527        container: Some("blah".into()),
528        ..AttachParams::default()
529    };
530    let url = corev1::Pod::url_path(&(), Some("ns"));
531    let req = Request::new(url)
532        .exec("foo", vec!["echo", "foo", "bar"], &ap)
533        .unwrap();
534    assert_eq!(
535        req.uri(),
536        "/api/v1/namespaces/ns/pods/foo/exec?&stdout=true&stderr=true&container=blah&command=echo&command=foo&command=bar"
537    );
538}
539
540/// Marker trait for objects that has exec
541///
542/// See [`Api::exec`] for usage.
543#[cfg(feature = "ws")]
544#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
545pub trait Execute {}
546
547#[cfg(feature = "ws")]
548#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
549impl Execute for k8s_openapi::api::core::v1::Pod {}
550
551#[cfg(feature = "ws")]
552#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
553impl<K> Api<K>
554where
555    K: Clone + DeserializeOwned + Execute,
556{
557    /// Execute a command in a pod
558    pub async fn exec<I, T>(&self, name: &str, command: I, ap: &AttachParams) -> Result<AttachedProcess>
559    where
560        I: IntoIterator<Item = T> + Debug,
561        T: Into<String>,
562    {
563        let mut req = self
564            .request
565            .exec(name, command, ap)
566            .map_err(Error::BuildRequest)?;
567        req.extensions_mut().insert("exec");
568        let stream = self.client.connect(req).await?;
569        Ok(AttachedProcess::new(stream, ap))
570    }
571}
572
573// ----------------------------------------------------------------------------
574// Portforward subresource
575// ----------------------------------------------------------------------------
576#[cfg(feature = "ws")]
577#[test]
578fn portforward_path() {
579    use crate::api::{Request, Resource};
580    use k8s_openapi::api::core::v1 as corev1;
581    let url = corev1::Pod::url_path(&(), Some("ns"));
582    let req = Request::new(url).portforward("foo", &[80, 1234]).unwrap();
583    assert_eq!(
584        req.uri(),
585        "/api/v1/namespaces/ns/pods/foo/portforward?&ports=80%2C1234"
586    );
587}
588
589/// Marker trait for objects that has portforward
590///
591/// See [`Api::portforward`] for usage.
592#[cfg(feature = "ws")]
593pub trait Portforward {}
594
595#[cfg(feature = "ws")]
596impl Portforward for k8s_openapi::api::core::v1::Pod {}
597
598#[cfg(feature = "ws")]
599impl<K> Api<K>
600where
601    K: Clone + DeserializeOwned + Portforward,
602{
603    /// Forward ports of a pod
604    pub async fn portforward(&self, name: &str, ports: &[u16]) -> Result<Portforwarder> {
605        let req = self
606            .request
607            .portforward(name, ports)
608            .map_err(Error::BuildRequest)?;
609        let stream = self.client.connect(req).await?;
610        Ok(Portforwarder::new(stream, ports))
611    }
612}