kube_client/
lib.rs

1//! Crate for interacting with the Kubernetes API
2//!
3//! This crate includes the tools for manipulating Kubernetes resources as
4//! well as keeping track of those resources as they change over time
5//!
6//! # Example
7//!
//! The following example will create a [`Pod`](k8s_openapi::api::core::v1::Pod)
//! via server-side apply and then list the pods in the configured namespace.
10//!
11//! ```rust,no_run
12//! use futures::{StreamExt, TryStreamExt};
13//! use kube_client::api::{Api, ResourceExt, ListParams, PatchParams, Patch};
14//! use kube_client::Client;
15//! use k8s_openapi::api::core::v1::Pod;
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
19//!     // Read the environment to find config for kube client.
20//!     // Note that this tries an in-cluster configuration first,
21//!     // then falls back on a kubeconfig file.
22//!     let client = Client::try_default().await?;
23//!
24//!     // Interact with pods in the configured namespace with the typed interface from k8s-openapi
25//!     let pods: Api<Pod> = Api::default_namespaced(client);
26//!
27//!     // Create a Pod (cheating here with json, but it has to validate against the type):
28//!     let patch: Pod = serde_json::from_value(serde_json::json!({
29//!         "apiVersion": "v1",
30//!         "kind": "Pod",
31//!         "metadata": {
32//!             "name": "my-pod"
33//!         },
34//!         "spec": {
35//!             "containers": [
36//!                 {
37//!                     "name": "my-container",
38//!                     "image": "myregistry.azurecr.io/hello-world:v1",
39//!                 },
40//!             ],
41//!         }
42//!     }))?;
43//!
44//!     // Apply the Pod via server-side apply
45//!     let params = PatchParams::apply("myapp");
46//!     let result = pods.patch("my-pod", &params, &Patch::Apply(&patch)).await?;
47//!
48//!     // List pods in the configured namespace
49//!     for p in pods.list(&ListParams::default()).await? {
50//!         println!("found pod {}", p.name_any());
51//!     }
52//!
53//!     Ok(())
54//! }
55//! ```
56//!
57//! For more details, see:
58//!
59//! - [`Client`](crate::client) for the extensible Kubernetes client
60//! - [`Config`](crate::config) for the Kubernetes config abstraction
61//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
62//! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly
63#![cfg_attr(docsrs, feature(doc_cfg))]
64// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
65#![allow(clippy::unnecessary_lazy_evaluations)]
66
// Compiles every item passed in only when the `client` feature is enabled.
// Under `--cfg docsrs` each item additionally gets a `doc(cfg(...))` attribute
// naming that feature requirement.
macro_rules! cfg_client {
    ($($gated:item)*) => {$(
        #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
        #[cfg(feature = "client")]
        $gated
    )*}
}
// Compiles every item passed in only when the `config` feature is enabled.
// Under `--cfg docsrs` each item additionally gets a `doc(cfg(...))` attribute
// naming that feature requirement.
macro_rules! cfg_config {
    ($($gated:item)*) => {$(
        #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
        #[cfg(feature = "config")]
        $gated
    )*}
}
85
// Compiles every item passed in when either the `config` or the `client`
// feature is enabled. Under `--cfg docsrs` each item additionally gets a
// `doc(cfg(...))` attribute naming that requirement.
macro_rules! cfg_error {
    ($($gated:item)*) => {$(
        #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
        #[cfg(any(feature = "config", feature = "client"))]
        $gated
    )*}
}
95
96cfg_client! {
97    pub mod api;
98    pub mod discovery;
99    pub mod client;
100
101    #[doc(inline)]
102    pub use api::Api;
103    #[doc(inline)]
104    pub use client::Client;
105    #[doc(inline)]
106    pub use discovery::Discovery;
107}
108
109cfg_config! {
110    pub mod config;
111    #[doc(inline)]
112    pub use config::Config;
113}
114
115cfg_error! {
116    pub mod error;
117    #[doc(inline)] pub use error::Error;
    /// Convenient alias for `Result<T, Error>`
119    pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123/// Re-exports from kube_core
124pub use kube_core as core;
125
126// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube-client --lib --features=rustls-tls,ws -- --ignored`
128#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] // varying test imports depending on feature
131mod test {
132    use crate::{
133        api::{AttachParams, AttachedProcess},
134        client::ConfigExt,
135        Api, Client, Config, ResourceExt,
136    };
137    use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138    use hyper::Uri;
139    use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140    use kube_core::{
141        params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142        response::StatusSummary,
143    };
144    use serde_json::json;
145    use tower::ServiceBuilder;
146
147    // hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
148    #[allow(dead_code)]
149    // #[tokio::test]
150    #[ignore = "needs cluster (lists pods)"]
151    #[cfg(feature = "rustls-tls")]
152    async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
153        use hyper_util::rt::TokioExecutor;
154
155        let config = Config::infer().await?;
156        let https = config.rustls_https_connector()?;
157        let service = ServiceBuilder::new()
158            .layer(config.base_uri_layer())
159            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
160        let client = Client::new(service, config.default_namespace);
161        let pods: Api<Pod> = Api::default_namespaced(client);
162        pods.list(&Default::default()).await?;
163        Ok(())
164    }
165
166    #[tokio::test]
167    #[ignore = "needs cluster (lists pods)"]
168    #[cfg(feature = "openssl-tls")]
169    async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
170        use hyper_util::rt::TokioExecutor;
171
172        let config = Config::infer().await?;
173        let https = config.openssl_https_connector()?;
174        let service = ServiceBuilder::new()
175            .layer(config.base_uri_layer())
176            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
177        let client = Client::new(service, config.default_namespace);
178        let pods: Api<Pod> = Api::default_namespaced(client);
179        pods.list(&Default::default()).await?;
180        Ok(())
181    }
182
183    #[tokio::test]
184    #[ignore = "needs cluster (lists api resources)"]
185    #[cfg(feature = "client")]
186    async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
187        use crate::{core::DynamicObject, discovery};
188        let client = Client::try_default().await?;
189        let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
190        let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
191        let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
192        api.list(&Default::default()).await?;
193
194        Ok(())
195    }
196
197    #[tokio::test]
198    #[ignore = "needs cluster (will create and edit a pod)"]
199    async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
200        use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
201
202        let client = Client::try_default().await?;
203        let pods: Api<Pod> = Api::default_namespaced(client);
204
205        // create busybox pod that's alive for at most 30s
206        let p: Pod = serde_json::from_value(json!({
207            "apiVersion": "v1",
208            "kind": "Pod",
209            "metadata": {
210                "name": "busybox-kube1",
211                "labels": { "app": "kube-rs-test" },
212            },
213            "spec": {
214                "terminationGracePeriodSeconds": 1,
215                "restartPolicy": "Never",
216                "containers": [{
217                  "name": "busybox",
218                  "image": "busybox:1.34.1",
219                  "command": ["sh", "-c", "sleep 30"],
220                }],
221            }
222        }))?;
223
224        let pp = PostParams::default();
225        match pods.create(&pp, &p).await {
226            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
227            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
228            Err(e) => return Err(e.into()),                         // any other case if a failure
229        }
230
231        // Manual watch-api for it to become ready
232        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
233        let wp = WatchParams::default()
234            .fields(&format!("metadata.name={}", "busybox-kube1"))
235            .timeout(15);
236        let mut stream = pods.watch(&wp, "0").await?.boxed();
237        while let Some(ev) = stream.try_next().await? {
238            // can debug format watch event
239            let _ = format!("we: {ev:?}");
240            match ev {
241                WatchEvent::Modified(o) => {
242                    let s = o.status.as_ref().expect("status exists on pod");
243                    let phase = s.phase.clone().unwrap_or_default();
244                    if phase == "Running" {
245                        break;
246                    }
247                }
248                WatchEvent::Error(e) => panic!("watch error: {e}"),
249                _ => {}
250            }
251        }
252
253        // Verify we can get it
254        let mut pod = pods.get("busybox-kube1").await?;
255        assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
256
257        // verify replace with explicit resource version
258        // NB: don't do this; use server side apply
259        {
260            assert!(pod.resource_version().is_some());
261            pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
262
263            let pp = PostParams::default();
264            let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
265            assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
266        }
267
268        // Delete it
269        let dp = DeleteParams::default();
270        pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
271            assert_eq!(pdel.name_unchecked(), "busybox-kube1");
272        });
273
274        Ok(())
275    }
276
277    #[tokio::test]
278    #[ignore = "needs cluster (will create and attach to a pod)"]
279    #[cfg(feature = "ws")]
280    async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
281        use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
282        use tokio::io::AsyncWriteExt;
283
284        let client = Client::try_default().await?;
285        let pods: Api<Pod> = Api::default_namespaced(client);
286
287        // create busybox pod that's alive for at most 30s
288        let p: Pod = serde_json::from_value(json!({
289            "apiVersion": "v1",
290            "kind": "Pod",
291            "metadata": {
292                "name": "busybox-kube2",
293                "labels": { "app": "kube-rs-test" },
294            },
295            "spec": {
296                "terminationGracePeriodSeconds": 1,
297                "restartPolicy": "Never",
298                "containers": [{
299                  "name": "busybox",
300                  "image": "busybox:1.34.1",
301                  "command": ["sh", "-c", "sleep 30"],
302                }],
303            }
304        }))?;
305
306        match pods.create(&Default::default(), &p).await {
307            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
308            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
309            Err(e) => return Err(e.into()),                         // any other case if a failure
310        }
311
312        // Manual watch-api for it to become ready
313        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
314        let wp = WatchParams::default()
315            .fields(&format!("metadata.name={}", "busybox-kube2"))
316            .timeout(15);
317        let mut stream = pods.watch(&wp, "0").await?.boxed();
318        while let Some(ev) = stream.try_next().await? {
319            match ev {
320                WatchEvent::Modified(o) => {
321                    let s = o.status.as_ref().expect("status exists on pod");
322                    let phase = s.phase.clone().unwrap_or_default();
323                    if phase == "Running" {
324                        break;
325                    }
326                }
327                WatchEvent::Error(e) => panic!("watch error: {e}"),
328                _ => {}
329            }
330        }
331
332        // Verify exec works and we can get the output
333        {
334            let mut attached = pods
335                .exec(
336                    "busybox-kube2",
337                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
338                    &AttachParams::default().stderr(false),
339                )
340                .await?;
341            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
342            let out = stdout
343                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
344                .collect::<Vec<_>>()
345                .await
346                .join("");
347            attached.join().await.unwrap();
348            assert_eq!(out.lines().count(), 3);
349            assert_eq!(out, "1\n2\n3\n");
350        }
351
352        // Verify we read from stdout after stdin is closed.
353        {
354            let name = "busybox-kube2";
355            let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
356            let ap = AttachParams::default().stdin(true).stderr(false);
357
358            // Make a connection so we can determine if the K8s cluster supports stream closing.
359            let mut req = pods.request.exec(name, command.clone(), &ap)?;
360            req.extensions_mut().insert("exec");
361            let stream = pods.client.connect(req).await?;
362
363            // This only works is the cluster supports protocol version v5.channel.k8s.io
364            // Skip for older protocols.
365            if stream.supports_stream_close() {
366                let mut attached = pods.exec(name, command, &ap).await?;
367                let mut stdin_writer = attached.stdin().unwrap();
368                let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
369
370                stdin_writer.write_all(b"this will be ignored\n").await?;
371                _ = stdin_writer.shutdown().await;
372
373                let next_stdout = stdout_stream.next();
374                let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
375                assert_eq!(stdout, "test string 2\n");
376
377                // AttachedProcess resolves with status object.
378                let status = attached.take_status().unwrap();
379                if let Some(status) = status.await {
380                    assert_eq!(status.status, Some("Success".to_owned()));
381                    assert_eq!(status.reason, None);
382                }
383            }
384        }
385
386        // Verify we can write to Stdin
387        {
388            let mut attached = pods
389                .exec(
390                    "busybox-kube2",
391                    vec!["sh"],
392                    &AttachParams::default().stdin(true).stderr(false),
393                )
394                .await?;
395            let mut stdin_writer = attached.stdin().unwrap();
396            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
397            let next_stdout = stdout_stream.next();
398            stdin_writer.write_all(b"echo test string 1\n").await?;
399            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
400            println!("{stdout}");
401            assert_eq!(stdout, "test string 1\n");
402
403            // AttachedProcess resolves with status object.
404            // Send `exit 1` to get a failure status.
405            stdin_writer.write_all(b"exit 1\n").await?;
406            let status = attached.take_status().unwrap();
407            if let Some(status) = status.await {
408                println!("{status:?}");
409                assert_eq!(status.status, Some("Failure".to_owned()));
410                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
411            }
412        }
413
414        // Delete it
415        let dp = DeleteParams::default();
416        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
417            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
418        });
419
420        Ok(())
421    }
422
423    #[tokio::test]
424    #[ignore = "needs cluster (will create and tail logs from a pod)"]
425    async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
426        use crate::{
427            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
428            core::subresource::LogParams,
429        };
430
431        let client = Client::try_default().await?;
432        let pods: Api<Pod> = Api::default_namespaced(client);
433
434        // create busybox pod that's alive for at most 30s
435        let p: Pod = serde_json::from_value(json!({
436            "apiVersion": "v1",
437            "kind": "Pod",
438            "metadata": {
439                "name": "busybox-kube3",
440                "labels": { "app": "kube-rs-test" },
441            },
442            "spec": {
443                "terminationGracePeriodSeconds": 1,
444                "restartPolicy": "Never",
445                "containers": [{
446                  "name": "busybox",
447                  "image": "busybox:1.34.1",
448                  "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
449                }],
450            }
451        }))?;
452
453        match pods.create(&Default::default(), &p).await {
454            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
455            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
456            Err(e) => return Err(e.into()),                         // any other case if a failure
457        }
458
459        // Manual watch-api for it to become ready
460        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
461        let wp = WatchParams::default()
462            .fields(&format!("metadata.name={}", "busybox-kube3"))
463            .timeout(15);
464        let mut stream = pods.watch(&wp, "0").await?.boxed();
465        while let Some(ev) = stream.try_next().await? {
466            match ev {
467                WatchEvent::Modified(o) => {
468                    let s = o.status.as_ref().expect("status exists on pod");
469                    let phase = s.phase.clone().unwrap_or_default();
470                    if phase == "Running" {
471                        break;
472                    }
473                }
474                WatchEvent::Error(e) => panic!("watch error: {e}"),
475                _ => {}
476            }
477        }
478
479        // Get current list of logs
480        let lp = LogParams {
481            follow: true,
482            ..LogParams::default()
483        };
484        let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
485
486        // wait for container to finish
487        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
488
489        let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
490        assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
491
492        // individual logs may or may not buffer
493        let mut output = vec![];
494        while let Some(line) = logs_stream.try_next().await? {
495            output.push(line);
496        }
497        assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
498
499        // evict the pod
500        let ep = EvictParams::default();
501        let eres = pods.evict("busybox-kube3", &ep).await?;
502        assert_eq!(eres.code, 201); // created
503        assert!(eres.is_success());
504
505        Ok(())
506    }
507
508    #[tokio::test]
509    #[ignore = "requires a cluster"]
510    async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
511        use crate::{
512            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
513            core::subresource::LogParams,
514        };
515        use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
516
517        let client = Client::try_default().await?;
518        let pods: Api<Pod> = Api::default_namespaced(client);
519
520        // create busybox pod that's alive for at most 30s
521        let p: Pod = serde_json::from_value(json!({
522            "apiVersion": "v1",
523            "kind": "Pod",
524            "metadata": {
525                "name": "busybox-kube-meta",
526                "labels": { "app": "kube-rs-test" },
527            },
528            "spec": {
529                "terminationGracePeriodSeconds": 1,
530                "restartPolicy": "Never",
531                "containers": [{
532                  "name": "busybox",
533                  "image": "busybox:1.34.1",
534                  "command": ["sh", "-c", "sleep 30s"],
535                }],
536            }
537        }))?;
538
539        match pods.create(&Default::default(), &p).await {
540            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
541            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
542            Err(e) => return Err(e.into()),                         // any other case if a failure
543        }
544
545        // Test we can get a pod as a PartialObjectMeta and convert to
546        // ObjectMeta
547        let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
548        assert_eq!("busybox-kube-meta", pod_metadata.name_any());
549        assert_eq!(
550            Some((&"app".to_string(), &"kube-rs-test".to_string())),
551            pod_metadata.labels().get_key_value("app")
552        );
553
554        // Test we can get a list of PartialObjectMeta for pods
555        let p_list = pods.list_metadata(&ListParams::default()).await?;
556
557        // Find only pod we are concerned with in this test and fail eagerly if
558        // name doesn't exist
559        let pod_metadata = p_list
560            .items
561            .into_iter()
562            .find(|p| p.name_any() == "busybox-kube-meta")
563            .unwrap();
564        assert_eq!(
565            pod_metadata.labels().get("app"),
566            Some(&"kube-rs-test".to_string())
567        );
568
569        // Attempt to patch pod metadata
570        let patch = ObjectMeta {
571            annotations: Some([("test".to_string(), "123".to_string())].into()),
572            ..Default::default()
573        }
574        .into_request_partial::<Pod>();
575
576        let patchparams = PatchParams::default();
577        let p_patched = pods
578            .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
579            .await?;
580        assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
581        assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
582        assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
583
584        // Clean-up
585        let dp = DeleteParams::default();
586        pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
587            assert_eq!(pdel.name_any(), "busybox-kube-meta");
588        });
589
590        Ok(())
591    }
592    #[tokio::test]
593    #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
594    async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
595        use crate::api::PostParams;
596        use k8s_openapi::api::certificates::v1::{
597            CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
598        };
599
600        let csr_name = "fake";
601        let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
602            "apiVersion": "certificates.k8s.io/v1",
603            "kind": "CertificateSigningRequest",
604            "metadata": { "name": csr_name },
605            "spec": {
606                "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
607                "signerName": "kubernetes.io/kube-apiserver-client",
608                "expirationSeconds": 86400,
609                "usages": ["client auth"]
610            }
611        }))?;
612
613        let client = Client::try_default().await?;
614        let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
615        assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
616
617        // Patch the approval and approve the CSR
618        let approval_type = "ApprovedFake";
619        let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
620            certificate: None,
621            conditions: Some(vec![CertificateSigningRequestCondition {
622                type_: approval_type.to_string(),
623                last_update_time: None,
624                last_transition_time: None,
625                message: Some(format!("{} {}", approval_type, "by kube-rs client")),
626                reason: Some("kube-rsClient".to_string()),
627                status: "True".to_string(),
628            }]),
629        };
630        let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
631        let _ = csr
632            .patch_approval(csr_name, &Default::default(), &csr_status_patch)
633            .await?;
634        let csr_after_approval = csr.get_approval(csr_name).await?;
635
636        assert_eq!(
637            csr_after_approval
638                .status
639                .as_ref()
640                .unwrap()
641                .conditions
642                .as_ref()
643                .unwrap()[0]
644                .type_,
645            approval_type.to_string()
646        );
647        csr.delete(csr_name, &DeleteParams::default()).await?;
648        Ok(())
649    }
650
651    #[tokio::test]
652    #[ignore = "needs cluster for ephemeral containers operations"]
653    async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
654        let client = Client::try_default().await?;
655
656        // Ephemeral containers were stabilized in Kubernetes v1.25.
657        // This test therefore exits early if the current cluster version is older than v1.25.
658        let api_version = client.apiserver_version().await?;
659        if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
660            return Ok(());
661        }
662
663        let pod: Pod = serde_json::from_value(serde_json::json!({
664            "apiVersion": "v1",
665            "kind": "Pod",
666            "metadata": {
667                "name": "ephemeral-container-test",
668                "labels": { "app": "kube-rs-test" },
669            },
670            "spec": {
671                "restartPolicy": "Never",
672                "containers": [{
673                  "name": "busybox",
674                  "image": "busybox:1.34.1",
675                  "command": ["sh", "-c", "sleep 2"],
676                }],
677            }
678        }))?;
679
680        let pod_name = pod.name_any();
681        let pods = Api::<Pod>::default_namespaced(client);
682
683        // If cleanup failed and a pod already exists, we attempt to remove it
684        // before proceeding. This is important as ephemeral containers can't
685        // be removed from a Pod's spec. Therefore this test must start with a fresh
686        // Pod every time.
687        let _ = pods
688            .delete(&pod.name_any(), &DeleteParams::default())
689            .await
690            .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
691
        // Ephemeral containers can only be applied to a running pod, so one must
