kube_client/client/
mod.rs

//! API client for interacting with the Kubernetes API
//!
//! The [`Client`] uses standard kube error handling.
//!
//! This client can be used on its own or in conjunction with the [`Api`][crate::api::Api]
//! type for more structured interaction with the Kubernetes API.
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the Kubernetes API.
10use chrono::{DateTime, Utc};
11use either::{Either, Left, Right};
12use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
13use http::{self, Request, Response};
14use http_body_util::BodyExt;
15#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17pub use kube_core::response::Status;
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
22use tokio_util::{
23    codec::{FramedRead, LinesCodec, LinesCodecError},
24    io::StreamReader,
25};
26use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt};
27use tower_http::map_response_body::MapResponseBodyLayer;
28
29pub use self::body::Body;
30use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
31
32mod auth;
33mod body;
34mod builder;
35#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
36#[cfg(feature = "unstable-client")]
37mod client_ext;
38#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
39#[cfg(feature = "unstable-client")]
40pub use client_ext::scope;
41mod config_ext;
42pub use auth::Error as AuthError;
43pub use config_ext::ConfigExt;
44pub mod middleware;
45
46#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
47
48#[cfg(feature = "openssl-tls")]
49pub use tls::openssl_tls::Error as OpensslTlsError;
50#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
51#[cfg(feature = "ws")] mod upgrade;
52
53#[cfg(feature = "oauth")]
54#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
55pub use auth::OAuthError;
56
57#[cfg(feature = "oidc")]
58#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
59pub use auth::oidc_errors;
60
61#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
62
63#[cfg(feature = "kubelet-debug")]
64#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
65mod kubelet_debug;
66
67pub use builder::{ClientBuilder, DynBody};
68
69/// Client for connecting with a Kubernetes cluster.
70///
71/// The easiest way to instantiate the client is either by
72/// inferring the configuration from the environment using
73/// [`Client::try_default`] or with an existing [`Config`]
74/// using [`Client::try_from`].
75#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
76#[derive(Clone)]
77pub struct Client {
78    // - `Buffer` for cheap clone
79    // - `BoxFuture` for dynamic response future type
80    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
81    default_ns: String,
82    valid_until: Option<DateTime<Utc>>,
83}
84
/// An established WebSocket connection.
///
/// This is the value produced by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
    // WebSocket stream layered over the upgraded HTTP connection.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    // Stream protocol obtained while verifying the upgrade response.
    protocol: upgrade::StreamProtocol,
}
93
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Report whether the negotiated stream protocol supports graceful close signaling.
    pub fn supports_stream_close(&self) -> bool {
        let Self { protocol, .. } = self;
        protocol.supports_stream_close()
    }

    /// Consume the connection and hand back the raw `WebSocketStream`.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
