1#![cfg_attr(docsrs, feature(doc_cfg))]
64#![allow(clippy::unnecessary_lazy_evaluations)]
66
67macro_rules! cfg_client {
68 ($($item:item)*) => {
69 $(
70 #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
71 #[cfg(feature = "client")]
72 $item
73 )*
74 }
75}
76macro_rules! cfg_config {
77 ($($item:item)*) => {
78 $(
79 #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
80 #[cfg(feature = "config")]
81 $item
82 )*
83 }
84}
85
86macro_rules! cfg_error {
87 ($($item:item)*) => {
88 $(
89 #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
90 #[cfg(any(feature = "config", feature = "client"))]
91 $item
92 )*
93 }
94}
95
96cfg_client! {
97 pub mod api;
98 pub mod discovery;
99 pub mod client;
100
101 #[doc(inline)]
102 pub use api::Api;
103 #[doc(inline)]
104 pub use client::Client;
105 #[doc(inline)]
106 pub use discovery::Discovery;
107}
108
109cfg_config! {
110 pub mod config;
111 #[doc(inline)]
112 pub use config::Config;
113}
114
115cfg_error! {
116 pub mod error;
117 #[doc(inline)] pub use error::Error;
118 pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123pub use kube_core as core;
125
126#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] mod test {
132 use crate::{
133 api::{AttachParams, AttachedProcess},
134 client::ConfigExt,
135 Api, Client, Config, ResourceExt,
136 };
137 use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138 use hyper::Uri;
139 use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140 use kube_core::{
141 params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142 response::StatusSummary,
143 };
144 use serde_json::json;
145 use tower::ServiceBuilder;
146
147 #[allow(dead_code)]
149 #[ignore = "needs cluster (lists pods)"]
151 #[cfg(feature = "rustls-tls")]
152 async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
153 use hyper_util::rt::TokioExecutor;
154
155 let config = Config::infer().await?;
156 let https = config.rustls_https_connector()?;
157 let service = ServiceBuilder::new()
158 .layer(config.base_uri_layer())
159 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
160 let client = Client::new(service, config.default_namespace);
161 let pods: Api<Pod> = Api::default_namespaced(client);
162 pods.list(&Default::default()).await?;
163 Ok(())
164 }
165
166 #[tokio::test]
167 #[ignore = "needs cluster (lists pods)"]
168 #[cfg(feature = "openssl-tls")]
169 async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
170 use hyper_util::rt::TokioExecutor;
171
172 let config = Config::infer().await?;
173 let https = config.openssl_https_connector()?;
174 let service = ServiceBuilder::new()
175 .layer(config.base_uri_layer())
176 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
177 let client = Client::new(service, config.default_namespace);
178 let pods: Api<Pod> = Api::default_namespaced(client);
179 pods.list(&Default::default()).await?;
180 Ok(())
181 }
182
183 #[tokio::test]
184 #[ignore = "needs cluster (lists api resources)"]
185 #[cfg(feature = "client")]
186 async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
187 use crate::{core::DynamicObject, discovery};
188 let client = Client::try_default().await?;
189 let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
190 let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
191 let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
192 api.list(&Default::default()).await?;
193
194 Ok(())
195 }
196
197 #[tokio::test]
198 #[ignore = "needs cluster (will create and edit a pod)"]
199 async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
200 use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
201
202 let client = Client::try_default().await?;
203 let pods: Api<Pod> = Api::default_namespaced(client);
204
205 let p: Pod = serde_json::from_value(json!({
207 "apiVersion": "v1",
208 "kind": "Pod",
209 "metadata": {
210 "name": "busybox-kube1",
211 "labels": { "app": "kube-rs-test" },
212 },
213 "spec": {
214 "terminationGracePeriodSeconds": 1,
215 "restartPolicy": "Never",
216 "containers": [{
217 "name": "busybox",
218 "image": "busybox:1.34.1",
219 "command": ["sh", "-c", "sleep 30"],
220 }],
221 }
222 }))?;
223
224 let pp = PostParams::default();
225 match pods.create(&pp, &p).await {
226 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
227 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
230
231 let wp = WatchParams::default()
234 .fields(&format!("metadata.name={}", "busybox-kube1"))
235 .timeout(15);
236 let mut stream = pods.watch(&wp, "0").await?.boxed();
237 while let Some(ev) = stream.try_next().await? {
238 let _ = format!("we: {ev:?}");
240 match ev {
241 WatchEvent::Modified(o) => {
242 let s = o.status.as_ref().expect("status exists on pod");
243 let phase = s.phase.clone().unwrap_or_default();
244 if phase == "Running" {
245 break;
246 }
247 }
248 WatchEvent::Error(e) => panic!("watch error: {e}"),
249 _ => {}
250 }
251 }
252
253 let mut pod = pods.get("busybox-kube1").await?;
255 assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
256
257 {
260 assert!(pod.resource_version().is_some());
261 pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
262
263 let pp = PostParams::default();
264 let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
265 assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
266 }
267
268 let dp = DeleteParams::default();
270 pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
271 assert_eq!(pdel.name_unchecked(), "busybox-kube1");
272 });
273
274 Ok(())
275 }
276
277 #[tokio::test]
278 #[ignore = "needs cluster (will create and attach to a pod)"]
279 #[cfg(feature = "ws")]
280 async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
281 use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
282 use tokio::io::AsyncWriteExt;
283
284 let client = Client::try_default().await?;
285 let pods: Api<Pod> = Api::default_namespaced(client);
286
287 let p: Pod = serde_json::from_value(json!({
289 "apiVersion": "v1",
290 "kind": "Pod",
291 "metadata": {
292 "name": "busybox-kube2",
293 "labels": { "app": "kube-rs-test" },
294 },
295 "spec": {
296 "terminationGracePeriodSeconds": 1,
297 "restartPolicy": "Never",
298 "containers": [{
299 "name": "busybox",
300 "image": "busybox:1.34.1",
301 "command": ["sh", "-c", "sleep 30"],
302 }],
303 }
304 }))?;
305
306 match pods.create(&Default::default(), &p).await {
307 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
308 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
311
312 let wp = WatchParams::default()
315 .fields(&format!("metadata.name={}", "busybox-kube2"))
316 .timeout(15);
317 let mut stream = pods.watch(&wp, "0").await?.boxed();
318 while let Some(ev) = stream.try_next().await? {
319 match ev {
320 WatchEvent::Modified(o) => {
321 let s = o.status.as_ref().expect("status exists on pod");
322 let phase = s.phase.clone().unwrap_or_default();
323 if phase == "Running" {
324 break;
325 }
326 }
327 WatchEvent::Error(e) => panic!("watch error: {e}"),
328 _ => {}
329 }
330 }
331
332 {
334 let mut attached = pods
335 .exec(
336 "busybox-kube2",
337 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
338 &AttachParams::default().stderr(false),
339 )
340 .await?;
341 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
342 let out = stdout
343 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
344 .collect::<Vec<_>>()
345 .await
346 .join("");
347 attached.join().await.unwrap();
348 assert_eq!(out.lines().count(), 3);
349 assert_eq!(out, "1\n2\n3\n");
350 }
351
352 {
354 let name = "busybox-kube2";
355 let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
356 let ap = AttachParams::default().stdin(true).stderr(false);
357
358 let mut req = pods.request.exec(name, command.clone(), &ap)?;
360 req.extensions_mut().insert("exec");
361 let stream = pods.client.connect(req).await?;
362
363 if stream.supports_stream_close() {
366 let mut attached = pods.exec(name, command, &ap).await?;
367 let mut stdin_writer = attached.stdin().unwrap();
368 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
369
370 stdin_writer.write_all(b"this will be ignored\n").await?;
371 _ = stdin_writer.shutdown().await;
372
373 let next_stdout = stdout_stream.next();
374 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
375 assert_eq!(stdout, "test string 2\n");
376
377 let status = attached.take_status().unwrap();
379 if let Some(status) = status.await {
380 assert_eq!(status.status, Some("Success".to_owned()));
381 assert_eq!(status.reason, None);
382 }
383 }
384 }
385
386 {
388 let mut attached = pods
389 .exec(
390 "busybox-kube2",
391 vec!["sh"],
392 &AttachParams::default().stdin(true).stderr(false),
393 )
394 .await?;
395 let mut stdin_writer = attached.stdin().unwrap();
396 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
397 let next_stdout = stdout_stream.next();
398 stdin_writer.write_all(b"echo test string 1\n").await?;
399 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
400 println!("{stdout}");
401 assert_eq!(stdout, "test string 1\n");
402
403 stdin_writer.write_all(b"exit 1\n").await?;
406 let status = attached.take_status().unwrap();
407 if let Some(status) = status.await {
408 println!("{status:?}");
409 assert_eq!(status.status, Some("Failure".to_owned()));
410 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
411 }
412 }
413
414 let dp = DeleteParams::default();
416 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
417 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
418 });
419
420 Ok(())
421 }
422
423 #[tokio::test]
424 #[ignore = "needs cluster (will create and tail logs from a pod)"]
425 async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
426 use crate::{
427 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
428 core::subresource::LogParams,
429 };
430
431 let client = Client::try_default().await?;
432 let pods: Api<Pod> = Api::default_namespaced(client);
433
434 let p: Pod = serde_json::from_value(json!({
436 "apiVersion": "v1",
437 "kind": "Pod",
438 "metadata": {
439 "name": "busybox-kube3",
440 "labels": { "app": "kube-rs-test" },
441 },
442 "spec": {
443 "terminationGracePeriodSeconds": 1,
444 "restartPolicy": "Never",
445 "containers": [{
446 "name": "busybox",
447 "image": "busybox:1.34.1",
448 "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
449 }],
450 }
451 }))?;
452
453 match pods.create(&Default::default(), &p).await {
454 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
455 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
458
459 let wp = WatchParams::default()
462 .fields(&format!("metadata.name={}", "busybox-kube3"))
463 .timeout(15);
464 let mut stream = pods.watch(&wp, "0").await?.boxed();
465 while let Some(ev) = stream.try_next().await? {
466 match ev {
467 WatchEvent::Modified(o) => {
468 let s = o.status.as_ref().expect("status exists on pod");
469 let phase = s.phase.clone().unwrap_or_default();
470 if phase == "Running" {
471 break;
472 }
473 }
474 WatchEvent::Error(e) => panic!("watch error: {e}"),
475 _ => {}
476 }
477 }
478
479 let lp = LogParams {
481 follow: true,
482 ..LogParams::default()
483 };
484 let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
485
486 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
488
489 let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
490 assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
491
492 let mut output = vec![];
494 while let Some(line) = logs_stream.try_next().await? {
495 output.push(line);
496 }
497 assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
498
499 let ep = EvictParams::default();
501 let eres = pods.evict("busybox-kube3", &ep).await?;
502 assert_eq!(eres.code, 201); assert!(eres.is_success());
504
505 Ok(())
506 }
507
508 #[tokio::test]
509 #[ignore = "requires a cluster"]
510 async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
511 use crate::{
512 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
513 core::subresource::LogParams,
514 };
515 use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
516
517 let client = Client::try_default().await?;
518 let pods: Api<Pod> = Api::default_namespaced(client);
519
520 let p: Pod = serde_json::from_value(json!({
522 "apiVersion": "v1",
523 "kind": "Pod",
524 "metadata": {
525 "name": "busybox-kube-meta",
526 "labels": { "app": "kube-rs-test" },
527 },
528 "spec": {
529 "terminationGracePeriodSeconds": 1,
530 "restartPolicy": "Never",
531 "containers": [{
532 "name": "busybox",
533 "image": "busybox:1.34.1",
534 "command": ["sh", "-c", "sleep 30s"],
535 }],
536 }
537 }))?;
538
539 match pods.create(&Default::default(), &p).await {
540 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
541 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
544
545 let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
548 assert_eq!("busybox-kube-meta", pod_metadata.name_any());
549 assert_eq!(
550 Some((&"app".to_string(), &"kube-rs-test".to_string())),
551 pod_metadata.labels().get_key_value("app")
552 );
553
554 let p_list = pods.list_metadata(&ListParams::default()).await?;
556
557 let pod_metadata = p_list
560 .items
561 .into_iter()
562 .find(|p| p.name_any() == "busybox-kube-meta")
563 .unwrap();
564 assert_eq!(
565 pod_metadata.labels().get("app"),
566 Some(&"kube-rs-test".to_string())
567 );
568
569 let patch = ObjectMeta {
571 annotations: Some([("test".to_string(), "123".to_string())].into()),
572 ..Default::default()
573 }
574 .into_request_partial::<Pod>();
575
576 let patchparams = PatchParams::default();
577 let p_patched = pods
578 .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
579 .await?;
580 assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
581 assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
582 assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
583
584 let dp = DeleteParams::default();
586 pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
587 assert_eq!(pdel.name_any(), "busybox-kube-meta");
588 });
589
590 Ok(())
591 }
592 #[tokio::test]
593 #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
594 async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
595 use crate::api::PostParams;
596 use k8s_openapi::api::certificates::v1::{
597 CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
598 };
599
600 let csr_name = "fake";
601 let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
602 "apiVersion": "certificates.k8s.io/v1",
603 "kind": "CertificateSigningRequest",
604 "metadata": { "name": csr_name },
605 "spec": {
606 "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
607 "signerName": "kubernetes.io/kube-apiserver-client",
608 "expirationSeconds": 86400,
609 "usages": ["client auth"]
610 }
611 }))?;
612
613 let client = Client::try_default().await?;
614 let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
615 assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
616
617 let approval_type = "ApprovedFake";
619 let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
620 certificate: None,
621 conditions: Some(vec![CertificateSigningRequestCondition {
622 type_: approval_type.to_string(),
623 last_update_time: None,
624 last_transition_time: None,
625 message: Some(format!("{} {}", approval_type, "by kube-rs client")),
626 reason: Some("kube-rsClient".to_string()),
627 status: "True".to_string(),
628 }]),
629 };
630 let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
631 let _ = csr
632 .patch_approval(csr_name, &Default::default(), &csr_status_patch)
633 .await?;
634 let csr_after_approval = csr.get_approval(csr_name).await?;
635
636 assert_eq!(
637 csr_after_approval
638 .status
639 .as_ref()
640 .unwrap()
641 .conditions
642 .as_ref()
643 .unwrap()[0]
644 .type_,
645 approval_type.to_string()
646 );
647 csr.delete(csr_name, &DeleteParams::default()).await?;
648 Ok(())
649 }
650
651 #[tokio::test]
652 #[ignore = "needs cluster for ephemeral containers operations"]
653 async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
654 let client = Client::try_default().await?;
655
656 let api_version = client.apiserver_version().await?;
659 if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
660 return Ok(());
661 }
662
663 let pod: Pod = serde_json::from_value(serde_json::json!({
664 "apiVersion": "v1",
665 "kind": "Pod",
666 "metadata": {
667 "name": "ephemeral-container-test",
668 "labels": { "app": "kube-rs-test" },
669 },
670 "spec": {
671 "restartPolicy": "Never",
672 "containers": [{
673 "name": "busybox",
674 "image": "busybox:1.34.1",
675 "command": ["sh", "-c", "sleep 2"],
676 }],
677 }
678 }))?;
679
680 let pod_name = pod.name_any();
681 let pods = Api::<Pod>::default_namespaced(client);
682
683 let _ = pods
688 .delete(&pod.name_any(), &DeleteParams::default())
689 .await
690 .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
691
692 match pods.create(&Default::default(), &pod).await {
695 Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
696 Err(e) => return Err(e.into()), }
698
699 let current_ephemeral_containers = pods
700 .get_ephemeral_containers(&pod.name_any())
701 .await?
702 .spec
703 .unwrap()
704 .ephemeral_containers;
705
706 assert_eq!(current_ephemeral_containers, None);
709
710 let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
711 {
712 "name": "myephemeralcontainer1",
713 "image": "busybox:1.34.1",
714 "command": ["sh", "-c", "sleep 2"],
715 }
716 ))?;
717
718 let patch: Pod = serde_json::from_value(json!({
721 "metadata": { "name": pod_name },
722 "spec":{ "ephemeralContainers": [ busybox_eph ] }
723 }))?;
724
725 let current_containers = pods
726 .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
727 .await?
728 .spec
729 .unwrap()
730 .ephemeral_containers
731 .expect("could find ephemeral container");
732
733 assert_eq!(current_containers.len(), 1);
736 assert_eq!(current_containers[0].name, busybox_eph.name);
737 assert_eq!(current_containers[0].image, busybox_eph.image);
738 assert_eq!(current_containers[0].command, busybox_eph.command);
739
740 busybox_eph = serde_json::from_value(json!(
745 {
746 "name": "myephemeralcontainer2",
747 "image": "busybox:1.35.0",
748 "command": ["sh", "-c", "sleep 1"],
749 }
750 ))?;
751
752 let patch: Pod =
753 serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
754
755 let current_containers = pods
756 .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
757 .await?
758 .spec
759 .unwrap()
760 .ephemeral_containers
761 .expect("could find ephemeral container");
762
763 assert_eq!(current_containers.len(), 2);
766
767 let new_container = current_containers
768 .iter()
769 .find(|c| c.name == busybox_eph.name)
770 .expect("could find myephemeralcontainer2");
771
772 assert_eq!(new_container.image, busybox_eph.image);
775 assert_eq!(new_container.command, busybox_eph.command);
776
777 let expected_containers = current_containers;
780
781 let current_containers = pods
782 .get_ephemeral_containers(&pod.name_any())
783 .await?
784 .spec
785 .unwrap()
786 .ephemeral_containers
787 .unwrap();
788
789 assert_eq!(current_containers, expected_containers);
790
791 pods.delete(&pod.name_any(), &DeleteParams::default())
792 .await?
793 .map_left(|pdel| {
794 assert_eq!(pdel.name_any(), pod.name_any());
795 });
796
797 Ok(())
798 }
799
800 #[tokio::test]
801 #[ignore = "needs kubelet debug methods"]
802 #[cfg(feature = "kubelet-debug")]
803 async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
804 use crate::{
805 api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
806 core::kubelet_debug::KubeletDebugParams,
807 };
808
809 let client = Client::try_default().await?;
810 let pods: Api<Pod> = Api::default_namespaced(client);
811
812 let p: Pod = serde_json::from_value(json!({
814 "apiVersion": "v1",
815 "kind": "Pod",
816 "metadata": {
817 "name": "busybox-kube2",
818 "labels": { "app": "kube-rs-test" },
819 },
820 "spec": {
821 "terminationGracePeriodSeconds": 1,
822 "restartPolicy": "Never",
823 "containers": [{
824 "name": "busybox",
825 "image": "busybox:1.34.1",
826 "command": ["sh", "-c", "sleep 30"],
827 }],
828 }
829 }))?;
830
831 match pods.create(&Default::default(), &p).await {
832 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
833 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
836
837 let wp = WatchParams::default()
840 .fields(&format!("metadata.name={}", "busybox-kube2"))
841 .timeout(15);
842 let mut stream = pods.watch(&wp, "0").await?.boxed();
843 while let Some(ev) = stream.try_next().await? {
844 match ev {
845 WatchEvent::Modified(o) => {
846 let s = o.status.as_ref().expect("status exists on pod");
847 let phase = s.phase.clone().unwrap_or_default();
848 if phase == "Running" {
849 break;
850 }
851 }
852 WatchEvent::Error(e) => panic!("watch error: {e}"),
853 _ => {}
854 }
855 }
856
857 let mut config = Config::infer().await?;
858 config.accept_invalid_certs = true;
859 config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
860 let kubelet_client: Client = config.try_into()?;
861
862 {
864 let mut attached = kubelet_client
865 .kubelet_node_exec(
866 &KubeletDebugParams {
867 name: "busybox-kube2",
868 namespace: "default",
869 ..Default::default()
870 },
871 "busybox",
872 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
873 &AttachParams::default().stderr(false),
874 )
875 .await?;
876 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
877 let out = stdout
878 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
879 .collect::<Vec<_>>()
880 .await
881 .join("");
882 attached.join().await.unwrap();
883 assert_eq!(out.lines().count(), 3);
884 assert_eq!(out, "1\n2\n3\n");
885 }
886
887 {
889 use tokio::io::AsyncWriteExt;
890 let mut attached = kubelet_client
891 .kubelet_node_exec(
892 &KubeletDebugParams {
893 name: "busybox-kube2",
894 namespace: "default",
895 ..Default::default()
896 },
897 "busybox",
898 vec!["sh"],
899 &AttachParams::default().stdin(true).stderr(false),
900 )
901 .await?;
902 let mut stdin_writer = attached.stdin().unwrap();
903 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
904 let next_stdout = stdout_stream.next();
905 stdin_writer.write_all(b"echo test string 1\n").await?;
906 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
907 println!("{stdout}");
908 assert_eq!(stdout, "test string 1\n");
909
910 stdin_writer.write_all(b"exit 1\n").await?;
913 let status = attached.take_status().unwrap();
914 if let Some(status) = status.await {
915 println!("{status:?}");
916 assert_eq!(status.status, Some("Failure".to_owned()));
917 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
918 }
919 }
920
921 let dp = DeleteParams::default();
923 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
924 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
925 });
926
927 Ok(())
928 }
929}