1use bytes::Bytes;
2use http::{header::HeaderMap, Request, Response};
3use hyper::{
4 body::Incoming,
5 rt::{Read, Write},
6};
7use hyper_timeout::TimeoutConnector;
8
9use hyper_util::{
10 client::legacy::connect::{Connection, HttpConnector},
11 rt::TokioExecutor,
12};
13
14use std::time::Duration;
15use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
16use tower_http::{
17 classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
18};
19use tracing::Span;
20
21use super::body::Body;
22use crate::{client::ConfigExt, Client, Config, Error, Result};
23
24pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
28
29pub struct ClientBuilder<Svc> {
31 service: Svc,
32 default_ns: String,
33}
34
35impl<Svc> ClientBuilder<Svc> {
36 pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
41 where
42 Svc: Service<Request<Body>>,
43 {
44 Self {
45 service,
46 default_ns: default_namespace.into(),
47 }
48 }
49
50 pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
52 let Self {
53 service: stack,
54 default_ns,
55 } = self;
56 ClientBuilder {
57 service: layer.layer(stack),
58 default_ns,
59 }
60 }
61
62 pub fn build<B>(self) -> Client
64 where
65 Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
66 Svc::Future: Send + 'static,
67 Svc::Error: Into<BoxError>,
68 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
69 B::Error: Into<BoxError>,
70 {
71 Client::new(self.service, self.default_ns)
72 }
73}
74
75pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
76
77impl TryFrom<Config> for ClientBuilder<GenericService> {
78 type Error = Error;
79
80 fn try_from(config: Config) -> Result<Self> {
82 let mut connector = HttpConnector::new();
83 connector.enforce_http(false);
84
85 #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
86 {
87 if rustls::crypto::CryptoProvider::get_default().is_none() {
88 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
91 }
92 }
93
94 match config.proxy_url.as_ref() {
95 Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
96 #[cfg(feature = "socks5")]
97 {
98 let connector = hyper_socks2::SocksConnector {
99 proxy_addr: proxy_url.clone(),
100 auth: None,
101 connector,
102 };
103 make_generic_builder(connector, config)
104 }
105
106 #[cfg(not(feature = "socks5"))]
107 Err(Error::ProxyProtocolDisabled {
108 proxy_url: proxy_url.clone(),
109 protocol_feature: "kube/socks5",
110 })
111 }
112
113 Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
114 #[cfg(feature = "http-proxy")]
115 {
116 let proxy =
117 hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::All, proxy_url.clone());
118 let connector = hyper_http_proxy::ProxyConnector::from_proxy_unsecured(connector, proxy);
119
120 make_generic_builder(connector, config)
121 }
122
123 #[cfg(not(feature = "http-proxy"))]
124 Err(Error::ProxyProtocolDisabled {
125 proxy_url: proxy_url.clone(),
126 protocol_feature: "kube/http-proxy",
127 })
128 }
129
130 Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
131 proxy_url: proxy_url.clone(),
132 }),
133
134 None => make_generic_builder(connector, config),
135 }
136 }
137}
138
139fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
142where
143 H: 'static + Clone + Send + Sync + Service<http::Uri>,
144 H::Response: 'static + Connection + Read + Write + Send + Unpin,
145 H::Future: 'static + Send,
146 H::Error: 'static + Send + Sync + std::error::Error,
147{
148 let default_ns = config.default_namespace.clone();
149 let auth_layer = config.auth_layer()?;
150
151 let client: hyper_util::client::legacy::Client<_, Body> = {
152 #[cfg(feature = "rustls-tls")]
158 let connector = config.rustls_https_connector_with_connector(connector)?;
159 #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
160 let connector = config.openssl_https_connector_with_connector(connector)?;
161 #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
162 if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
163 return Err(Error::TlsRequired);
165 }
166
167 let mut connector = TimeoutConnector::new(connector);
168
169 connector.set_connect_timeout(config.connect_timeout);
171 connector.set_read_timeout(config.read_timeout);
172 connector.set_write_timeout(config.write_timeout);
173
174 hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
175 };
176
177 let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
178 #[cfg(feature = "gzip")]
179 let stack = ServiceBuilder::new()
180 .layer(stack)
181 .layer(
182 tower_http::decompression::DecompressionLayer::new()
183 .no_br()
184 .no_deflate()
185 .no_zstd()
186 .gzip(!config.disable_compression),
187 )
188 .into_inner();
189
190 let service = ServiceBuilder::new()
191 .layer(stack)
192 .option_layer(auth_layer)
193 .layer(config.extra_headers_layer()?)
194 .layer(
195 TraceLayer::new_for_http()
198 .make_span_with(|req: &Request<Body>| {
199 tracing::debug_span!(
200 "HTTP",
201 http.method = %req.method(),
202 http.url = %req.uri(),
203 http.status_code = tracing::field::Empty,
204 otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
205 otel.kind = "client",
206 otel.status_code = tracing::field::Empty,
207 )
208 })
209 .on_request(|_req: &Request<Body>, _span: &Span| {
210 tracing::debug!("requesting");
211 })
212 .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
213 let status = res.status();
214 span.record("http.status_code", status.as_u16());
215 if status.is_client_error() || status.is_server_error() {
216 span.record("otel.status_code", "ERROR");
217 }
218 })
219 .on_body_chunk(())
221 .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
222 tracing::debug!("stream closed");
223 })
224 .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
225 span.record("otel.status_code", "ERROR");
231 match ec {
232 ServerErrorsFailureClass::StatusCode(status) => {
233 span.record("http.status_code", status.as_u16());
234 tracing::error!("failed with status {}", status)
235 }
236 ServerErrorsFailureClass::Error(err) => {
237 tracing::error!("failed with error {}", err)
238 }
239 }
240 }),
241 )
242 .map_err(BoxError::from)
243 .service(client);
244
245 Ok(ClientBuilder::new(
246 BoxService::new(
247 MapResponseBodyLayer::new(|body| {
248 Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
249 })
250 .layer(service),
251 ),
252 default_ns,
253 ))
254}
255
256#[cfg(test)]
257mod tests {
258 #[cfg(feature = "gzip")] use super::*;
259
260 #[cfg(feature = "gzip")]
261 #[tokio::test]
262 async fn test_no_accept_encoding_header_sent_when_compression_disabled(
263 ) -> Result<(), Box<dyn std::error::Error>> {
264 use http::Uri;
265 use std::net::SocketAddr;
266 use tokio::net::{TcpListener, TcpStream};
267
268 let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
270 let listener = TcpListener::bind(addr).await?;
271 let local_addr = listener.local_addr()?;
272 let uri: Uri = format!("http://{}", local_addr).parse()?;
273
274 tokio::spawn(async move {
275 use http_body_util::Full;
276 use hyper::{server::conn::http1, service::service_fn};
277 use hyper_util::rt::{TokioIo, TokioTimer};
278 use std::convert::Infallible;
279
280 loop {
281 let (tcp, _) = listener.accept().await.unwrap();
282 let io: TokioIo<TcpStream> = TokioIo::new(tcp);
283
284 tokio::spawn(async move {
285 let _ = http1::Builder::new()
286 .timer(TokioTimer::new())
287 .serve_connection(
288 io,
289 service_fn(|req| async move {
290 let response = req
291 .headers()
292 .get(http::header::ACCEPT_ENCODING)
293 .map(|b| Bytes::copy_from_slice(b.as_bytes()))
294 .unwrap_or_default();
295 Ok::<_, Infallible>(Response::new(Full::new(response)))
296 }),
297 )
298 .await
299 .unwrap();
300 });
301 }
302 });
303
304 let config = Config { ..Config::new(uri) };
306 let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
307 let response = client.request_text(http::Request::default()).await?;
308 assert_eq!(&response, "gzip");
309
310 let config = Config {
312 disable_compression: true,
313 ..config
314 };
315 let client = make_generic_builder(HttpConnector::new(), config)?.build();
316 let response = client.request_text(http::Request::default()).await?;
317 assert_eq!(&response, "");
318
319 Ok(())
320 }
321}