pingora_core/apps/
http_app.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A simple HTTP application trait that maps a request to a response
16
17use async_trait::async_trait;
18use http::Response;
19use log::{debug, error, trace};
20use pingora_http::ResponseHeader;
21use std::sync::Arc;
22
23use crate::apps::HttpServerApp;
24use crate::modules::http::{HttpModules, ModuleBuilder};
25use crate::protocols::http::HttpTask;
26use crate::protocols::http::ServerSession;
27use crate::protocols::Stream;
28use crate::server::ShutdownWatch;
29
30/// This trait defines how to map a request to a response
31#[async_trait]
32pub trait ServeHttp {
33    /// Define the mapping from a request to a response.
34    /// Note that the request header is already read, but the implementation needs to read the
35    /// request body if any.
36    ///
37    /// # Limitation
38    /// In this API, the entire response has to be generated before the end of this call.
39    /// So it is not suitable for streaming response or interactive communications.
40    /// Users need to implement their own [`super::HttpServerApp`] for those use cases.
41    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
42}
43
44// TODO: remove this in favor of HttpServer?
45#[async_trait]
46impl<SV> HttpServerApp for SV
47where
48    SV: ServeHttp + Send + Sync,
49{
50    async fn process_new_http(
51        self: &Arc<Self>,
52        mut http: ServerSession,
53        shutdown: &ShutdownWatch,
54    ) -> Option<Stream> {
55        match http.read_request().await {
56            Ok(res) => match res {
57                false => {
58                    debug!("Failed to read request header");
59                    return None;
60                }
61                true => {
62                    debug!("Successfully get a new request");
63                }
64            },
65            Err(e) => {
66                error!("HTTP server fails to read from downstream: {e}");
67                return None;
68            }
69        }
70        trace!("{:?}", http.req_header());
71        if *shutdown.borrow() {
72            http.set_keepalive(None);
73        } else {
74            http.set_keepalive(Some(60));
75        }
76        let new_response = self.response(&mut http).await;
77        let (parts, body) = new_response.into_parts();
78        let resp_header: ResponseHeader = parts.into();
79        match http.write_response_header(Box::new(resp_header)).await {
80            Ok(()) => {
81                debug!("HTTP response header done.");
82            }
83            Err(e) => {
84                error!(
85                    "HTTP server fails to write to downstream: {e}, {}",
86                    http.request_summary()
87                );
88            }
89        }
90        if !body.is_empty() {
91            // TODO: check if chunked encoding is needed
92            match http.write_response_body(body.into(), true).await {
93                Ok(_) => debug!("HTTP response written."),
94                Err(e) => error!(
95                    "HTTP server fails to write to downstream: {e}, {}",
96                    http.request_summary()
97                ),
98            }
99        }
100        match http.finish().await {
101            Ok(c) => c,
102            Err(e) => {
103                error!("HTTP server fails to finish the request: {e}");
104                None
105            }
106        }
107    }
108}
109
110/// A helper struct for HTTP server with http modules embedded
111pub struct HttpServer<SV> {
112    app: SV,
113    modules: HttpModules,
114}
115
116impl<SV> HttpServer<SV> {
117    /// Create a new [HttpServer] with the given app which implements [ServeHttp]
118    pub fn new_app(app: SV) -> Self {
119        HttpServer {
120            app,
121            modules: HttpModules::new(),
122        }
123    }
124
125    /// Add [ModuleBuilder] to this [HttpServer]
126    pub fn add_module(&mut self, module: ModuleBuilder) {
127        self.modules.add_module(module)
128    }
129}
130
131#[async_trait]
132impl<SV> HttpServerApp for HttpServer<SV>
133where
134    SV: ServeHttp + Send + Sync,
135{
136    async fn process_new_http(
137        self: &Arc<Self>,
138        mut http: ServerSession,
139        shutdown: &ShutdownWatch,
140    ) -> Option<Stream> {
141        match http.read_request().await {
142            Ok(res) => match res {
143                false => {
144                    debug!("Failed to read request header");
145                    return None;
146                }
147                true => {
148                    debug!("Successfully get a new request");
149                }
150            },
151            Err(e) => {
152                error!("HTTP server fails to read from downstream: {e}");
153                return None;
154            }
155        }
156        trace!("{:?}", http.req_header());
157        if *shutdown.borrow() {
158            http.set_keepalive(None);
159        } else {
160            http.set_keepalive(Some(60));
161        }
162        let mut module_ctx = self.modules.build_ctx();
163        let req = http.req_header_mut();
164        module_ctx.request_header_filter(req).await.ok()?;
165        let new_response = self.app.response(&mut http).await;
166        let (parts, body) = new_response.into_parts();
167        let mut resp_header: ResponseHeader = parts.into();
168        module_ctx
169            .response_header_filter(&mut resp_header, body.is_empty())
170            .await
171            .ok()?;
172
173        let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
174        trace!("{task:?}");
175
176        match http.response_duplex_vec(vec![task]).await {
177            Ok(_) => {
178                debug!("HTTP response header done.");
179            }
180            Err(e) => {
181                error!(
182                    "HTTP server fails to write to downstream: {e}, {}",
183                    http.request_summary()
184                );
185            }
186        }
187
188        let mut body = Some(body.into());
189        module_ctx.response_body_filter(&mut body, true).ok()?;
190
191        let task = HttpTask::Body(body, true);
192
193        trace!("{task:?}");
194
195        // TODO: check if chunked encoding is needed
196        match http.response_duplex_vec(vec![task]).await {
197            Ok(_) => debug!("HTTP response written."),
198            Err(e) => error!(
199                "HTTP server fails to write to downstream: {e}, {}",
200                http.request_summary()
201            ),
202        }
203        match http.finish().await {
204            Ok(c) => c,
205            Err(e) => {
206                error!("HTTP server fails to finish the request: {e}");
207                None
208            }
209        }
210    }
211}