107
108/// Constructors and low-level api interfaces.
109///
110/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
111///
112/// The many various lower level interfaces here are for more advanced use-cases with specific requirements.
113impl Client {
114    /// Create a [`Client`] using a custom `Service` stack.
115    ///
116    /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
117    /// building a custom stack.
118    ///
119    /// To create with the default stack with a [`Config`], use
120    /// [`Client::try_from`].
121    ///
122    /// To create with the default stack with an inferred [`Config`], use
123    /// [`Client::try_default`].
124    ///
125    /// # Example
126    ///
127    /// ```rust
128    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
129    /// use kube::{client::ConfigExt, Client, Config};
130    /// use tower::{BoxError, ServiceBuilder};
131    /// use hyper_util::rt::TokioExecutor;
132    ///
133    /// let config = Config::infer().await?;
134    /// let service = ServiceBuilder::new()
135    ///     .layer(config.base_uri_layer())
136    ///     .option_layer(config.auth_layer()?)
137    ///     .map_err(BoxError::from)
138    ///     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http());
139    /// let client = Client::new(service, config.default_namespace);
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
144    where
145        S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
146        S::Future: Send + 'static,
147        S::Error: Into<BoxError>,
148        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
149        B::Error: Into<BoxError>,
150        T: Into<String>,
151    {
152        // Transform response body to `crate::client::Body` and use type erased error to avoid type parameters.
153        let service = MapResponseBodyLayer::new(Body::wrap_body)
154            .layer(service)
155            .map_err(|e| e.into());
156        Self {
157            inner: Buffer::new(BoxService::new(service), 1024),
158            default_ns: default_namespace.into(),
159            valid_until: None,
160        }
161    }
162
163    /// Sets an expiration timestamp to the client, which has to be checked by the user using [`Client::valid_until`] function.
164    pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
165        Client { valid_until, ..self }
166    }
167
168    /// Get the expiration timestamp of the client, if it has been set.
169    pub fn valid_until(&self) -> &Option<DateTime<Utc>> {
170        &self.valid_until
171    }
172
173    /// Create and initialize a [`Client`] using the inferred configuration.
174    ///
175    /// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
176    /// and then if that fails, trying the in-cluster environment variables.
177    ///
178    /// Will fail if neither configuration could be loaded.
179    ///
180    /// ```rust
181    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
182    /// # use kube::Client;
183    /// let client = Client::try_default().await?;
184    /// # Ok(())
185    /// # }
186    /// ```
187    ///
188    /// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
189    /// instead.
190    pub async fn try_default() -> Result<Self> {
191        Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
192    }
193
194    /// Get the default namespace for the client
195    ///
196    /// The namespace is either configured on `context` in the kubeconfig,
197    /// falls back to `default` when running locally,
198    /// or uses the service account's namespace when deployed in-cluster.
199    pub fn default_namespace(&self) -> &str {
200        &self.default_ns
201    }
202
203    /// Perform a raw HTTP request against the API and return the raw response back.
204    /// This method can be used to get raw access to the API which may be used to, for example,
205    /// create a proxy server or application-level gateway between localhost and the API server.
206    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
207        let mut svc = self.inner.clone();
208        let res = svc
209            .ready()
210            .await
211            .map_err(Error::Service)?
212            .call(request)
213            .await
214            .map_err(|err| {
215                // Error decorating request
216                err.downcast::<Error>()
217                    .map(|e| *e)
218                    // Error requesting
219                    .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
220                    // Error from another middleware
221                    .unwrap_or_else(Error::Service)
222            })?;
223        Ok(res)
224    }
225
226    /// Make WebSocket connection.
227    #[cfg(feature = "ws")]
228    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
229    pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
230        use http::header::HeaderValue;
231        let (mut parts, body) = request.into_parts();
232        parts
233            .headers
234            .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
235        parts
236            .headers
237            .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
238        parts.headers.insert(
239            http::header::SEC_WEBSOCKET_VERSION,
240            HeaderValue::from_static("13"),
241        );
242        let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
243        parts.headers.insert(
244            http::header::SEC_WEBSOCKET_KEY,
245            key.parse().expect("valid header value"),
246        );
247        upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
248
249        let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
250        let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
251        match hyper::upgrade::on(res).await {
252            Ok(upgraded) => Ok(Connection {
253                stream: WebSocketStream::from_raw_socket(
254                    TokioIo::new(upgraded),
255                    ws::protocol::Role::Client,
256                    None,
257                )
258                .await,
259                protocol,
260            }),
261
262            Err(e) => Err(Error::UpgradeConnection(
263                UpgradeConnectionError::GetPendingUpgrade(e),
264            )),
265        }
266    }
267
268    /// Perform a raw HTTP request against the API and deserialize the response
269    /// as JSON to some known type.
270    pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
271    where
272        T: DeserializeOwned,
273    {
274        let text = self.request_text(request).await?;
275
276        serde_json::from_str(&text).map_err(|e| {
277            tracing::warn!("{}, {:?}", text, e);
278            Error::SerdeError(e)
279        })
280    }
281
282    /// Perform a raw HTTP request against the API and get back the response
283    /// as a string
284    pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
285        let res = self.send(request.map(Body::from)).await?;
286        let res = handle_api_errors(res).await?;
287        let body_bytes = res.into_body().collect().await?.to_bytes();
288        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
289        Ok(text)
290    }
291
292    /// Perform a raw HTTP request against the API and stream the response body.
293    ///
294    /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
295    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
296    pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
297        let res = self.send(request.map(Body::from)).await?;
298        let res = handle_api_errors(res).await?;
299        // Map the error, since we want to convert this into an `AsyncBufReader` using
300        // `into_async_read` which specifies `std::io::Error` as the stream's error type.
301        let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
302        Ok(body.into_async_read())
303    }
304
305    /// Perform a raw HTTP request against the API and get back either an object
306    /// deserialized as JSON or a [`Status`] Object.
307    pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
308    where
309        T: DeserializeOwned,
310    {
311        let text = self.request_text(request).await?;
312        // It needs to be JSON:
313        let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
314        if v["kind"] == "Status" {
315            tracing::trace!("Status from {}", text);
316            Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
317                tracing::warn!("{}, {:?}", text, e);
318                Error::SerdeError(e)
319            })?))
320        } else {
321            Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
322                tracing::warn!("{}, {:?}", text, e);
323                Error::SerdeError(e)
324            })?))
325        }
326    }
327
328    /// Perform a raw request and get back a stream of [`WatchEvent`] objects
329    pub async fn request_events<T>(
330        &self,
331        request: Request<Vec<u8>>,
332    ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
333    where
334        T: Clone + DeserializeOwned,
335    {
336        let res = self.send(request.map(Body::from)).await?;
337        // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
338        tracing::trace!("headers: {:?}", res.headers());
339
340        let frames = FramedRead::new(
341            StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
342                // Unexpected EOF from chunked decoder.
343                // Tends to happen when watching for 300+s. This will be ignored.
344                if e.to_string().contains("unexpected EOF during chunk") {
345                    return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
346                }
347                std::io::Error::other(e)
348            })),
349            LinesCodec::new(),
350        );
351
352        Ok(frames.filter_map(|res| async {
353            match res {
354                Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
355                    Ok(event) => Some(Ok(event)),
356                    Err(e) => {
357                        // Ignore EOF error that can happen for incomplete line from `decode_eof`.
358                        if e.is_eof() {
359                            return None;
360                        }
361
362                        // Got general error response
363                        if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
364                            return Some(Err(Error::Api(e_resp)));
365                        }
366                        // Parsing error
367                        Some(Err(Error::SerdeError(e)))
368                    }
369                },
370
371                Err(LinesCodecError::Io(e)) => match e.kind() {
372                    // Client timeout
373                    std::io::ErrorKind::TimedOut => {
374                        tracing::warn!("timeout in poll: {}", e); // our client timeout
375                        None
376                    }
377                    // Unexpected EOF from chunked decoder.
378                    // Tends to happen after 300+s of watching.
379                    std::io::ErrorKind::UnexpectedEof => {
380                        tracing::warn!("eof in poll: {}", e);
381                        None
382                    }
383                    _ => Some(Err(Error::ReadEvents(e))),
384                },
385
386                // Reached the maximum line length without finding a newline.
387                // This should never happen because we're using the default `usize::MAX`.
388                Err(LinesCodecError::MaxLineLengthExceeded) => {
389                    Some(Err(Error::LinesCodecMaxLineLengthExceeded))
390                }
391            }
392        }))
393    }
394}
395
396/// Low level discovery methods using `k8s_openapi` types.
397///
398/// Consider using the [`discovery`](crate::discovery) module for
399/// easier-to-use variants of this functionality.
400/// The following methods might be deprecated to avoid confusion between similarly named types within `discovery`.
401impl Client {
402    /// Returns apiserver version.
403    pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
404        self.request(
405            Request::builder()
406                .uri("/version")
407                .body(vec![])
408                .map_err(Error::HttpError)?,
409        )
410        .await
411    }
412
413    /// Lists api groups that apiserver serves.
414    pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
415        self.request(
416            Request::builder()
417                .uri("/apis")
418                .body(vec![])
419                .map_err(Error::HttpError)?,
420        )
421        .await
422    }
423
424    /// Lists resources served in given API group.
425    ///
426    /// ### Example usage:
427    /// ```rust
428    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
429    /// let apigroups = client.list_api_groups().await?;
430    /// for g in apigroups.groups {
431    ///     let ver = g
432    ///         .preferred_version
433    ///         .as_ref()
434    ///         .or_else(|| g.versions.first())
435    ///         .expect("preferred or versions exists");
436    ///     let apis = client.list_api_group_resources(&ver.group_version).await?;
437    ///     dbg!(apis);
438    /// }
439    /// # Ok(())
440    /// # }
441    /// ```
442    pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
443        let url = format!("/apis/{apiversion}");
444        self.request(
445            Request::builder()
446                .uri(url)
447                .body(vec![])
448                .map_err(Error::HttpError)?,
449        )
450        .await
451    }
452
453    /// Lists versions of `core` a.k.a. `""` legacy API group.
454    pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
455        self.request(
456            Request::builder()
457                .uri("/api")
458                .body(vec![])
459                .map_err(Error::HttpError)?,
460        )
461        .await
462    }
463
464    /// Lists resources served in particular `core` group version.
465    pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
466        let url = format!("/api/{version}");
467        self.request(
468            Request::builder()
469                .uri(url)
470                .body(vec![])
471                .map_err(Error::HttpError)?,
472        )
473        .await
474    }
475}
476
477/// Kubernetes returned error handling
478///
479/// Either kube returned an explicit ApiError struct,
480/// or it someohow returned something we couldn't parse as one.
481///
482/// In either case, present an ApiError upstream.
483/// The latter is probably a bug if encountered.
484async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
485    let status = res.status();
486    if status.is_client_error() || status.is_server_error() {
487        // trace!("Status = {:?} for {}", status, res.url());
488        let body_bytes = res.into_body().collect().await?.to_bytes();
489        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
490        // Print better debug when things do fail
491        // trace!("Parsing error: {}", text);
492        if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(&text) {
493            tracing::debug!("Unsuccessful: {errdata:?}");
494            Err(Error::Api(errdata))
495        } else {
496            tracing::warn!("Unsuccessful data error parse: {}", text);
497            let error_response = ErrorResponse {
498                status: status.to_string(),
499                code: status.as_u16(),
500                message: format!("{text:?}"),
501                reason: "Failed to parse error data".into(),
502            };
503            tracing::debug!("Unsuccessful: {error_response:?} (reconstruct)");
504            Err(Error::Api(error_response))
505        }
506    } else {
507        Ok(res)
508    }
509}
510
511impl TryFrom<Config> for Client {
512    type Error = Error;
513
514    /// Builds a default [`Client`] from a [`Config`].
515    ///
516    /// See [`ClientBuilder`] or [`Client::new`] if more customization is required
517    fn try_from(config: Config) -> Result<Self> {
518        Ok(ClientBuilder::try_from(config)?.build())
519    }
520}
521
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{client::Body, Api, Client};

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// A client reports back the namespace it was constructed with.
    #[tokio::test]
    async fn test_default_ns() {
        let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(mock_service, "test-namespace");
        let namespace = client.default_namespace();
        assert_eq!(namespace, "test-namespace");
    }

    /// A `get` through `Api<Pod>` reaches the mocked service with the expected
    /// method and path, and the mocked pod is decoded on the way back.
    #[tokio::test]
    async fn test_mock() {
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        // Mocked server side: wait for the pod request and answer with a fixture.
        let server = tokio::spawn(async move {
            let mut handle = pin!(handle);
            let (request, send) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let fixture = serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            });
            let pod: Pod = serde_json::from_value(fixture).unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            send.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        // Client side: fetch the pod through the typed API.
        let client = Client::new(mock_service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let fetched = pods.get("test").await.unwrap();
        let annotations = fetched.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}