1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
use bytes::Bytes;
use http::{Request, Response};
use hyper::{self, client::HttpConnector};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
use tower_http::{
classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
};
use crate::{client::ConfigExt, Client, Config, Error, Result};
pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
pub struct ClientBuilder<Svc> {
service: Svc,
default_ns: String,
}
impl<Svc> ClientBuilder<Svc> {
pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
where
Svc: Service<Request<hyper::Body>>,
{
Self {
service,
default_ns: default_namespace.into(),
}
}
pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
let Self {
service: stack,
default_ns,
} = self;
ClientBuilder {
service: layer.layer(stack),
default_ns,
}
}
pub fn build<B>(self) -> Client
where
Svc: Service<Request<hyper::Body>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
Client::new(self.service, self.default_ns)
}
}
impl TryFrom<Config> for ClientBuilder<BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>> {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
use std::time::Duration;
use http::header::HeaderMap;
use tracing::Span;
let timeout = config.timeout;
let default_ns = config.default_namespace.clone();
let client: hyper::Client<_, hyper::Body> = {
let mut connector = HttpConnector::new();
connector.enforce_http(false);
#[cfg(feature = "openssl-tls")]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "openssl-tls"), feature = "native-tls"))]
let connector = hyper_tls::HttpsConnector::from((
connector,
tokio_native_tls::TlsConnector::from(config.native_tls_connector()?),
));
#[cfg(all(
not(any(feature = "openssl-tls", feature = "native-tls")),
feature = "rustls-tls"
))]
let connector = hyper_rustls::HttpsConnector::from((
connector,
std::sync::Arc::new(config.rustls_client_config()?),
));
let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(timeout);
connector.set_read_timeout(timeout);
hyper::Client::builder().build(connector)
};
let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())
.into_inner();
let service = ServiceBuilder::new()
.layer(stack)
.option_layer(config.auth_layer()?)
.layer(config.extra_headers_layer()?)
.layer(
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", &status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", &"ERROR");
}
})
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
span.record("otel.status_code", &"ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", &status.as_u16());
tracing::error!("failed with status {}", status)
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)
}
}
}),
)
.service(client);
Ok(Self::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
})
.layer(service),
),
default_ns,
))
}
}