1use chrono::{DateTime, Utc};
11use either::{Either, Left, Right};
12use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
13use http::{self, Request, Response};
14use http_body_util::BodyExt;
15#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17pub use kube_core::response::Status;
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
22use tokio_util::{
23 codec::{FramedRead, LinesCodec, LinesCodecError},
24 io::StreamReader,
25};
26use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt};
27use tower_http::map_response_body::MapResponseBodyLayer;
28
29pub use self::body::Body;
30use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
31
32mod auth;
33mod body;
34mod builder;
35#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
36#[cfg(feature = "unstable-client")]
37mod client_ext;
38#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
39#[cfg(feature = "unstable-client")]
40pub use client_ext::scope;
41mod config_ext;
42pub use auth::Error as AuthError;
43pub use config_ext::ConfigExt;
44pub mod middleware;
45
46#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
47
48#[cfg(feature = "openssl-tls")]
49pub use tls::openssl_tls::Error as OpensslTlsError;
50#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
51#[cfg(feature = "ws")] mod upgrade;
52
53#[cfg(feature = "oauth")]
54#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
55pub use auth::OAuthError;
56
57#[cfg(feature = "oidc")]
58#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
59pub use auth::oidc_errors;
60
61#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
62
63#[cfg(feature = "kubelet-debug")]
64#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
65mod kubelet_debug;
66
67pub use builder::{ClientBuilder, DynBody};
68
69#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
76#[derive(Clone)]
77pub struct Client {
78 inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
81 default_ns: String,
82 valid_until: Option<DateTime<Utc>>,
83}
84
/// Result of a successful WebSocket upgrade performed by [`Client::connect`].
///
/// Bundles the WebSocket stream with the stream protocol that was determined
/// while verifying the upgrade response.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
    /// WebSocket stream layered over the upgraded HTTP connection.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    /// Protocol value produced by `upgrade::verify_response`.
    protocol: upgrade::StreamProtocol,
}
93
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Returns the stored stream protocol's answer to whether it supports
    /// closing streams.
    pub fn supports_stream_close(&self) -> bool {
        let protocol = &self.protocol;
        protocol.supports_stream_close()
    }

    /// Consumes the connection and returns the underlying WebSocket stream,
    /// discarding the recorded protocol.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
