kube_client/client/
builder.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use http::{header::HeaderMap, Request, Response};
4use hyper::{
5    body::Incoming,
6    rt::{Read, Write},
7};
8use hyper_timeout::TimeoutConnector;
9
10use hyper_util::{
11    client::legacy::connect::{Connection, HttpConnector},
12    rt::TokioExecutor,
13};
14
15use std::time::Duration;
16use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
17use tower_http::{
18    classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
19};
20use tracing::Span;
21
22use super::body::Body;
23use crate::{client::ConfigExt, Client, Config, Error, Result};
24
25/// HTTP body of a dynamic backing type.
26///
27/// The suggested implementation type is [`crate::client::Body`].
28pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
29
30/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
31pub struct ClientBuilder<Svc> {
32    service: Svc,
33    default_ns: String,
34    valid_until: Option<DateTime<Utc>>,
35}
36
37impl<Svc> ClientBuilder<Svc> {
38    /// Construct a [`ClientBuilder`] from scratch with a fully custom [`Service`] stack.
39    ///
40    /// This method is only intended for advanced use cases, most users will want to use [`ClientBuilder::try_from`] instead,
41    /// which provides a default stack as a starting point.
42    pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
43    where
44        Svc: Service<Request<Body>>,
45    {
46        Self {
47            service,
48            default_ns: default_namespace.into(),
49            valid_until: None,
50        }
51    }
52
53    /// Add a [`Layer`] to the current [`Service`] stack.
54    pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
55        let Self {
56            service: stack,
57            default_ns,
58            valid_until,
59        } = self;
60        ClientBuilder {
61            service: layer.layer(stack),
62            default_ns,
63            valid_until,
64        }
65    }
66
67    /// Sets an expiration timestamp for the client.
68    pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
69        ClientBuilder {
70            service: self.service,
71            default_ns: self.default_ns,
72            valid_until,
73        }
74    }
75
76    /// Build a [`Client`] instance with the current [`Service`] stack.
77    pub fn build<B>(self) -> Client
78    where
79        Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
80        Svc::Future: Send + 'static,
81        Svc::Error: Into<BoxError>,
82        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
83        B::Error: Into<BoxError>,
84    {
85        Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
86    }
87}
88
89pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
90
91impl TryFrom<Config> for ClientBuilder<GenericService> {
92    type Error = Error;
93
94    /// Builds a default [`ClientBuilder`] stack from a given configuration
95    fn try_from(config: Config) -> Result<Self> {
96        let mut connector = HttpConnector::new();
97        connector.enforce_http(false);
98
99        #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
100        {
101            if rustls::crypto::CryptoProvider::get_default().is_none() {
102                // the only error here is if it's been initialized in between: we can ignore it
103                // since our semantic is only to set the default value if it does not exist.
104                let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
105            }
106        }
107
108        match config.proxy_url.as_ref() {
109            Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
110                #[cfg(feature = "socks5")]
111                {
112                    let connector = hyper_socks2::SocksConnector {
113                        proxy_addr: proxy_url.clone(),
114                        auth: None,
115                        connector,
116                    };
117                    make_generic_builder(connector, config)
118                }
119
120                #[cfg(not(feature = "socks5"))]
121                Err(Error::ProxyProtocolDisabled {
122                    proxy_url: proxy_url.clone(),
123                    protocol_feature: "kube/socks5",
124                })
125            }
126
127            Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
128                #[cfg(feature = "http-proxy")]
129                {
130                    let proxy =
131                        hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::All, proxy_url.clone());
132                    let connector = hyper_http_proxy::ProxyConnector::from_proxy_unsecured(connector, proxy);
133
134                    make_generic_builder(connector, config)
135                }
136
137                #[cfg(not(feature = "http-proxy"))]
138                Err(Error::ProxyProtocolDisabled {
139                    proxy_url: proxy_url.clone(),
140                    protocol_feature: "kube/http-proxy",
141                })
142            }
143
144            Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
145                proxy_url: proxy_url.clone(),
146            }),
147
148            None => make_generic_builder(connector, config),
149        }
150    }
151}
152
153/// Helper function for implementation of [`TryFrom<Config>`] for [`ClientBuilder`].
154/// Ignores [`Config::proxy_url`], which at this point is already handled.
155fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
156where
157    H: 'static + Clone + Send + Sync + Service<http::Uri>,
158    H::Response: 'static + Connection + Read + Write + Send + Unpin,
159    H::Future: 'static + Send,
160    H::Error: 'static + Send + Sync + std::error::Error,
161{
162    let default_ns = config.default_namespace.clone();
163    let auth_layer = config.auth_layer()?;
164
165    let client: hyper_util::client::legacy::Client<_, Body> = {
166        // Current TLS feature precedence when more than one are set:
167        // 1. rustls-tls
168        // 2. openssl-tls
169        // Create a custom client to use something else.
170        // If TLS features are not enabled, http connector will be used.
171        #[cfg(feature = "rustls-tls")]
172        let connector = config.rustls_https_connector_with_connector(connector)?;
173        #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
174        let connector = config.openssl_https_connector_with_connector(connector)?;
175        #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
176        if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
177            // no tls stack situation only works with http scheme
178            return Err(Error::TlsRequired);
179        }
180
181        let mut connector = TimeoutConnector::new(connector);
182
183        // Set the timeouts for the client
184        connector.set_connect_timeout(config.connect_timeout);
185        connector.set_read_timeout(config.read_timeout);
186        connector.set_write_timeout(config.write_timeout);
187
188        hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
189    };
190
191    let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
192    #[cfg(feature = "gzip")]
193    let stack = ServiceBuilder::new()
194        .layer(stack)
195        .layer(
196            tower_http::decompression::DecompressionLayer::new()
197                .no_br()
198                .no_deflate()
199                .no_zstd()
200                .gzip(!config.disable_compression),
201        )
202        .into_inner();
203
204    let service = ServiceBuilder::new()
205        .layer(stack)
206        .option_layer(auth_layer)
207        .layer(config.extra_headers_layer()?)
208        .layer(
209            // Attribute names follow [Semantic Conventions].
210            // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
211            TraceLayer::new_for_http()
212                .make_span_with(|req: &Request<Body>| {
213                    tracing::debug_span!(
214                        "HTTP",
215                         http.method = %req.method(),
216                         http.url = %req.uri(),
217                         http.status_code = tracing::field::Empty,
218                         otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
219                         otel.kind = "client",
220                         otel.status_code = tracing::field::Empty,
221                    )
222                })
223                .on_request(|_req: &Request<Body>, _span: &Span| {
224                    tracing::debug!("requesting");
225                })
226                .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
227                    let status = res.status();
228                    span.record("http.status_code", status.as_u16());
229                    if status.is_client_error() || status.is_server_error() {
230                        span.record("otel.status_code", "ERROR");
231                    }
232                })
233                // Explicitly disable `on_body_chunk`. The default does nothing.
234                .on_body_chunk(())
235                .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
236                    tracing::debug!("stream closed");
237                })
238                .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
239                    // Called when
240                    // - Calling the inner service errored
241                    // - Polling `Body` errored
242                    // - the response was classified as failure (5xx)
243                    // - End of stream was classified as failure
244                    span.record("otel.status_code", "ERROR");
245                    match ec {
246                        ServerErrorsFailureClass::StatusCode(status) => {
247                            span.record("http.status_code", status.as_u16());
248                            tracing::error!("failed with status {}", status)
249                        }
250                        ServerErrorsFailureClass::Error(err) => {
251                            tracing::error!("failed with error {}", err)
252                        }
253                    }
254                }),
255        )
256        .map_err(BoxError::from)
257        .service(client);
258
259
260    let (_, expiration) = config.exec_identity_pem();
261
262    let client = ClientBuilder::new(
263        BoxService::new(
264            MapResponseBodyLayer::new(|body| {
265                Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
266            })
267            .layer(service),
268        ),
269        default_ns,
270    )
271    .with_valid_until(expiration);
272
273    Ok(client)
274}
275
276#[cfg(test)]
277mod tests {
278    #[cfg(feature = "gzip")] use super::*;
279
280    #[cfg(feature = "gzip")]
281    #[tokio::test]
282    async fn test_no_accept_encoding_header_sent_when_compression_disabled(
283    ) -> Result<(), Box<dyn std::error::Error>> {
284        use http::Uri;
285        use std::net::SocketAddr;
286        use tokio::net::{TcpListener, TcpStream};
287
288        // setup a server that echoes back any encoding header value
289        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
290        let listener = TcpListener::bind(addr).await?;
291        let local_addr = listener.local_addr()?;
292        let uri: Uri = format!("http://{}", local_addr).parse()?;
293
294        tokio::spawn(async move {
295            use http_body_util::Full;
296            use hyper::{server::conn::http1, service::service_fn};
297            use hyper_util::rt::{TokioIo, TokioTimer};
298            use std::convert::Infallible;
299
300            loop {
301                let (tcp, _) = listener.accept().await.unwrap();
302                let io: TokioIo<TcpStream> = TokioIo::new(tcp);
303
304                tokio::spawn(async move {
305                    let _ = http1::Builder::new()
306                        .timer(TokioTimer::new())
307                        .serve_connection(
308                            io,
309                            service_fn(|req| async move {
310                                let response = req
311                                    .headers()
312                                    .get(http::header::ACCEPT_ENCODING)
313                                    .map(|b| Bytes::copy_from_slice(b.as_bytes()))
314                                    .unwrap_or_default();
315                                Ok::<_, Infallible>(Response::new(Full::new(response)))
316                            }),
317                        )
318                        .await
319                        .unwrap();
320                });
321            }
322        });
323
324        // confirm gzip echoed back with default config
325        let config = Config { ..Config::new(uri) };
326        let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
327        let response = client.request_text(http::Request::default()).await?;
328        assert_eq!(&response, "gzip");
329
330        // now disable and check empty string echoed back
331        let config = Config {
332            disable_compression: true,
333            ..config
334        };
335        let client = make_generic_builder(HttpConnector::new(), config)?.build();
336        let response = client.request_text(http::Request::default()).await?;
337        assert_eq!(&response, "");
338
339        Ok(())
340    }
341}