693        // be created before any operations are tested.
694        match pods.create(&Default::default(), &pod).await {
695            Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
696            Err(e) => return Err(e.into()), // any other case if a failure
697        }
698
699        let current_ephemeral_containers = pods
700            .get_ephemeral_containers(&pod.name_any())
701            .await?
702            .spec
703            .unwrap()
704            .ephemeral_containers;
705
706        // We expect no ephemeral containers initially, get_ephemeral_containers should
707        // reflect that.
708        assert_eq!(current_ephemeral_containers, None);
709
710        let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
711            {
712                "name": "myephemeralcontainer1",
713                "image": "busybox:1.34.1",
714                "command": ["sh", "-c", "sleep 2"],
715            }
716        ))?;
717
718        // Attempt to replace ephemeral containers.
719
720        let patch: Pod = serde_json::from_value(json!({
721            "metadata": { "name": pod_name },
722            "spec":{ "ephemeralContainers": [ busybox_eph ] }
723        }))?;
724
725        let current_containers = pods
726            .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
727            .await?
728            .spec
729            .unwrap()
730            .ephemeral_containers
731            .expect("could find ephemeral container");
732
733        // Note that we can't compare the whole ephemeral containers object, as some fields
734        // are set by the cluster. We therefore compare the fields specified in the patch.
735        assert_eq!(current_containers.len(), 1);
736        assert_eq!(current_containers[0].name, busybox_eph.name);
737        assert_eq!(current_containers[0].image, busybox_eph.image);
738        assert_eq!(current_containers[0].command, busybox_eph.command);
739
740        // Attempt to patch ephemeral containers.
741
742        // The new ephemeral container will have different values from the
743        // first to ensure we can test for its presence.
744        busybox_eph = serde_json::from_value(json!(
745            {
746                "name": "myephemeralcontainer2",
747                "image": "busybox:1.35.0",
748                "command": ["sh", "-c", "sleep 1"],
749            }
750        ))?;
751
752        let patch: Pod =
753            serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
754
755        let current_containers = pods
756            .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
757            .await?
758            .spec
759            .unwrap()
760            .ephemeral_containers
761            .expect("could find ephemeral container");
762
763        // There should only be 2 ephemeral containers at this point,
764        // one from each patch
765        assert_eq!(current_containers.len(), 2);
766
767        let new_container = current_containers
768            .iter()
769            .find(|c| c.name == busybox_eph.name)
770            .expect("could find myephemeralcontainer2");
771
772        // Note that we can't compare the whole ephemeral container object, as some fields
773        // get set in the cluster. We therefore compare the fields specified in the patch.
774        assert_eq!(new_container.image, busybox_eph.image);
775        assert_eq!(new_container.command, busybox_eph.command);
776
777        // Attempt to get ephemeral containers.
778
779        let expected_containers = current_containers;
780
781        let current_containers = pods
782            .get_ephemeral_containers(&pod.name_any())
783            .await?
784            .spec
785            .unwrap()
786            .ephemeral_containers
787            .unwrap();
788
789        assert_eq!(current_containers, expected_containers);
790
791        pods.delete(&pod.name_any(), &DeleteParams::default())
792            .await?
793            .map_left(|pdel| {
794                assert_eq!(pdel.name_any(), pod.name_any());
795            });
796
797        Ok(())
798    }
799
800    #[tokio::test]
801    #[ignore = "needs kubelet debug methods"]
802    #[cfg(feature = "kubelet-debug")]
803    async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
804        use crate::{
805            api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
806            core::kubelet_debug::KubeletDebugParams,
807        };
808
809        let client = Client::try_default().await?;
810        let pods: Api<Pod> = Api::default_namespaced(client);
811
812        // create busybox pod that's alive for at most 30s
813        let p: Pod = serde_json::from_value(json!({
814            "apiVersion": "v1",
815            "kind": "Pod",
816            "metadata": {
817                "name": "busybox-kube2",
818                "labels": { "app": "kube-rs-test" },
819            },
820            "spec": {
821                "terminationGracePeriodSeconds": 1,
822                "restartPolicy": "Never",
823                "containers": [{
824                  "name": "busybox",
825                  "image": "busybox:1.34.1",
826                  "command": ["sh", "-c", "sleep 30"],
827                }],
828            }
829        }))?;
830
831        match pods.create(&Default::default(), &p).await {
832            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
833            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
834            Err(e) => return Err(e.into()),                         // any other case if a failure
835        }
836
837        // Manual watch-api for it to become ready
838        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
839        let wp = WatchParams::default()
840            .fields(&format!("metadata.name={}", "busybox-kube2"))
841            .timeout(15);
842        let mut stream = pods.watch(&wp, "0").await?.boxed();
843        while let Some(ev) = stream.try_next().await? {
844            match ev {
845                WatchEvent::Modified(o) => {
846                    let s = o.status.as_ref().expect("status exists on pod");
847                    let phase = s.phase.clone().unwrap_or_default();
848                    if phase == "Running" {
849                        break;
850                    }
851                }
852                WatchEvent::Error(e) => panic!("watch error: {e}"),
853                _ => {}
854            }
855        }
856
857        let mut config = Config::infer().await?;
858        config.accept_invalid_certs = true;
859        config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
860        let kubelet_client: Client = config.try_into()?;
861
862        // Verify exec works and we can get the output
863        {
864            let mut attached = kubelet_client
865                .kubelet_node_exec(
866                    &KubeletDebugParams {
867                        name: "busybox-kube2",
868                        namespace: "default",
869                        ..Default::default()
870                    },
871                    "busybox",
872                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
873                    &AttachParams::default().stderr(false),
874                )
875                .await?;
876            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
877            let out = stdout
878                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
879                .collect::<Vec<_>>()
880                .await
881                .join("");
882            attached.join().await.unwrap();
883            assert_eq!(out.lines().count(), 3);
884            assert_eq!(out, "1\n2\n3\n");
885        }
886
887        // Verify we can write to Stdin
888        {
889            use tokio::io::AsyncWriteExt;
890            let mut attached = kubelet_client
891                .kubelet_node_exec(
892                    &KubeletDebugParams {
893                        name: "busybox-kube2",
894                        namespace: "default",
895                        ..Default::default()
896                    },
897                    "busybox",
898                    vec!["sh"],
899                    &AttachParams::default().stdin(true).stderr(false),
900                )
901                .await?;
902            let mut stdin_writer = attached.stdin().unwrap();
903            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
904            let next_stdout = stdout_stream.next();
905            stdin_writer.write_all(b"echo test string 1\n").await?;
906            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
907            println!("{stdout}");
908            assert_eq!(stdout, "test string 1\n");
909
910            // AttachedProcess resolves with status object.
911            // Send `exit 1` to get a failure status.
912            stdin_writer.write_all(b"exit 1\n").await?;
913            let status = attached.take_status().unwrap();
914            if let Some(status) = status.await {
915                println!("{status:?}");
916                assert_eq!(status.status, Some("Failure".to_owned()));
917                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
918            }
919        }
920
921        // Delete it
922        let dp = DeleteParams::default();
923        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
924            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
925        });
926
927        Ok(())
928    }
929}