pingora_core/server/
mod.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Server process and configuration management
16
17pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23#[cfg(unix)]
24use daemon::daemonize;
25use log::{debug, error, info, warn};
26use pingora_runtime::Runtime;
27use pingora_timeout::fast_timeout;
28#[cfg(feature = "sentry")]
29use sentry::ClientOptions;
30use std::sync::Arc;
31use std::thread;
32#[cfg(unix)]
33use tokio::signal::unix;
34use tokio::sync::{watch, Mutex};
35use tokio::time::{sleep, Duration};
36
37use crate::services::Service;
38use configuration::{Opt, ServerConf};
39#[cfg(unix)]
40pub use transfer_fd::Fds;
41
42use pingora_error::{Error, ErrorType, Result};
43
/// Default grace period, in seconds, before the process exits once a graceful shutdown starts.
///
/// Existing sessions get this long to finish; it is used when the configuration does not set
/// `grace_period_seconds`.
const EXIT_TIMEOUT: u64 = 5 * 60;

/// Delay, in seconds, between handing the listening sockets to the new process and
/// broadcasting shutdown, so the new service has time to get ready.
const CLOSE_TIMEOUT: u64 = 5;
50
/// How the server winds down once its main loop has returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownType {
    /// Wait out the grace period so in-flight sessions can finish, then stop the service
    /// runtimes with the configured graceful shutdown timeout.
    Graceful,
    /// Skip the grace period and stop the service runtimes with a zero timeout.
    Quick,
}
55
56/// The receiver for server's shutdown event. The value will turn to true once the server starts
57/// to shutdown
58pub type ShutdownWatch = watch::Receiver<bool>;
59#[cfg(unix)]
60pub type ListenFds = Arc<Mutex<Fds>>;
61
62/// The server object
63///
64/// This object represents an entire pingora server process which may have multiple independent
65/// services (see [crate::services]). The server object handles signals, reading configuration,
66/// zero downtime upgrade and error reporting.
67pub struct Server {
68    services: Vec<Box<dyn Service>>,
69    #[cfg(unix)]
70    listen_fds: Option<ListenFds>,
71    shutdown_watch: watch::Sender<bool>,
72    // TODO: we many want to drop this copy to let sender call closed()
73    shutdown_recv: ShutdownWatch,
74    /// The parsed server configuration
75    pub configuration: Arc<ServerConf>,
76    /// The parser command line options
77    pub options: Option<Opt>,
78    #[cfg(feature = "sentry")]
79    #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
80    /// The Sentry ClientOptions.
81    ///
82    /// Panics and other events sentry captures will be sent to this DSN **only in release mode**
83    pub sentry: Option<ClientOptions>,
84}
85
// TODO: delete the pid file on exit
87
88impl Server {
89    #[cfg(unix)]
90    async fn main_loop(&self) -> ShutdownType {
91        // waiting for exit signal
92        // TODO: there should be a signal handling function
93        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
94        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
95        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
96        tokio::select! {
97            _ = fast_shutdown_signal.recv() => {
98                info!("SIGINT received, exiting");
99                ShutdownType::Quick
100            },
101            _ = graceful_terminate_signal.recv() => {
102                // we receive a graceful terminate, all instances are instructed to stop
103                info!("SIGTERM received, gracefully exiting");
104                // graceful shutdown if there are listening sockets
105                info!("Broadcasting graceful shutdown");
106                match self.shutdown_watch.send(true) {
107                    Ok(_) => { info!("Graceful shutdown started!"); }
108                    Err(e) => {
109                        error!("Graceful shutdown broadcast failed: {e}");
110                    }
111                }
112                info!("Broadcast graceful shutdown complete");
113                ShutdownType::Graceful
114            }
115            _ = graceful_upgrade_signal.recv() => {
116                // TODO: still need to select! on signals in case a fast shutdown is needed
117                // aka: move below to another task and only kick it off here
118                info!("SIGQUIT received, sending socks and gracefully exiting");
119                if let Some(fds) = &self.listen_fds {
120                    let fds = fds.lock().await;
121                    info!("Trying to send socks");
122                    // XXX: this is blocking IO
123                    match fds.send_to_sock(
124                        self.configuration.as_ref().upgrade_sock.as_str())
125                    {
126                        Ok(_) => {info!("listener sockets sent");},
127                        Err(e) => {
128                            error!("Unable to send listener sockets to new process: {e}");
129                            // sentry log error on fd send failure
130                            #[cfg(all(not(debug_assertions), feature = "sentry"))]
131                            sentry::capture_error(&e);
132                        }
133                    }
134                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
135                    info!("Broadcasting graceful shutdown");
136                    // gracefully exiting
137                    match self.shutdown_watch.send(true) {
138                        Ok(_) => { info!("Graceful shutdown started!"); }
139                        Err(e) => {
140                            error!("Graceful shutdown broadcast failed: {e}");
141                            // switch to fast shutdown
142                            return ShutdownType::Graceful;
143                        }
144                    }
145                    info!("Broadcast graceful shutdown complete");
146                    ShutdownType::Graceful
147                } else {
148                    info!("No socks to send, shutting down.");
149                    ShutdownType::Graceful
150                }
151            },
152        }
153    }
154
155    fn run_service(
156        mut service: Box<dyn Service>,
157        #[cfg(unix)] fds: Option<ListenFds>,
158        shutdown: ShutdownWatch,
159        threads: usize,
160        work_stealing: bool,
161    ) -> Runtime
162// NOTE: we need to keep the runtime outside async since
163        // otherwise the runtime will be dropped.
164    {
165        let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
166        service_runtime.get_handle().spawn(async move {
167            service
168                .start_service(
169                    #[cfg(unix)]
170                    fds,
171                    shutdown,
172                )
173                .await;
174            info!("service exited.")
175        });
176        service_runtime
177    }
178
179    #[cfg(unix)]
180    fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
181        let mut fds = Fds::new();
182        if upgrade {
183            debug!("Trying to receive socks");
184            fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
185        }
186        self.listen_fds = Some(Arc::new(Mutex::new(fds)));
187        Ok(())
188    }
189
190    /// Create a new [`Server`], using the [`Opt`] and [`ServerConf`] values provided
191    ///
192    /// This method is intended for pingora frontends that are NOT using the built-in
193    /// command line and configuration file parsing, and are instead using their own.
194    ///
195    /// If a configuration file path is provided as part of `opt`, it will be ignored
196    /// and a warning will be logged.
197    pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
198        let opt = raw_opt.into();
199        if let Some(opts) = &opt {
200            if let Some(c) = opts.conf.as_ref() {
201                warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
202            }
203            conf.merge_with_opt(opts);
204        }
205
206        let (tx, rx) = watch::channel(false);
207
208        Server {
209            services: vec![],
210            #[cfg(unix)]
211            listen_fds: None,
212            shutdown_watch: tx,
213            shutdown_recv: rx,
214            configuration: Arc::new(conf),
215            options: opt,
216            #[cfg(feature = "sentry")]
217            sentry: None,
218        }
219    }
220
221    /// Create a new [`Server`].
222    ///
223    /// Only one [`Server`] needs to be created for a process. A [`Server`] can hold multiple
224    /// independent services.
225    ///
226    /// Command line options can either be passed by parsing the command line arguments via
227    /// `Opt::parse_args()`, or be generated by other means.
228    pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
229        let opt = opt.into();
230        let (tx, rx) = watch::channel(false);
231
232        let conf = if let Some(opt) = opt.as_ref() {
233            opt.conf.as_ref().map_or_else(
234                || {
235                    // options, no conf, generated
236                    ServerConf::new_with_opt_override(opt).ok_or_else(|| {
237                        Error::explain(ErrorType::ReadError, "Conf generation failed")
238                    })
239                },
240                |_| {
241                    // options and conf loaded
242                    ServerConf::load_yaml_with_opt_override(opt)
243                },
244            )
245        } else {
246            ServerConf::new()
247                .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
248        }?;
249
250        Ok(Server {
251            services: vec![],
252            #[cfg(unix)]
253            listen_fds: None,
254            shutdown_watch: tx,
255            shutdown_recv: rx,
256            configuration: Arc::new(conf),
257            options: opt,
258            #[cfg(feature = "sentry")]
259            sentry: None,
260        })
261    }
262
263    /// Add a service to this server.
264    ///
265    /// A service is anything that implements [`Service`].
266    pub fn add_service(&mut self, service: impl Service + 'static) {
267        self.services.push(Box::new(service));
268    }
269
270    /// Similar to [`Self::add_service()`], but take a list of services
271    pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
272        self.services.extend(services);
273    }
274
275    /// Prepare the server to start
276    ///
277    /// When trying to zero downtime upgrade from an older version of the server which is already
278    /// running, this function will try to get all its listening sockets in order to take them over.
279    pub fn bootstrap(&mut self) {
280        info!("Bootstrap starting");
281        debug!("{:#?}", self.options);
282
283        /* only init sentry in release builds */
284        #[cfg(all(not(debug_assertions), feature = "sentry"))]
285        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
286
287        if self.options.as_ref().map_or(false, |o| o.test) {
288            info!("Server Test passed, exiting");
289            std::process::exit(0);
290        }
291
292        // load fds
293        #[cfg(unix)]
294        match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {
295            Ok(_) => {
296                info!("Bootstrap done");
297            }
298            Err(e) => {
299                // sentry log error on fd load failure
300                #[cfg(all(not(debug_assertions), feature = "sentry"))]
301                sentry::capture_error(&e);
302
303                error!("Bootstrap failed on error: {:?}, exiting.", e);
304                std::process::exit(1);
305            }
306        }
307    }
308
309    /// Start the server
310    ///
311    /// This function will block forever until the server needs to quit. So this would be the last
312    /// function to call for this object.
313    ///
314    /// Note: this function may fork the process for daemonization, so any additional threads created
315    /// before this function will be lost to any service logic once this function is called.
316    pub fn run_forever(mut self) -> ! {
317        info!("Server starting");
318
319        let conf = self.configuration.as_ref();
320
321        #[cfg(unix)]
322        if conf.daemon {
323            info!("Daemonizing the server");
324            fast_timeout::pause_for_fork();
325            daemonize(&self.configuration);
326            fast_timeout::unpause();
327        }
328
329        #[cfg(windows)]
330        if conf.daemon {
331            panic!("Daemonizing under windows is not supported");
332        }
333
334        /* only init sentry in release builds */
335        #[cfg(all(not(debug_assertions), feature = "sentry"))]
336        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
337
338        let mut runtimes: Vec<Runtime> = Vec::new();
339
340        while let Some(service) = self.services.pop() {
341            let threads = service.threads().unwrap_or(conf.threads);
342            let runtime = Server::run_service(
343                service,
344                #[cfg(unix)]
345                self.listen_fds.clone(),
346                self.shutdown_recv.clone(),
347                threads,
348                conf.work_stealing,
349            );
350            runtimes.push(runtime);
351        }
352
353        // blocked on main loop so that it runs forever
354        // Only work steal runtime can use block_on()
355        let server_runtime = Server::create_runtime("Server", 1, true);
356        #[cfg(unix)]
357        let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
358        #[cfg(windows)]
359        let shutdown_type = ShutdownType::Graceful;
360
361        if matches!(shutdown_type, ShutdownType::Graceful) {
362            let exit_timeout = self
363                .configuration
364                .as_ref()
365                .grace_period_seconds
366                .unwrap_or(EXIT_TIMEOUT);
367            info!("Graceful shutdown: grace period {}s starts", exit_timeout);
368            thread::sleep(Duration::from_secs(exit_timeout));
369            info!("Graceful shutdown: grace period ends");
370        }
371
372        // Give tokio runtimes time to exit
373        let shutdown_timeout = match shutdown_type {
374            ShutdownType::Quick => Duration::from_secs(0),
375            ShutdownType::Graceful => Duration::from_secs(
376                self.configuration
377                    .as_ref()
378                    .graceful_shutdown_timeout_seconds
379                    .unwrap_or(5),
380            ),
381        };
382        let shutdowns: Vec<_> = runtimes
383            .into_iter()
384            .map(|rt| {
385                info!("Waiting for runtimes to exit!");
386                thread::spawn(move || {
387                    rt.shutdown_timeout(shutdown_timeout);
388                    thread::sleep(shutdown_timeout)
389                })
390            })
391            .collect();
392        for shutdown in shutdowns {
393            if let Err(e) = shutdown.join() {
394                error!("Failed to shutdown runtime: {:?}", e);
395            }
396        }
397        info!("All runtimes exited, exiting now");
398        std::process::exit(0)
399    }
400
401    fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
402        if work_steal {
403            Runtime::new_steal(threads, name)
404        } else {
405            Runtime::new_no_steal(threads, name)
406        }
407    }
408}