use bytes::Bytes;
use http::{Request, Response};
use hyper::{self, client::HttpConnector};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
use tower_http::{
classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
};
use crate::{client::ConfigExt, Client, Config, Error, Result};
pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
pub struct ClientBuilder<Svc> {
service: Svc,
default_ns: String,
}
impl<Svc> ClientBuilder<Svc> {
pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
where
Svc: Service<Request<hyper::Body>>,
{
Self {
service,
default_ns: default_namespace.into(),
}
}
pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
let Self {
service: stack,
default_ns,
} = self;
ClientBuilder {
service: layer.layer(stack),
default_ns,
}
}
pub fn build<B>(self) -> Client
where
Svc: Service<Request<hyper::Body>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
Client::new(self.service, self.default_ns)
}
}
impl TryFrom<Config> for ClientBuilder<BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>> {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
use std::time::Duration;
use http::header::HeaderMap;
use tracing::Span;
let default_ns = config.default_namespace.clone();
let client: hyper::Client<_, hyper::Body> = {
let mut connector = HttpConnector::new();
connector.enforce_http(false);
#[cfg(feature = "openssl-tls")]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "openssl-tls"), feature = "rustls-tls"))]
let connector = config.rustls_https_connector_with_connector(connector)?;
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);
hyper::Client::builder().build(connector)
};
let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())
.into_inner();
let service = ServiceBuilder::new()
.layer(stack)
.option_layer(config.auth_layer()?)
.layer(config.extra_headers_layer()?)
.layer(
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
}
})
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
span.record("otel.status_code", "ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", status.as_u16());
tracing::error!("failed with status {}", status)
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)
}
}
}),
)
.service(client);
Ok(Self::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
})
.layer(service),
),
default_ns,
))
}
}