1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use http::{header::HeaderMap, Request, Response};
4use hyper::{
5 body::Incoming,
6 rt::{Read, Write},
7};
8use hyper_timeout::TimeoutConnector;
9
10use hyper_util::{
11 client::legacy::connect::{Connection, HttpConnector},
12 rt::TokioExecutor,
13};
14
15use std::time::Duration;
16use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
17use tower_http::{
18 classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
19};
20use tracing::Span;
21
22use super::body::Body;
23use crate::{client::ConfigExt, Client, Config, Error, Result};
24
25pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
29
30pub struct ClientBuilder<Svc> {
32 service: Svc,
33 default_ns: String,
34 valid_until: Option<DateTime<Utc>>,
35}
36
37impl<Svc> ClientBuilder<Svc> {
38 pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
43 where
44 Svc: Service<Request<Body>>,
45 {
46 Self {
47 service,
48 default_ns: default_namespace.into(),
49 valid_until: None,
50 }
51 }
52
53 pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
55 let Self {
56 service: stack,
57 default_ns,
58 valid_until,
59 } = self;
60 ClientBuilder {
61 service: layer.layer(stack),
62 default_ns,
63 valid_until,
64 }
65 }
66
67 pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
69 ClientBuilder {
70 service: self.service,
71 default_ns: self.default_ns,
72 valid_until,
73 }
74 }
75
76 pub fn build<B>(self) -> Client
78 where
79 Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
80 Svc::Future: Send + 'static,
81 Svc::Error: Into<BoxError>,
82 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
83 B::Error: Into<BoxError>,
84 {
85 Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
86 }
87}
88
89pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
90
91impl TryFrom<Config> for ClientBuilder<GenericService> {
92 type Error = Error;
93
94 fn try_from(config: Config) -> Result<Self> {
96 let mut connector = HttpConnector::new();
97 connector.enforce_http(false);
98
99 #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
100 {
101 if rustls::crypto::CryptoProvider::get_default().is_none() {
102 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
105 }
106 }
107
108 match config.proxy_url.as_ref() {
109 Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
110 #[cfg(feature = "socks5")]
111 {
112 let connector = hyper_socks2::SocksConnector {
113 proxy_addr: proxy_url.clone(),
114 auth: None,
115 connector,
116 };
117 make_generic_builder(connector, config)
118 }
119
120 #[cfg(not(feature = "socks5"))]
121 Err(Error::ProxyProtocolDisabled {
122 proxy_url: proxy_url.clone(),
123 protocol_feature: "kube/socks5",
124 })
125 }
126
127 Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
128 #[cfg(feature = "http-proxy")]
129 {
130 let proxy =
131 hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::All, proxy_url.clone());
132 let connector = hyper_http_proxy::ProxyConnector::from_proxy_unsecured(connector, proxy);
133
134 make_generic_builder(connector, config)
135 }
136
137 #[cfg(not(feature = "http-proxy"))]
138 Err(Error::ProxyProtocolDisabled {
139 proxy_url: proxy_url.clone(),
140 protocol_feature: "kube/http-proxy",
141 })
142 }
143
144 Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
145 proxy_url: proxy_url.clone(),
146 }),
147
148 None => make_generic_builder(connector, config),
149 }
150 }
151}
152
153fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
156where
157 H: 'static + Clone + Send + Sync + Service<http::Uri>,
158 H::Response: 'static + Connection + Read + Write + Send + Unpin,
159 H::Future: 'static + Send,
160 H::Error: 'static + Send + Sync + std::error::Error,
161{
162 let default_ns = config.default_namespace.clone();
163 let auth_layer = config.auth_layer()?;
164
165 let client: hyper_util::client::legacy::Client<_, Body> = {
166 #[cfg(feature = "rustls-tls")]
172 let connector = config.rustls_https_connector_with_connector(connector)?;
173 #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
174 let connector = config.openssl_https_connector_with_connector(connector)?;
175 #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
176 if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
177 return Err(Error::TlsRequired);
179 }
180
181 let mut connector = TimeoutConnector::new(connector);
182
183 connector.set_connect_timeout(config.connect_timeout);
185 connector.set_read_timeout(config.read_timeout);
186 connector.set_write_timeout(config.write_timeout);
187
188 hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
189 };
190
191 let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
192 #[cfg(feature = "gzip")]
193 let stack = ServiceBuilder::new()
194 .layer(stack)
195 .layer(
196 tower_http::decompression::DecompressionLayer::new()
197 .no_br()
198 .no_deflate()
199 .no_zstd()
200 .gzip(!config.disable_compression),
201 )
202 .into_inner();
203
204 let service = ServiceBuilder::new()
205 .layer(stack)
206 .option_layer(auth_layer)
207 .layer(config.extra_headers_layer()?)
208 .layer(
209 TraceLayer::new_for_http()
212 .make_span_with(|req: &Request<Body>| {
213 tracing::debug_span!(
214 "HTTP",
215 http.method = %req.method(),
216 http.url = %req.uri(),
217 http.status_code = tracing::field::Empty,
218 otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
219 otel.kind = "client",
220 otel.status_code = tracing::field::Empty,
221 )
222 })
223 .on_request(|_req: &Request<Body>, _span: &Span| {
224 tracing::debug!("requesting");
225 })
226 .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
227 let status = res.status();
228 span.record("http.status_code", status.as_u16());
229 if status.is_client_error() || status.is_server_error() {
230 span.record("otel.status_code", "ERROR");
231 }
232 })
233 .on_body_chunk(())
235 .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
236 tracing::debug!("stream closed");
237 })
238 .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
239 span.record("otel.status_code", "ERROR");
245 match ec {
246 ServerErrorsFailureClass::StatusCode(status) => {
247 span.record("http.status_code", status.as_u16());
248 tracing::error!("failed with status {}", status)
249 }
250 ServerErrorsFailureClass::Error(err) => {
251 tracing::error!("failed with error {}", err)
252 }
253 }
254 }),
255 )
256 .map_err(BoxError::from)
257 .service(client);
258
259
260 let (_, expiration) = config.exec_identity_pem();
261
262 let client = ClientBuilder::new(
263 BoxService::new(
264 MapResponseBodyLayer::new(|body| {
265 Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
266 })
267 .layer(service),
268 ),
269 default_ns,
270 )
271 .with_valid_until(expiration);
272
273 Ok(client)
274}
275
#[cfg(test)]
mod tests {
    #[cfg(feature = "gzip")] use super::*;

