pingora_core/apps/
http_app.rs1use async_trait::async_trait;
18use http::Response;
19use log::{debug, error, trace};
20use pingora_http::ResponseHeader;
21use std::sync::Arc;
22
23use crate::apps::HttpServerApp;
24use crate::modules::http::{HttpModules, ModuleBuilder};
25use crate::protocols::http::HttpTask;
26use crate::protocols::http::ServerSession;
27use crate::protocols::Stream;
28use crate::server::ShutdownWatch;
29
30#[async_trait]
32pub trait ServeHttp {
33 async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
42}
43
44#[async_trait]
46impl<SV> HttpServerApp for SV
47where
48 SV: ServeHttp + Send + Sync,
49{
50 async fn process_new_http(
51 self: &Arc<Self>,
52 mut http: ServerSession,
53 shutdown: &ShutdownWatch,
54 ) -> Option<Stream> {
55 match http.read_request().await {
56 Ok(res) => match res {
57 false => {
58 debug!("Failed to read request header");
59 return None;
60 }
61 true => {
62 debug!("Successfully get a new request");
63 }
64 },
65 Err(e) => {
66 error!("HTTP server fails to read from downstream: {e}");
67 return None;
68 }
69 }
70 trace!("{:?}", http.req_header());
71 if *shutdown.borrow() {
72 http.set_keepalive(None);
73 } else {
74 http.set_keepalive(Some(60));
75 }
76 let new_response = self.response(&mut http).await;
77 let (parts, body) = new_response.into_parts();
78 let resp_header: ResponseHeader = parts.into();
79 match http.write_response_header(Box::new(resp_header)).await {
80 Ok(()) => {
81 debug!("HTTP response header done.");
82 }
83 Err(e) => {
84 error!(
85 "HTTP server fails to write to downstream: {e}, {}",
86 http.request_summary()
87 );
88 }
89 }
90 if !body.is_empty() {
91 match http.write_response_body(body.into(), true).await {
93 Ok(_) => debug!("HTTP response written."),
94 Err(e) => error!(
95 "HTTP server fails to write to downstream: {e}, {}",
96 http.request_summary()
97 ),
98 }
99 }
100 match http.finish().await {
101 Ok(c) => c,
102 Err(e) => {
103 error!("HTTP server fails to finish the request: {e}");
104 None
105 }
106 }
107 }
108}
109
110pub struct HttpServer<SV> {
112 app: SV,
113 modules: HttpModules,
114}
115
116impl<SV> HttpServer<SV> {
117 pub fn new_app(app: SV) -> Self {
119 HttpServer {
120 app,
121 modules: HttpModules::new(),
122 }
123 }
124
125 pub fn add_module(&mut self, module: ModuleBuilder) {
127 self.modules.add_module(module)
128 }
129}
130
131#[async_trait]
132impl<SV> HttpServerApp for HttpServer<SV>
133where
134 SV: ServeHttp + Send + Sync,
135{
136 async fn process_new_http(
137 self: &Arc<Self>,
138 mut http: ServerSession,
139 shutdown: &ShutdownWatch,
140 ) -> Option<Stream> {
141 match http.read_request().await {
142 Ok(res) => match res {
143 false => {
144 debug!("Failed to read request header");
145 return None;
146 }
147 true => {
148 debug!("Successfully get a new request");
149 }
150 },
151 Err(e) => {
152 error!("HTTP server fails to read from downstream: {e}");
153 return None;
154 }
155 }
156 trace!("{:?}", http.req_header());
157 if *shutdown.borrow() {
158 http.set_keepalive(None);
159 } else {
160 http.set_keepalive(Some(60));
161 }
162 let mut module_ctx = self.modules.build_ctx();
163 let req = http.req_header_mut();
164 module_ctx.request_header_filter(req).await.ok()?;
165 let new_response = self.app.response(&mut http).await;
166 let (parts, body) = new_response.into_parts();
167 let mut resp_header: ResponseHeader = parts.into();
168 module_ctx
169 .response_header_filter(&mut resp_header, body.is_empty())
170 .await
171 .ok()?;
172
173 let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
174 trace!("{task:?}");
175
176 match http.response_duplex_vec(vec![task]).await {
177 Ok(_) => {
178 debug!("HTTP response header done.");
179 }
180 Err(e) => {
181 error!(
182 "HTTP server fails to write to downstream: {e}, {}",
183 http.request_summary()
184 );
185 }
186 }
187
188 let mut body = Some(body.into());
189 module_ctx.response_body_filter(&mut body, true).ok()?;
190
191 let task = HttpTask::Body(body, true);
192
193 trace!("{task:?}");
194
195 match http.response_duplex_vec(vec![task]).await {
197 Ok(_) => debug!("HTTP response written."),
198 Err(e) => error!(
199 "HTTP server fails to write to downstream: {e}, {}",
200 http.request_summary()
201 ),
202 }
203 match http.finish().await {
204 Ok(c) => c,
205 Err(e) => {
206 error!("HTTP server fails to finish the request: {e}");
207 None
208 }
209 }
210 }
211}