kube_client/api/subresource.rs
1use futures::AsyncBufRead;
2use serde::{de::DeserializeOwned, Serialize};
3use std::fmt::Debug;
4
5use crate::{
6 api::{Api, Patch, PatchParams, PostParams},
7 Error, Result,
8};
9
10use kube_core::response::Status;
11pub use kube_core::subresource::{EvictParams, LogParams};
12
13#[cfg(feature = "ws")]
14#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
15pub use kube_core::subresource::AttachParams;
16
17pub use k8s_openapi::api::autoscaling::v1::{Scale, ScaleSpec, ScaleStatus};
18
19#[cfg(feature = "ws")] use crate::api::portforward::Portforwarder;
20#[cfg(feature = "ws")] use crate::api::remote_command::AttachedProcess;
21
22/// Methods for [scale subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#scale-subresource).
23impl<K> Api<K>
24where
25 K: Clone + DeserializeOwned,
26{
27 /// Fetch the scale subresource
28 pub async fn get_scale(&self, name: &str) -> Result<Scale> {
29 let mut req = self
30 .request
31 .get_subresource("scale", name)
32 .map_err(Error::BuildRequest)?;
33 req.extensions_mut().insert("get_scale");
34 self.client.request::<Scale>(req).await
35 }
36
37 /// Update the scale subresource
38 pub async fn patch_scale<P: serde::Serialize + Debug>(
39 &self,
40 name: &str,
41 pp: &PatchParams,
42 patch: &Patch<P>,
43 ) -> Result<Scale> {
44 let mut req = self
45 .request
46 .patch_subresource("scale", name, pp, patch)
47 .map_err(Error::BuildRequest)?;
48 req.extensions_mut().insert("patch_scale");
49 self.client.request::<Scale>(req).await
50 }
51
52 /// Replace the scale subresource
53 pub async fn replace_scale(&self, name: &str, pp: &PostParams, data: Vec<u8>) -> Result<Scale> {
54 let mut req = self
55 .request
56 .replace_subresource("scale", name, pp, data)
57 .map_err(Error::BuildRequest)?;
58 req.extensions_mut().insert("replace_scale");
59 self.client.request::<Scale>(req).await
60 }
61}
62
63/// Arbitrary subresources
64impl<K> Api<K>
65where
66 K: Clone + DeserializeOwned + Debug,
67{
68 /// Display one or many sub-resources.
69 pub async fn get_subresource(&self, subresource_name: &str, name: &str) -> Result<K> {
70 let mut req = self
71 .request
72 .get_subresource(subresource_name, name)
73 .map_err(Error::BuildRequest)?;
74 req.extensions_mut().insert("get_subresource");
75 self.client.request::<K>(req).await
76 }
77
78 /// Create an instance of the subresource
79 pub async fn create_subresource<T>(
80 &self,
81 subresource_name: &str,
82 name: &str,
83 pp: &PostParams,
84 data: Vec<u8>,
85 ) -> Result<T>
86 where
87 T: DeserializeOwned,
88 {
89 let mut req = self
90 .request
91 .create_subresource(subresource_name, name, pp, data)
92 .map_err(Error::BuildRequest)?;
93 req.extensions_mut().insert("create_subresource");
94 self.client.request::<T>(req).await
95 }
96
97 /// Patch an instance of the subresource
98 pub async fn patch_subresource<P: serde::Serialize + Debug>(
99 &self,
100 subresource_name: &str,
101 name: &str,
102 pp: &PatchParams,
103 patch: &Patch<P>,
104 ) -> Result<K> {
105 let mut req = self
106 .request
107 .patch_subresource(subresource_name, name, pp, patch)
108 .map_err(Error::BuildRequest)?;
109 req.extensions_mut().insert("patch_subresource");
110 self.client.request::<K>(req).await
111 }
112
113 /// Replace an instance of the subresource
114 pub async fn replace_subresource(
115 &self,
116 subresource_name: &str,
117 name: &str,
118 pp: &PostParams,
119 data: Vec<u8>,
120 ) -> Result<K> {
121 let mut req = self
122 .request
123 .replace_subresource(subresource_name, name, pp, data)
124 .map_err(Error::BuildRequest)?;
125 req.extensions_mut().insert("replace_subresource");
126 self.client.request::<K>(req).await
127 }
128}
129
130// ----------------------------------------------------------------------------
131// Ephemeral containers
132// ----------------------------------------------------------------------------
133
134/// Marker trait for objects that support the ephemeral containers sub resource.
135///
136/// See [`Api::get_ephemeral_containers`] et al.
137pub trait Ephemeral {}
138
139impl Ephemeral for k8s_openapi::api::core::v1::Pod {}
140
141impl<K> Api<K>
142where
143 K: Clone + DeserializeOwned + Ephemeral,
144{
145 /// Replace the ephemeral containers sub resource entirely.
146 ///
147 /// This functions in the same way as [`Api::replace`] except only `.spec.ephemeralcontainers` is replaced, everything else is ignored.
148 ///
149 /// Note that ephemeral containers may **not** be changed or removed once attached to a pod.
150 ///
151 ///
152 /// You way want to patch the underlying resource to gain access to the main container process,
153 /// see the [documentation](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`.
154 ///
155 /// See the Kubernetes [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/#what-is-an-ephemeral-container) for more details.
156 ///
157 /// [`Api::patch_ephemeral_containers`] may be more ergonomic, as you can will avoid having to first fetch the
158 /// existing subresources with an approriate merge strategy, see the examples for more details.
159 ///
160 /// Example of using `replace_ephemeral_containers`:
161 ///
162 /// ```no_run
163 /// use k8s_openapi::api::core::v1::Pod;
164 /// use kube::{Api, api::PostParams};
165 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
166 /// # let client = kube::Client::try_default().await?;
167 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
168 /// let pp = PostParams::default();
169 ///
170 /// // Get pod object with ephemeral containers.
171 /// let mut mypod = pods.get_ephemeral_containers("mypod").await?;
172 ///
173 /// // If there were existing ephemeral containers, we would have to append
174 /// // new containers to the list before calling replace_ephemeral_containers.
175 /// assert_eq!(mypod.spec.as_mut().unwrap().ephemeral_containers, None);
176 ///
177 /// // Add an ephemeral container to the pod object.
178 /// mypod.spec.as_mut().unwrap().ephemeral_containers = Some(serde_json::from_value(serde_json::json!([
179 /// {
180 /// "name": "myephemeralcontainer",
181 /// "image": "busybox:1.34.1",
182 /// "command": ["sh", "-c", "sleep 20"],
183 /// },
184 /// ]))?);
185 ///
186 /// pods.replace_ephemeral_containers("mypod", &pp, &mypod).await?;
187 ///
188 /// # Ok(())
189 /// # }
190 /// ```
191 pub async fn replace_ephemeral_containers(&self, name: &str, pp: &PostParams, data: &K) -> Result<K>
192 where
193 K: Serialize,
194 {
195 let mut req = self
196 .request
197 .replace_subresource(
198 "ephemeralcontainers",
199 name,
200 pp,
201 serde_json::to_vec(data).map_err(Error::SerdeError)?,
202 )
203 .map_err(Error::BuildRequest)?;
204 req.extensions_mut().insert("replace_ephemeralcontainers");
205 self.client.request::<K>(req).await
206 }
207
208 /// Patch the ephemeral containers sub resource
209 ///
210 /// Any partial object containing the ephemeral containers
211 /// sub resource is valid as long as the complete structure
212 /// for the object is present, as shown below.
213 ///
214 /// You way want to patch the underlying resource to gain access to the main container process,
215 /// see the [docs](https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/) for `sharedProcessNamespace`.
216 ///
217 /// Ephemeral containers may **not** be changed or removed once attached to a pod.
218 /// Therefore if the chosen merge strategy overwrites the existing ephemeral containers,
219 /// you will have to fetch the existing ephemeral containers first.
220 /// In order to append your new ephemeral containers to the existing list before patching. See some examples and
221 /// discussion related to merge strategies in Kubernetes
222 /// [here](https://kubernetes.io/docs/tasks/manage-kubernetes-objects/update-api-object-kubectl-patch/#use-a-json-merge-patch-to-update-a-deployment). The example below uses a strategic merge patch which does not require
223 ///
224 /// See the `Kubernetes` [documentation](https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/)
225 /// for more information about ephemeral containers.
226 ///
227 ///
228 /// Example of using `patch_ephemeral_containers`:
229 ///
230 /// ```no_run
231 /// use kube::api::{Api, PatchParams, Patch};
232 /// use k8s_openapi::api::core::v1::Pod;
233 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
234 /// # let client = kube::Client::try_default().await?;
235 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
236 /// let pp = PatchParams::default(); // stratetgic merge patch
237 ///
238 /// // Note that the strategic merge patch will concatenate the
239 /// // lists of ephemeral containers so we avoid having to fetch the
240 /// // current list and append to it manually.
241 /// let patch = serde_json::json!({
242 /// "spec":{
243 /// "ephemeralContainers": [
244 /// {
245 /// "name": "myephemeralcontainer",
246 /// "image": "busybox:1.34.1",
247 /// "command": ["sh", "-c", "sleep 20"],
248 /// },
249 /// ]
250 /// }});
251 ///
252 /// pods.patch_ephemeral_containers("mypod", &pp, &Patch::Strategic(patch)).await?;
253 ///
254 /// # Ok(())
255 /// # }
256 /// ```
257 pub async fn patch_ephemeral_containers<P: serde::Serialize>(
258 &self,
259 name: &str,
260 pp: &PatchParams,
261 patch: &Patch<P>,
262 ) -> Result<K> {
263 let mut req = self
264 .request
265 .patch_subresource("ephemeralcontainers", name, pp, patch)
266 .map_err(Error::BuildRequest)?;
267
268 req.extensions_mut().insert("patch_ephemeralcontainers");
269 self.client.request::<K>(req).await
270 }
271
272 /// Get the named resource with the ephemeral containers subresource.
273 ///
274 /// This returns the whole K, with metadata and spec.
275 pub async fn get_ephemeral_containers(&self, name: &str) -> Result<K> {
276 let mut req = self
277 .request
278 .get_subresource("ephemeralcontainers", name)
279 .map_err(Error::BuildRequest)?;
280
281 req.extensions_mut().insert("get_ephemeralcontainers");
282 self.client.request::<K>(req).await
283 }
284}
285
286// ----------------------------------------------------------------------------
287
288// TODO: Replace examples with owned custom resources. Bad practice to write to owned objects
289// These examples work, but the job controller will totally overwrite what we do.
290/// Methods for [status subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#status-subresource).
291impl<K> Api<K>
292where
293 K: DeserializeOwned,
294{
295 /// Get the named resource with a status subresource
296 ///
297 /// This actually returns the whole K, with metadata, and spec.
298 pub async fn get_status(&self, name: &str) -> Result<K> {
299 let mut req = self
300 .request
301 .get_subresource("status", name)
302 .map_err(Error::BuildRequest)?;
303 req.extensions_mut().insert("get_status");
304 self.client.request::<K>(req).await
305 }
306
307 /// Patch fields on the status object
308 ///
309 /// NB: Requires that the resource has a status subresource.
310 ///
311 /// ```no_run
312 /// use kube::api::{Api, PatchParams, Patch};
313 /// use k8s_openapi::api::batch::v1::Job;
314 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
315 /// # let client = kube::Client::try_default().await?;
316 /// let jobs: Api<Job> = Api::namespaced(client, "apps");
317 /// let mut j = jobs.get("baz").await?;
318 /// let pp = PatchParams::default(); // json merge patch
319 /// let data = serde_json::json!({
320 /// "status": {
321 /// "succeeded": 2
322 /// }
323 /// });
324 /// let o = jobs.patch_status("baz", &pp, &Patch::Merge(data)).await?;
325 /// assert_eq!(o.status.unwrap().succeeded, Some(2));
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub async fn patch_status<P: serde::Serialize + Debug>(
330 &self,
331 name: &str,
332 pp: &PatchParams,
333 patch: &Patch<P>,
334 ) -> Result<K> {
335 let mut req = self
336 .request
337 .patch_subresource("status", name, pp, patch)
338 .map_err(Error::BuildRequest)?;
339 req.extensions_mut().insert("patch_status");
340 self.client.request::<K>(req).await
341 }
342
343 /// Replace every field on the status object
344 ///
345 /// This works similarly to the [`Api::replace`] method, but `.spec` is ignored.
346 /// You can leave out the `.spec` entirely from the serialized output.
347 ///
348 /// ```no_run
349 /// use kube::api::{Api, PostParams};
350 /// use k8s_openapi::api::batch::v1::{Job, JobStatus};
351 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
352 /// # let client = kube::Client::try_default().await?;
353 /// let jobs: Api<Job> = Api::namespaced(client, "apps");
354 /// let mut o = jobs.get_status("baz").await?; // retrieve partial object
355 /// o.status = Some(JobStatus::default()); // update the job part
356 /// let pp = PostParams::default();
357 /// let o = jobs.replace_status("baz", &pp, serde_json::to_vec(&o)?).await?;
358 /// # Ok(())
359 /// # }
360 /// ```
361 pub async fn replace_status(&self, name: &str, pp: &PostParams, data: Vec<u8>) -> Result<K> {
362 let mut req = self
363 .request
364 .replace_subresource("status", name, pp, data)
365 .map_err(Error::BuildRequest)?;
366 req.extensions_mut().insert("replace_status");
367 self.client.request::<K>(req).await
368 }
369}
370
371// ----------------------------------------------------------------------------
372// Log subresource
373// ----------------------------------------------------------------------------
374
// Checks the URI built for a log request that names a container.
#[test]
fn log_path() {
    use crate::api::{Request, Resource};
    use k8s_openapi::api::core::v1::Pod;

    let params = LogParams {
        container: Some(String::from("blah")),
        ..Default::default()
    };
    let request = Request::new(Pod::url_path(&(), Some("ns")))
        .logs("foo", &params)
        .unwrap();
    assert_eq!(request.uri(), "/api/v1/namespaces/ns/pods/foo/log?&container=blah");
}
387
388/// Marker trait for objects that has logs
389///
390/// See [`Api::logs`] and [`Api::log_stream`] for usage.
391pub trait Log {}
392
393impl Log for k8s_openapi::api::core::v1::Pod {}
394
395impl<K> Api<K>
396where
397 K: DeserializeOwned + Log,
398{
399 /// Fetch logs as a string
400 pub async fn logs(&self, name: &str, lp: &LogParams) -> Result<String> {
401 let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
402 req.extensions_mut().insert("logs");
403 self.client.request_text(req).await
404 }
405
406 /// Stream the logs via [`AsyncBufRead`].
407 ///
408 /// Log stream can be processsed using [`AsyncReadExt`](futures::AsyncReadExt)
409 /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
410 ///
411 /// # Example
412 ///
413 /// ```no_run
414 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
415 /// # use k8s_openapi::api::core::v1::Pod;
416 /// # use kube::{api::{Api, LogParams}, Client};
417 /// # let client: Client = todo!();
418 /// use futures::{AsyncBufReadExt, TryStreamExt};
419 ///
420 /// let pods: Api<Pod> = Api::default_namespaced(client);
421 /// let mut logs = pods
422 /// .log_stream("my-pod", &LogParams::default()).await?
423 /// .lines();
424 ///
425 /// while let Some(line) = logs.try_next().await? {
426 /// println!("{}", line);
427 /// }
428 /// # Ok(())
429 /// # }
430 /// ```
431 pub async fn log_stream(&self, name: &str, lp: &LogParams) -> Result<impl AsyncBufRead> {
432 let mut req = self.request.logs(name, lp).map_err(Error::BuildRequest)?;
433 req.extensions_mut().insert("log_stream");
434 self.client.request_stream(req).await
435 }
436}
437
438// ----------------------------------------------------------------------------
439// Eviction subresource
440// ----------------------------------------------------------------------------
441
// Checks the URI built for an eviction request with default parameters.
#[test]
fn evict_path() {
    use crate::api::{Request, Resource};
    use k8s_openapi::api::core::v1::Pod;

    let params = EvictParams::default();
    let request = Request::new(Pod::url_path(&(), Some("ns")))
        .evict("foo", &params)
        .unwrap();
    assert_eq!(request.uri(), "/api/v1/namespaces/ns/pods/foo/eviction?");
}
451
452/// Marker trait for objects that can be evicted
453///
454/// See [`Api::evic`] for usage
455pub trait Evict {}
456
457impl Evict for k8s_openapi::api::core::v1::Pod {}
458
459impl<K> Api<K>
460where
461 K: DeserializeOwned + Evict,
462{
463 /// Create an eviction
464 pub async fn evict(&self, name: &str, ep: &EvictParams) -> Result<Status> {
465 let mut req = self.request.evict(name, ep).map_err(Error::BuildRequest)?;
466 req.extensions_mut().insert("evict");
467 self.client.request::<Status>(req).await
468 }
469}
470
471// ----------------------------------------------------------------------------
472// Attach subresource
473// ----------------------------------------------------------------------------
474
// Checks the URI built for an attach request that names a container.
#[cfg(feature = "ws")]
#[test]
fn attach_path() {
    use crate::api::{Request, Resource};
    use k8s_openapi::api::core::v1::Pod;

    let params = AttachParams {
        container: Some(String::from("blah")),
        ..Default::default()
    };
    let request = Request::new(Pod::url_path(&(), Some("ns")))
        .attach("foo", &params)
        .unwrap();
    assert_eq!(
        request.uri(),
        "/api/v1/namespaces/ns/pods/foo/attach?&stdout=true&stderr=true&container=blah"
    );
}
491
/// Marker trait for objects that can be attached to.
///
/// It gates [`Api::attach`]; see that method for usage.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Attach {}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Attach for k8s_openapi::api::core::v1::Pod {}
502
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl<K> Api<K>
where
    K: Clone + DeserializeOwned + Attach,
{
    /// Attach to the pod called `name`.
    ///
    /// On success the opened connection is wrapped in an [`AttachedProcess`]
    /// configured from `ap`.
    pub async fn attach(&self, name: &str, ap: &AttachParams) -> Result<AttachedProcess> {
        let built = self.request.attach(name, ap);
        let mut request = built.map_err(Error::BuildRequest)?;
        // Record which operation produced this request.
        request.extensions_mut().insert("attach");
        let connection = self.client.connect(request).await?;
        Ok(AttachedProcess::new(connection, ap))
    }
}
517
518// ----------------------------------------------------------------------------
519// Exec subresource
520// ----------------------------------------------------------------------------
// Checks the URI built for an exec request with a container and a three-word command.
#[cfg(feature = "ws")]
#[test]
fn exec_path() {
    use crate::api::{Request, Resource};
    use k8s_openapi::api::core::v1::Pod;

    let params = AttachParams {
        container: Some(String::from("blah")),
        ..Default::default()
    };
    let command = vec!["echo", "foo", "bar"];
    let request = Request::new(Pod::url_path(&(), Some("ns")))
        .exec("foo", command, &params)
        .unwrap();
    assert_eq!(
        request.uri(),
        "/api/v1/namespaces/ns/pods/foo/exec?&stdout=true&stderr=true&container=blah&command=echo&command=foo&command=bar"
    );
}
539
/// Marker trait for objects that commands can be executed in.
///
/// It gates [`Api::exec`]; see that method for usage.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Execute {}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Execute for k8s_openapi::api::core::v1::Pod {}
550
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl<K> Api<K>
where
    K: Clone + DeserializeOwned + Execute,
{
    /// Execute `command` in the pod called `name`.
    ///
    /// On success the opened connection is wrapped in an [`AttachedProcess`]
    /// configured from `ap`.
    pub async fn exec<I, T>(&self, name: &str, command: I, ap: &AttachParams) -> Result<AttachedProcess>
    where
        I: IntoIterator<Item = T> + Debug,
        T: Into<String>,
    {
        let built = self.request.exec(name, command, ap);
        let mut request = built.map_err(Error::BuildRequest)?;
        // Record which operation produced this request.
        request.extensions_mut().insert("exec");
        let connection = self.client.connect(request).await?;
        Ok(AttachedProcess::new(connection, ap))
    }
}
572
573// ----------------------------------------------------------------------------
574// Portforward subresource
575// ----------------------------------------------------------------------------
// Checks the URI built for a port-forward request covering two ports.
#[cfg(feature = "ws")]
#[test]
fn portforward_path() {
    use crate::api::{Request, Resource};
    use k8s_openapi::api::core::v1::Pod;

    let ports = [80, 1234];
    let request = Request::new(Pod::url_path(&(), Some("ns")))
        .portforward("foo", &ports)
        .unwrap();
    assert_eq!(
        request.uri(),
        "/api/v1/namespaces/ns/pods/foo/portforward?&ports=80%2C1234"
    );
}
588
/// Marker trait for objects that support port forwarding.
///
/// It gates [`Api::portforward`]; see that method for usage.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Portforward {}

#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Portforward for k8s_openapi::api::core::v1::Pod {}
597
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl<K> Api<K>
where
    K: Clone + DeserializeOwned + Portforward,
{
    /// Forward `ports` of the pod called `name`.
    ///
    /// On success the opened connection is wrapped in a [`Portforwarder`]
    /// for the requested ports. A failure to build the request is reported
    /// as [`Error::BuildRequest`].
    pub async fn portforward(&self, name: &str, ports: &[u16]) -> Result<Portforwarder> {
        let mut req = self
            .request
            .portforward(name, ports)
            .map_err(Error::BuildRequest)?;
        // Record the operation name in the request extensions, as every other
        // subresource method in this file does.
        req.extensions_mut().insert("portforward");
        let stream = self.client.connect(req).await?;
        Ok(Portforwarder::new(stream, ports))
    }
}