    /// Checks the `Accept-Encoding` request header under both compression settings.
    ///
    /// A local HTTP/1 server replies with the value of the request's
    /// `Accept-Encoding` header as the response body (empty body when the header is
    /// absent). The client is expected to send `gzip` by default and nothing once
    /// `disable_compression` is set.
    #[cfg(feature = "gzip")]
    #[tokio::test]
    async fn test_no_accept_encoding_header_sent_when_compression_disabled(
    ) -> Result<(), Box<dyn std::error::Error>> {
        use http::Uri;
        use std::net::SocketAddr;
        use tokio::net::{TcpListener, TcpStream};

        // Bind to port 0 and read back the address that was actually assigned.
        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?;
        let server_addr = listener.local_addr()?;
        let uri: Uri = format!("http://{}", server_addr).parse()?;

        // Background server: one task per accepted connection.
        tokio::spawn(async move {
            use http_body_util::Full;
            use hyper::{server::conn::http1, service::service_fn};
            use hyper_util::rt::{TokioIo, TokioTimer};
            use std::convert::Infallible;

            loop {
                let (stream, _peer) = listener.accept().await.unwrap();
                let io: TokioIo<TcpStream> = TokioIo::new(stream);

                tokio::spawn(async move {
                    let _ = http1::Builder::new()
                        .timer(TokioTimer::new())
                        .serve_connection(
                            io,
                            service_fn(|req| async move {
                                // Echo the header value back; empty body if it is missing.
                                let echoed = req
                                    .headers()
                                    .get(http::header::ACCEPT_ENCODING)
                                    .map_or_else(Bytes::new, |value| Bytes::copy_from_slice(value.as_bytes()));
                                Ok::<_, Infallible>(Response::new(Full::new(echoed)))
                            }),
                        )
                        .await
                        .unwrap();
                });
            }
        });

        let mut config = Config::new(uri);

        // Default configuration: gzip is advertised.
        let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
        let accept_encoding = client.request_text(http::Request::default()).await?;
        assert_eq!(&accept_encoding, "gzip");

        // Compression disabled: no `Accept-Encoding` header reaches the server.
        config.disable_compression = true;
        let client = make_generic_builder(HttpConnector::new(), config)?.build();
        let accept_encoding = client.request_text(http::Request::default()).await?;
        assert_eq!(&accept_encoding, "");

        Ok(())
    }
}