rama_http_backend/client/
svc.rs1use rama_core::{
2 error::{BoxError, ErrorContext, OpaqueError},
3 Context, Service,
4};
5use rama_http_types::{
6 dep::{http::uri::PathAndQuery, http_body},
7 header::{CONNECTION, HOST, KEEP_ALIVE, PROXY_CONNECTION, TRANSFER_ENCODING, UPGRADE},
8 headers::HeaderMapExt,
9 Method, Request, Response, Version,
10};
11use rama_net::{address::ProxyAddress, http::RequestContext};
12
13#[derive(Debug)]
14pub(super) enum SendRequest<Body> {
15 Http1(rama_http_core::client::conn::http1::SendRequest<Body>),
16 Http2(rama_http_core::client::conn::http2::SendRequest<Body>),
17}
18
19#[derive(Debug)]
20pub struct HttpClientService<Body>(pub(super) SendRequest<Body>);
22
23impl<State, Body> Service<State, Request<Body>> for HttpClientService<Body>
24where
25 State: Clone + Send + Sync + 'static,
26 Body: http_body::Body<Data: Send + 'static, Error: Into<BoxError>> + Unpin + Send + 'static,
27{
28 type Response = Response;
29 type Error = BoxError;
30
31 async fn serve(
32 &self,
33 mut ctx: Context<State>,
34 req: Request<Body>,
35 ) -> Result<Self::Response, Self::Error> {
36 let req = sanitize_client_req_header(&mut ctx, req)?;
45
46 let resp = match &self.0 {
47 SendRequest::Http1(sender) => sender.send_request(req).await,
48 SendRequest::Http2(sender) => sender.send_request(req).await,
49 }?;
50
51 Ok(resp.map(rama_http_types::Body::new))
52 }
53}
54
55fn sanitize_client_req_header<S, B>(
56 ctx: &mut Context<S>,
57 req: Request<B>,
58) -> Result<Request<B>, BoxError> {
59 if req.method() == Method::CONNECT && req.uri().host().is_none() {
61 return Err(OpaqueError::from_display("missing host in CONNECT request").into());
62 }
63
64 Ok(match req.version() {
66 Version::HTTP_09 | Version::HTTP_10 | Version::HTTP_11 => {
67 if !ctx.contains::<ProxyAddress>() && req.uri().host().is_some() {
70 tracing::trace!(
71 "remove authority and scheme from non-connect direct http(~1) request"
72 );
73 let (mut parts, body) = req.into_parts();
74 let mut uri_parts = parts.uri.into_parts();
75 uri_parts.scheme = None;
76 let authority = uri_parts
77 .authority
78 .take()
79 .expect("to exist due to our host existence test");
80
81 if uri_parts.path_and_query.as_ref().map(|pq| pq.as_str()) == Some("/") {
92 uri_parts.path_and_query = Some(PathAndQuery::from_static("/"));
93 }
94
95 if !parts.headers.contains_key(HOST) {
97 parts
98 .headers
99 .typed_insert(rama_http_types::headers::Host::from(authority));
100 }
101
102 parts.uri = rama_http_types::Uri::from_parts(uri_parts)?;
103 Request::from_parts(parts, body)
104 } else if !req.headers().contains_key(HOST) {
105 tracing::trace!(uri = %req.uri(), "add authority as HOST header to req (was missing it)");
106 let authority = req
107 .uri()
108 .authority()
109 .ok_or_else(|| {
110 OpaqueError::from_display(
111 "[http1] missing authority in uri and missing host",
112 )
113 })?
114 .clone();
115 let mut req = req;
116 req.headers_mut()
117 .typed_insert(rama_http_types::headers::Host::from(authority));
118 req
119 } else {
120 req
121 }
122 }
123 Version::HTTP_2 => {
124 let mut req = if req.uri().host().is_none() {
127 let request_ctx = ctx.get::<RequestContext>().ok_or_else(|| {
128 OpaqueError::from_display("[h2+] add scheme/host: missing RequestCtx")
129 .into_boxed()
130 })?;
131
132 tracing::trace!(
133 http_version = ?req.version(),
134 "defining authority and scheme to non-connect direct http request"
135 );
136
137 let (mut parts, body) = req.into_parts();
138 let mut uri_parts = parts.uri.into_parts();
139 uri_parts.scheme = Some(
140 request_ctx
141 .protocol
142 .as_str()
143 .try_into()
144 .context("use RequestContext.protocol as http scheme")?,
145 );
146 uri_parts.authority = Some(
150 request_ctx
151 .authority
152 .to_string()
153 .try_into()
154 .context("use RequestContext.authority as http authority")?,
155 );
156
157 parts.uri = rama_http_types::Uri::from_parts(uri_parts)
158 .context("create http uri from parts")?;
159
160 Request::from_parts(parts, body)
161 } else {
162 req
163 };
164
165 for illegal_h2_header in [
167 &CONNECTION,
168 &TRANSFER_ENCODING,
169 &PROXY_CONNECTION,
170 &UPGRADE,
171 &KEEP_ALIVE,
172 &HOST,
173 ] {
174 if let Some(header) = req.headers_mut().remove(illegal_h2_header) {
175 tracing::trace!(?header, "removed illegal (~http1) header from h2 request");
176 }
177 }
178
179 req
180 }
181 Version::HTTP_3 => {
182 tracing::debug!(
183 uri = %req.uri(),
184 "h3 request detected, but sanitize_client_req_header does not yet support this",
185 );
186 req
187 }
188 _ => {
189 tracing::warn!(
190 uri = %req.uri(),
191 method = ?req.method(),
192 "request with unknown version detected, sanitize_client_req_header cannot support this",
193 );
194 req
195 }
196 })
197}