1pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23#[cfg(unix)]
24use daemon::daemonize;
25use log::{debug, error, info, warn};
26use pingora_runtime::Runtime;
27use pingora_timeout::fast_timeout;
28#[cfg(feature = "sentry")]
29use sentry::ClientOptions;
30use std::sync::Arc;
31use std::thread;
32#[cfg(unix)]
33use tokio::signal::unix;
34use tokio::sync::{watch, Mutex};
35use tokio::time::{sleep, Duration};
36
37use crate::services::Service;
38use configuration::{Opt, ServerConf};
39#[cfg(unix)]
40pub use transfer_fd::Fds;
41
42use pingora_error::{Error, ErrorType, Result};
43
44const EXIT_TIMEOUT: u64 = 60 * 5;
47const CLOSE_TIMEOUT: u64 = 5;
50
51enum ShutdownType {
52 Graceful,
53 Quick,
54}
55
56pub type ShutdownWatch = watch::Receiver<bool>;
59#[cfg(unix)]
60pub type ListenFds = Arc<Mutex<Fds>>;
61
62pub struct Server {
68 services: Vec<Box<dyn Service>>,
69 #[cfg(unix)]
70 listen_fds: Option<ListenFds>,
71 shutdown_watch: watch::Sender<bool>,
72 shutdown_recv: ShutdownWatch,
74 pub configuration: Arc<ServerConf>,
76 pub options: Option<Opt>,
78 #[cfg(feature = "sentry")]
79 #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
80 pub sentry: Option<ClientOptions>,
84}
85
86impl Server {
89 #[cfg(unix)]
90 async fn main_loop(&self) -> ShutdownType {
91 let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
94 let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
95 let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
96 tokio::select! {
97 _ = fast_shutdown_signal.recv() => {
98 info!("SIGINT received, exiting");
99 ShutdownType::Quick
100 },
101 _ = graceful_terminate_signal.recv() => {
102 info!("SIGTERM received, gracefully exiting");
104 info!("Broadcasting graceful shutdown");
106 match self.shutdown_watch.send(true) {
107 Ok(_) => { info!("Graceful shutdown started!"); }
108 Err(e) => {
109 error!("Graceful shutdown broadcast failed: {e}");
110 }
111 }
112 info!("Broadcast graceful shutdown complete");
113 ShutdownType::Graceful
114 }
115 _ = graceful_upgrade_signal.recv() => {
116 info!("SIGQUIT received, sending socks and gracefully exiting");
119 if let Some(fds) = &self.listen_fds {
120 let fds = fds.lock().await;
121 info!("Trying to send socks");
122 match fds.send_to_sock(
124 self.configuration.as_ref().upgrade_sock.as_str())
125 {
126 Ok(_) => {info!("listener sockets sent");},
127 Err(e) => {
128 error!("Unable to send listener sockets to new process: {e}");
129 #[cfg(all(not(debug_assertions), feature = "sentry"))]
131 sentry::capture_error(&e);
132 }
133 }
134 sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
135 info!("Broadcasting graceful shutdown");
136 match self.shutdown_watch.send(true) {
138 Ok(_) => { info!("Graceful shutdown started!"); }
139 Err(e) => {
140 error!("Graceful shutdown broadcast failed: {e}");
141 return ShutdownType::Graceful;
143 }
144 }
145 info!("Broadcast graceful shutdown complete");
146 ShutdownType::Graceful
147 } else {
148 info!("No socks to send, shutting down.");
149 ShutdownType::Graceful
150 }
151 },
152 }
153 }
154
155 fn run_service(
156 mut service: Box<dyn Service>,
157 #[cfg(unix)] fds: Option<ListenFds>,
158 shutdown: ShutdownWatch,
159 threads: usize,
160 work_stealing: bool,
161 ) -> Runtime
162{
165 let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
166 service_runtime.get_handle().spawn(async move {
167 service
168 .start_service(
169 #[cfg(unix)]
170 fds,
171 shutdown,
172 )
173 .await;
174 info!("service exited.")
175 });
176 service_runtime
177 }
178
179 #[cfg(unix)]
180 fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
181 let mut fds = Fds::new();
182 if upgrade {
183 debug!("Trying to receive socks");
184 fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
185 }
186 self.listen_fds = Some(Arc::new(Mutex::new(fds)));
187 Ok(())
188 }
189
190 pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
198 let opt = raw_opt.into();
199 if let Some(opts) = &opt {
200 if let Some(c) = opts.conf.as_ref() {
201 warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
202 }
203 conf.merge_with_opt(opts);
204 }
205
206 let (tx, rx) = watch::channel(false);
207
208 Server {
209 services: vec![],
210 #[cfg(unix)]
211 listen_fds: None,
212 shutdown_watch: tx,
213 shutdown_recv: rx,
214 configuration: Arc::new(conf),
215 options: opt,
216 #[cfg(feature = "sentry")]
217 sentry: None,
218 }
219 }
220
221 pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
229 let opt = opt.into();
230 let (tx, rx) = watch::channel(false);
231
232 let conf = if let Some(opt) = opt.as_ref() {
233 opt.conf.as_ref().map_or_else(
234 || {
235 ServerConf::new_with_opt_override(opt).ok_or_else(|| {
237 Error::explain(ErrorType::ReadError, "Conf generation failed")
238 })
239 },
240 |_| {
241 ServerConf::load_yaml_with_opt_override(opt)
243 },
244 )
245 } else {
246 ServerConf::new()
247 .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
248 }?;
249
250 Ok(Server {
251 services: vec![],
252 #[cfg(unix)]
253 listen_fds: None,
254 shutdown_watch: tx,
255 shutdown_recv: rx,
256 configuration: Arc::new(conf),
257 options: opt,
258 #[cfg(feature = "sentry")]
259 sentry: None,
260 })
261 }
262
263 pub fn add_service(&mut self, service: impl Service + 'static) {
267 self.services.push(Box::new(service));
268 }
269
270 pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
272 self.services.extend(services);
273 }
274
275 pub fn bootstrap(&mut self) {
280 info!("Bootstrap starting");
281 debug!("{:#?}", self.options);
282
283 #[cfg(all(not(debug_assertions), feature = "sentry"))]
285 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
286
287 if self.options.as_ref().map_or(false, |o| o.test) {
288 info!("Server Test passed, exiting");
289 std::process::exit(0);
290 }
291
292 #[cfg(unix)]
294 match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {
295 Ok(_) => {
296 info!("Bootstrap done");
297 }
298 Err(e) => {
299 #[cfg(all(not(debug_assertions), feature = "sentry"))]
301 sentry::capture_error(&e);
302
303 error!("Bootstrap failed on error: {:?}, exiting.", e);
304 std::process::exit(1);
305 }
306 }
307 }
308
309 pub fn run_forever(mut self) -> ! {
317 info!("Server starting");
318
319 let conf = self.configuration.as_ref();
320
321 #[cfg(unix)]
322 if conf.daemon {
323 info!("Daemonizing the server");
324 fast_timeout::pause_for_fork();
325 daemonize(&self.configuration);
326 fast_timeout::unpause();
327 }
328
329 #[cfg(windows)]
330 if conf.daemon {
331 panic!("Daemonizing under windows is not supported");
332 }
333
334 #[cfg(all(not(debug_assertions), feature = "sentry"))]
336 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
337
338 let mut runtimes: Vec<Runtime> = Vec::new();
339
340 while let Some(service) = self.services.pop() {
341 let threads = service.threads().unwrap_or(conf.threads);
342 let runtime = Server::run_service(
343 service,
344 #[cfg(unix)]
345 self.listen_fds.clone(),
346 self.shutdown_recv.clone(),
347 threads,
348 conf.work_stealing,
349 );
350 runtimes.push(runtime);
351 }
352
353 let server_runtime = Server::create_runtime("Server", 1, true);
356 #[cfg(unix)]
357 let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
358 #[cfg(windows)]
359 let shutdown_type = ShutdownType::Graceful;
360
361 if matches!(shutdown_type, ShutdownType::Graceful) {
362 let exit_timeout = self
363 .configuration
364 .as_ref()
365 .grace_period_seconds
366 .unwrap_or(EXIT_TIMEOUT);
367 info!("Graceful shutdown: grace period {}s starts", exit_timeout);
368 thread::sleep(Duration::from_secs(exit_timeout));
369 info!("Graceful shutdown: grace period ends");
370 }
371
372 let shutdown_timeout = match shutdown_type {
374 ShutdownType::Quick => Duration::from_secs(0),
375 ShutdownType::Graceful => Duration::from_secs(
376 self.configuration
377 .as_ref()
378 .graceful_shutdown_timeout_seconds
379 .unwrap_or(5),
380 ),
381 };
382 let shutdowns: Vec<_> = runtimes
383 .into_iter()
384 .map(|rt| {
385 info!("Waiting for runtimes to exit!");
386 thread::spawn(move || {
387 rt.shutdown_timeout(shutdown_timeout);
388 thread::sleep(shutdown_timeout)
389 })
390 })
391 .collect();
392 for shutdown in shutdowns {
393 if let Err(e) = shutdown.join() {
394 error!("Failed to shutdown runtime: {:?}", e);
395 }
396 }
397 info!("All runtimes exited, exiting now");
398 std::process::exit(0)
399 }
400
401 fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
402 if work_steal {
403 Runtime::new_steal(threads, name)
404 } else {
405 Runtime::new_no_steal(threads, name)
406 }
407 }
408}