// kube_client/client/builder.rs

1use bytes::Bytes;
2use http::{header::HeaderMap, Request, Response};
3use hyper::{
4    body::Incoming,
5    rt::{Read, Write},
6};
7use hyper_timeout::TimeoutConnector;
8
9use hyper_util::{
10    client::legacy::connect::{Connection, HttpConnector},
11    rt::TokioExecutor,
12};
13
14use std::time::Duration;
15use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
16use tower_http::{
17    classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
18};
19use tracing::Span;
20
21use super::body::Body;
22use crate::{client::ConfigExt, Client, Config, Error, Result};
23
24/// HTTP body of a dynamic backing type.
25///
26/// The suggested implementation type is [`crate::client::Body`].
27pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
28
/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
pub struct ClientBuilder<Svc> {
    // The service stack assembled so far; `build` hands it to `Client::new`.
    service: Svc,
    // Default namespace handed to `Client::new` together with the service.
    default_ns: String,
}
34
35impl<Svc> ClientBuilder<Svc> {
36    /// Construct a [`ClientBuilder`] from scratch with a fully custom [`Service`] stack.
37    ///
38    /// This method is only intended for advanced use cases, most users will want to use [`ClientBuilder::try_from`] instead,
39    /// which provides a default stack as a starting point.
40    pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
41    where
42        Svc: Service<Request<Body>>,
43    {
44        Self {
45            service,
46            default_ns: default_namespace.into(),
47        }
48    }
49
50    /// Add a [`Layer`] to the current [`Service`] stack.
51    pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
52        let Self {
53            service: stack,
54            default_ns,
55        } = self;
56        ClientBuilder {
57            service: layer.layer(stack),
58            default_ns,
59        }
60    }
61
62    /// Build a [`Client`] instance with the current [`Service`] stack.
63    pub fn build<B>(self) -> Client
64    where
65        Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
66        Svc::Future: Send + 'static,
67        Svc::Error: Into<BoxError>,
68        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
69        B::Error: Into<BoxError>,
70    {
71        Client::new(self.service, self.default_ns)
72    }
73}
74
75pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
76
77impl TryFrom<Config> for ClientBuilder<GenericService> {
78    type Error = Error;
79
80    /// Builds a default [`ClientBuilder`] stack from a given configuration
81    fn try_from(config: Config) -> Result<Self> {
82        let mut connector = HttpConnector::new();
83        connector.enforce_http(false);
84
85        #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
86        {
87            if rustls::crypto::CryptoProvider::get_default().is_none() {
88                // the only error here is if it's been initialized in between: we can ignore it
89                // since our semantic is only to set the default value if it does not exist.
90                let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
91            }
92        }
93
94        match config.proxy_url.as_ref() {
95            Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
96                #[cfg(feature = "socks5")]
97                {
98                    let connector = hyper_socks2::SocksConnector {
99                        proxy_addr: proxy_url.clone(),
100                        auth: None,
101                        connector,
102                    };
103                    make_generic_builder(connector, config)
104                }
105
106                #[cfg(not(feature = "socks5"))]
107                Err(Error::ProxyProtocolDisabled {
108                    proxy_url: proxy_url.clone(),
109                    protocol_feature: "kube/socks5",
110                })
111            }
112
113            Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
114                #[cfg(feature = "http-proxy")]
115                {
116                    let proxy =
117                        hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::All, proxy_url.clone());
118                    let connector = hyper_http_proxy::ProxyConnector::from_proxy_unsecured(connector, proxy);
119
120                    make_generic_builder(connector, config)
121                }
122
123                #[cfg(not(feature = "http-proxy"))]
124                Err(Error::ProxyProtocolDisabled {
125                    proxy_url: proxy_url.clone(),
126                    protocol_feature: "kube/http-proxy",
127                })
128            }
129
130            Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
131                proxy_url: proxy_url.clone(),
132            }),
133
134            None => make_generic_builder(connector, config),
135        }
136    }
137}
138
139/// Helper function for implementation of [`TryFrom<Config>`] for [`ClientBuilder`].
140/// Ignores [`Config::proxy_url`], which at this point is already handled.
141fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
142where
143    H: 'static + Clone + Send + Sync + Service<http::Uri>,
144    H::Response: 'static + Connection + Read + Write + Send + Unpin,
145    H::Future: 'static + Send,
146    H::Error: 'static + Send + Sync + std::error::Error,
147{
148    let default_ns = config.default_namespace.clone();
149    let auth_layer = config.auth_layer()?;
150
151    let client: hyper_util::client::legacy::Client<_, Body> = {
152        // Current TLS feature precedence when more than one are set:
153        // 1. rustls-tls
154        // 2. openssl-tls
155        // Create a custom client to use something else.
156        // If TLS features are not enabled, http connector will be used.
157        #[cfg(feature = "rustls-tls")]
158        let connector = config.rustls_https_connector_with_connector(connector)?;
159        #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
160        let connector = config.openssl_https_connector_with_connector(connector)?;
161        #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
162        if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
163            // no tls stack situation only works with http scheme
164            return Err(Error::TlsRequired);
165        }
166
167        let mut connector = TimeoutConnector::new(connector);
168
169        // Set the timeouts for the client
170        connector.set_connect_timeout(config.connect_timeout);
171        connector.set_read_timeout(config.read_timeout);
172        connector.set_write_timeout(config.write_timeout);
173
174        hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
175    };
176
177    let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
178    #[cfg(feature = "gzip")]
179    let stack = ServiceBuilder::new()
180        .layer(stack)
181        .layer(
182            tower_http::decompression::DecompressionLayer::new()
183                .no_br()
184                .no_deflate()
185                .no_zstd()
186                .gzip(!config.disable_compression),
187        )
188        .into_inner();
189
190    let service = ServiceBuilder::new()
191        .layer(stack)
192        .option_layer(auth_layer)
193        .layer(config.extra_headers_layer()?)
194        .layer(
195            // Attribute names follow [Semantic Conventions].
196            // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
197            TraceLayer::new_for_http()
198                .make_span_with(|req: &Request<Body>| {
199                    tracing::debug_span!(
200                        "HTTP",
201                         http.method = %req.method(),
202                         http.url = %req.uri(),
203                         http.status_code = tracing::field::Empty,
204                         otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
205                         otel.kind = "client",
206                         otel.status_code = tracing::field::Empty,
207                    )
208                })
209                .on_request(|_req: &Request<Body>, _span: &Span| {
210                    tracing::debug!("requesting");
211                })
212                .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
213                    let status = res.status();
214                    span.record("http.status_code", status.as_u16());
215                    if status.is_client_error() || status.is_server_error() {
216                        span.record("otel.status_code", "ERROR");
217                    }
218                })
219                // Explicitly disable `on_body_chunk`. The default does nothing.
220                .on_body_chunk(())
221                .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
222                    tracing::debug!("stream closed");
223                })
224                .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
225                    // Called when
226                    // - Calling the inner service errored
227                    // - Polling `Body` errored
228                    // - the response was classified as failure (5xx)
229                    // - End of stream was classified as failure
230                    span.record("otel.status_code", "ERROR");
231                    match ec {
232                        ServerErrorsFailureClass::StatusCode(status) => {
233                            span.record("http.status_code", status.as_u16());
234                            tracing::error!("failed with status {}", status)
235                        }
236                        ServerErrorsFailureClass::Error(err) => {
237                            tracing::error!("failed with error {}", err)
238                        }
239                    }
240                }),
241        )
242        .map_err(BoxError::from)
243        .service(client);
244
245    Ok(ClientBuilder::new(
246        BoxService::new(
247            MapResponseBodyLayer::new(|body| {
248                Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
249            })
250            .layer(service),
251        ),
252        default_ns,
253    ))
254}
255
#[cfg(test)]
mod tests {
    #[cfg(feature = "gzip")] use super::*;