107
108impl Client {
114 pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
144 where
145 S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
146 S::Future: Send + 'static,
147 S::Error: Into<BoxError>,
148 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
149 B::Error: Into<BoxError>,
150 T: Into<String>,
151 {
152 let service = MapResponseBodyLayer::new(Body::wrap_body)
154 .layer(service)
155 .map_err(|e| e.into());
156 Self {
157 inner: Buffer::new(BoxService::new(service), 1024),
158 default_ns: default_namespace.into(),
159 valid_until: None,
160 }
161 }
162
163 pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
165 Client { valid_until, ..self }
166 }
167
168 pub fn valid_until(&self) -> &Option<DateTime<Utc>> {
170 &self.valid_until
171 }
172
173 pub async fn try_default() -> Result<Self> {
191 Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
192 }
193
194 pub fn default_namespace(&self) -> &str {
200 &self.default_ns
201 }
202
203 pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
207 let mut svc = self.inner.clone();
208 let res = svc
209 .ready()
210 .await
211 .map_err(Error::Service)?
212 .call(request)
213 .await
214 .map_err(|err| {
215 err.downcast::<Error>()
217 .map(|e| *e)
218 .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
220 .unwrap_or_else(Error::Service)
222 })?;
223 Ok(res)
224 }
225
/// Sends `request` as a WebSocket upgrade request and returns the resulting
/// [`Connection`].
///
/// The `Connection`, `Upgrade`, `Sec-WebSocket-Version` and
/// `Sec-WebSocket-Key` headers are inserted, followed by whatever
/// `upgrade::StreamProtocol::add_to_headers` adds. The response is checked
/// with `upgrade::verify_response` against the generated key before the
/// upgraded connection is wrapped in a client-role WebSocket stream.
///
/// # Errors
///
/// Returns an error when adding the protocol headers fails or when sending
/// fails. Returns [`Error::UpgradeConnection`] when the response does not
/// verify or the pending upgrade cannot be obtained.
///
/// # Panics
///
/// Panics if the generated key is not a valid header value.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
    use http::header::{
        HeaderValue, CONNECTION, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_VERSION, UPGRADE,
    };

    let (mut parts, body) = request.into_parts();

    // Fixed handshake headers, inserted in the same order as before.
    for (name, value) in [
        (CONNECTION, "Upgrade"),
        (UPGRADE, "websocket"),
        (SEC_WEBSOCKET_VERSION, "13"),
    ] {
        parts.headers.insert(name, HeaderValue::from_static(value));
    }

    let key = ws::handshake::client::generate_key();
    let key_header: HeaderValue = key.parse().expect("valid header value");
    parts.headers.insert(SEC_WEBSOCKET_KEY, key_header);
    upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;

    let upgrade_request = Request::from_parts(parts, Body::from(body));
    let res = self.send(upgrade_request).await?;
    let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;

    let upgraded = hyper::upgrade::on(res)
        .await
        .map_err(|e| Error::UpgradeConnection(UpgradeConnectionError::GetPendingUpgrade(e)))?;
    let stream =
        WebSocketStream::from_raw_socket(TokioIo::new(upgraded), ws::protocol::Role::Client, None)
            .await;

    Ok(Connection { stream, protocol })
}
267
268 pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
271 where
272 T: DeserializeOwned,
273 {
274 let text = self.request_text(request).await?;
275
276 serde_json::from_str(&text).map_err(|e| {
277 tracing::warn!("{}, {:?}", text, e);
278 Error::SerdeError(e)
279 })
280 }
281
282 pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
285 let res = self.send(request.map(Body::from)).await?;
286 let res = handle_api_errors(res).await?;
287 let body_bytes = res.into_body().collect().await?.to_bytes();
288 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
289 Ok(text)
290 }
291
292 pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
297 let res = self.send(request.map(Body::from)).await?;
298 let res = handle_api_errors(res).await?;
299 let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
302 Ok(body.into_async_read())
303 }
304
305 pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
308 where
309 T: DeserializeOwned,
310 {
311 let text = self.request_text(request).await?;
312 let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
314 if v["kind"] == "Status" {
315 tracing::trace!("Status from {}", text);
316 Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
317 tracing::warn!("{}, {:?}", text, e);
318 Error::SerdeError(e)
319 })?))
320 } else {
321 Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
322 tracing::warn!("{}, {:?}", text, e);
323 Error::SerdeError(e)
324 })?))
325 }
326 }
327
328 pub async fn request_events<T>(
330 &self,
331 request: Request<Vec<u8>>,
332 ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
333 where
334 T: Clone + DeserializeOwned,
335 {
336 let res = self.send(request.map(Body::from)).await?;
337 tracing::trace!("headers: {:?}", res.headers());
339
340 let frames = FramedRead::new(
341 StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
342 if e.to_string().contains("unexpected EOF during chunk") {
345 return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
346 }
347 std::io::Error::other(e)
348 })),
349 LinesCodec::new(),
350 );
351
352 Ok(frames.filter_map(|res| async {
353 match res {
354 Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
355 Ok(event) => Some(Ok(event)),
356 Err(e) => {
357 if e.is_eof() {
359 return None;
360 }
361
362 if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
364 return Some(Err(Error::Api(e_resp)));
365 }
366 Some(Err(Error::SerdeError(e)))
368 }
369 },
370
371 Err(LinesCodecError::Io(e)) => match e.kind() {
372 std::io::ErrorKind::TimedOut => {
374 tracing::warn!("timeout in poll: {}", e); None
376 }
377 std::io::ErrorKind::UnexpectedEof => {
380 tracing::warn!("eof in poll: {}", e);
381 None
382 }
383 _ => Some(Err(Error::ReadEvents(e))),
384 },
385
386 Err(LinesCodecError::MaxLineLengthExceeded) => {
389 Some(Err(Error::LinesCodecMaxLineLengthExceeded))
390 }
391 }
392 }))
393 }
394}
395
396impl Client {
402 pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
404 self.request(
405 Request::builder()
406 .uri("/version")
407 .body(vec![])
408 .map_err(Error::HttpError)?,
409 )
410 .await
411 }
412
413 pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
415 self.request(
416 Request::builder()
417 .uri("/apis")
418 .body(vec![])
419 .map_err(Error::HttpError)?,
420 )
421 .await
422 }
423
424 pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
443 let url = format!("/apis/{apiversion}");
444 self.request(
445 Request::builder()
446 .uri(url)
447 .body(vec![])
448 .map_err(Error::HttpError)?,
449 )
450 .await
451 }
452
453 pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
455 self.request(
456 Request::builder()
457 .uri("/api")
458 .body(vec![])
459 .map_err(Error::HttpError)?,
460 )
461 .await
462 }
463
464 pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
466 let url = format!("/api/{version}");
467 self.request(
468 Request::builder()
469 .uri(url)
470 .body(vec![])
471 .map_err(Error::HttpError)?,
472 )
473 .await
474 }
475}
476
477async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
485 let status = res.status();
486 if status.is_client_error() || status.is_server_error() {
487 let body_bytes = res.into_body().collect().await?.to_bytes();
489 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
490 if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(&text) {
493 tracing::debug!("Unsuccessful: {errdata:?}");
494 Err(Error::Api(errdata))
495 } else {
496 tracing::warn!("Unsuccessful data error parse: {}", text);
497 let error_response = ErrorResponse {
498 status: status.to_string(),
499 code: status.as_u16(),
500 message: format!("{text:?}"),
501 reason: "Failed to parse error data".into(),
502 };
503 tracing::debug!("Unsuccessful: {error_response:?} (reconstruct)");
504 Err(Error::Api(error_response))
505 }
506 } else {
507 Ok(res)
508 }
509}
510
511impl TryFrom<Config> for Client {
512 type Error = Error;
513
514 fn try_from(config: Config) -> Result<Self> {
518 Ok(ClientBuilder::try_from(config)?.build())
519 }
520}
521
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{client::Body, Api, Client};

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace given to `Client::new` is what `default_namespace` reports.
    #[tokio::test]
    async fn test_default_ns() {
        let (service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(service, "test-namespace");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A `get` through `Api<Pod>` issues the expected GET request and decodes
    /// the pod returned by the mocked service.
    #[tokio::test]
    async fn test_mock() {
        let (service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        // Server side: expect exactly one request and answer it with a pod.
        let server = tokio::spawn(async move {
            let mut handle = pin!(handle);
            let (request, reply) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let manifest = serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            });
            let pod: Pod = serde_json::from_value(manifest).unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            reply.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        // Client side: fetch the pod and check the annotation round-tripped.
        let client = Client::new(service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let pod = pods.get("test").await.unwrap();
        let annotations = pod.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}