    /// With the default configuration the client advertises `gzip` via the
    /// `Accept-Encoding` header; with `disable_compression` set, the header is
    /// not sent at all.
    #[cfg(feature = "gzip")]
    #[tokio::test]
    async fn test_no_accept_encoding_header_sent_when_compression_disabled(
    ) -> Result<(), Box<dyn std::error::Error>> {
        use http::Uri;
        use std::net::SocketAddr;
        use tokio::net::{TcpListener, TcpStream};

        // setup a server that echoes back any encoding header value
        // (port 0 lets the OS pick a free port, read back via `local_addr`)
        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(addr).await?;
        let local_addr = listener.local_addr()?;
        let uri: Uri = format!("http://{}", local_addr).parse()?;

        tokio::spawn(async move {
            use http_body_util::Full;
            use hyper::{server::conn::http1, service::service_fn};
            use hyper_util::rt::{TokioIo, TokioTimer};
            use std::convert::Infallible;

            loop {
                let (tcp, _) = listener.accept().await.unwrap();
                let io: TokioIo<TcpStream> = TokioIo::new(tcp);

                tokio::spawn(async move {
                    let _ = http1::Builder::new()
                        .timer(TokioTimer::new())
                        .serve_connection(
                            io,
                            service_fn(|req| async move {
                                // Respond with the raw `Accept-Encoding` value,
                                // or an empty body when the header is absent.
                                let response = req
                                    .headers()
                                    .get(http::header::ACCEPT_ENCODING)
                                    .map(|b| Bytes::copy_from_slice(b.as_bytes()))
                                    .unwrap_or_default();
                                Ok::<_, Infallible>(Response::new(Full::new(response)))
                            }),
                        )
                        .await
                        .unwrap();
                });
            }
        });

        // confirm gzip echoed back with default config
        let config = Config::new(uri);
        let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
        let response = client.request_text(http::Request::default()).await?;
        assert_eq!(&response, "gzip");

        // now disable and check empty string echoed back
        let config = Config {
            disable_compression: true,
            ..config
        };
        let client = make_generic_builder(HttpConnector::new(), config)?.build();
        let response = client.request_text(http::Request::default()).await?;
        assert_eq!(&response, "");

        Ok(())
    }
}