broker_tokio/signal/
unix.rs

1//! Unix-specific types for signal handling.
2//!
3//! This module is only defined on Unix platforms and contains the primary
4//! `Signal` type for receiving notifications of signals.
5
6#![cfg(unix)]
7
8use crate::io::{AsyncRead, PollEvented};
9use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
10use crate::sync::mpsc::{channel, Receiver};
11
12use libc::c_int;
13use mio_uds::UnixStream;
14use std::io::{self, Error, ErrorKind, Write};
15use std::pin::Pin;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Once;
18use std::task::{Context, Poll};
19
20pub(crate) type OsStorage = Vec<SignalInfo>;
21
22// Number of different unix signals
23// (FreeBSD has 33)
24const SIGNUM: usize = 33;
25
26impl Init for OsStorage {
27    fn init() -> Self {
28        (0..SIGNUM).map(|_| SignalInfo::default()).collect()
29    }
30}
31
32impl Storage for OsStorage {
33    fn event_info(&self, id: EventId) -> Option<&EventInfo> {
34        self.get(id).map(|si| &si.event_info)
35    }
36
37    fn for_each<'a, F>(&'a self, f: F)
38    where
39        F: FnMut(&'a EventInfo),
40    {
41        self.iter().map(|si| &si.event_info).for_each(f)
42    }
43}
44
45#[derive(Debug)]
46pub(crate) struct OsExtraData {
47    sender: UnixStream,
48    receiver: UnixStream,
49}
50
51impl Init for OsExtraData {
52    fn init() -> Self {
53        let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream");
54
55        Self { sender, receiver }
56    }
57}
58
59/// Represents the specific kind of signal to listen for.
60#[derive(Debug, Clone, Copy)]
61pub struct SignalKind(c_int);
62
63impl SignalKind {
64    /// Allows for listening to any valid OS signal.
65    ///
66    /// For example, this can be used for listening for platform-specific
67    /// signals.
68    /// ```rust,no_run
69    /// # use tokio::signal::unix::SignalKind;
70    /// # let signum = -1;
71    /// // let signum = libc::OS_SPECIFIC_SIGNAL;
72    /// let kind = SignalKind::from_raw(signum);
73    /// ```
74    pub fn from_raw(signum: c_int) -> Self {
75        Self(signum)
76    }
77
78    /// Represents the SIGALRM signal.
79    ///
80    /// On Unix systems this signal is sent when a real-time timer has expired.
81    /// By default, the process is terminated by this signal.
82    pub fn alarm() -> Self {
83        Self(libc::SIGALRM)
84    }
85
86    /// Represents the SIGCHLD signal.
87    ///
88    /// On Unix systems this signal is sent when the status of a child process
89    /// has changed. By default, this signal is ignored.
90    pub fn child() -> Self {
91        Self(libc::SIGCHLD)
92    }
93
94    /// Represents the SIGHUP signal.
95    ///
96    /// On Unix systems this signal is sent when the terminal is disconnected.
97    /// By default, the process is terminated by this signal.
98    pub fn hangup() -> Self {
99        Self(libc::SIGHUP)
100    }
101
102    /// Represents the SIGINFO signal.
103    ///
104    /// On Unix systems this signal is sent to request a status update from the
105    /// process. By default, this signal is ignored.
106    #[cfg(any(
107        target_os = "dragonfly",
108        target_os = "freebsd",
109        target_os = "macos",
110        target_os = "netbsd",
111        target_os = "openbsd"
112    ))]
113    pub fn info() -> Self {
114        Self(libc::SIGINFO)
115    }
116
117    /// Represents the SIGINT signal.
118    ///
119    /// On Unix systems this signal is sent to interrupt a program.
120    /// By default, the process is terminated by this signal.
121    pub fn interrupt() -> Self {
122        Self(libc::SIGINT)
123    }
124
125    /// Represents the SIGIO signal.
126    ///
127    /// On Unix systems this signal is sent when I/O operations are possible
128    /// on some file descriptor. By default, this signal is ignored.
129    pub fn io() -> Self {
130        Self(libc::SIGIO)
131    }
132
133    /// Represents the SIGPIPE signal.
134    ///
135    /// On Unix systems this signal is sent when the process attempts to write
136    /// to a pipe which has no reader. By default, the process is terminated by
137    /// this signal.
138    pub fn pipe() -> Self {
139        Self(libc::SIGPIPE)
140    }
141
142    /// Represents the SIGQUIT signal.
143    ///
144    /// On Unix systems this signal is sent to issue a shutdown of the
145    /// process, after which the OS will dump the process core.
146    /// By default, the process is terminated by this signal.
147    pub fn quit() -> Self {
148        Self(libc::SIGQUIT)
149    }
150
151    /// Represents the SIGTERM signal.
152    ///
153    /// On Unix systems this signal is sent to issue a shutdown of the
154    /// process. By default, the process is terminated by this signal.
155    pub fn terminate() -> Self {
156        Self(libc::SIGTERM)
157    }
158
159    /// Represents the SIGUSR1 signal.
160    ///
161    /// On Unix systems this is a user defined signal.
162    /// By default, the process is terminated by this signal.
163    pub fn user_defined1() -> Self {
164        Self(libc::SIGUSR1)
165    }
166
167    /// Represents the SIGUSR2 signal.
168    ///
169    /// On Unix systems this is a user defined signal.
170    /// By default, the process is terminated by this signal.
171    pub fn user_defined2() -> Self {
172        Self(libc::SIGUSR2)
173    }
174
175    /// Represents the SIGWINCH signal.
176    ///
177    /// On Unix systems this signal is sent when the terminal window is resized.
178    /// By default, this signal is ignored.
179    pub fn window_change() -> Self {
180        Self(libc::SIGWINCH)
181    }
182}
183
184pub(crate) struct SignalInfo {
185    event_info: EventInfo,
186    init: Once,
187    initialized: AtomicBool,
188}
189
190impl Default for SignalInfo {
191    fn default() -> SignalInfo {
192        SignalInfo {
193            event_info: Default::default(),
194            init: Once::new(),
195            initialized: AtomicBool::new(false),
196        }
197    }
198}
199
200/// Our global signal handler for all signals registered by this module.
201///
202/// The purpose of this signal handler is to primarily:
203///
204/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
205/// 2. Wake up driver tasks by writing a byte to a pipe
206///
207/// Those two operations shoudl both be async-signal safe.
208fn action(globals: Pin<&'static Globals>, signal: c_int) {
209    globals.record_event(signal as EventId);
210
211    // Send a wakeup, ignore any errors (anything reasonably possible is
212    // full pipe and then it will wake up anyway).
213    let mut sender = &globals.sender;
214    drop(sender.write(&[1]));
215}
216
217/// Enable this module to receive signal notifications for the `signal`
218/// provided.
219///
220/// This will register the signal handler if it hasn't already been registered,
221/// returning any error along the way if that fails.
222fn signal_enable(signal: c_int) -> io::Result<()> {
223    if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
224        return Err(Error::new(
225            ErrorKind::Other,
226            format!("Refusing to register signal {}", signal),
227        ));
228    }
229
230    let globals = globals();
231    let siginfo = match globals.storage().get(signal as EventId) {
232        Some(slot) => slot,
233        None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
234    };
235    let mut registered = Ok(());
236    siginfo.init.call_once(|| {
237        registered = unsafe {
238            signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ())
239        };
240        if registered.is_ok() {
241            siginfo.initialized.store(true, Ordering::Relaxed);
242        }
243    });
244    registered?;
245    // If the call_once failed, it won't be retried on the next attempt to register the signal. In
246    // such case it is not run, registered is still `Ok(())`, initialized is still false.
247    if siginfo.initialized.load(Ordering::Relaxed) {
248        Ok(())
249    } else {
250        Err(Error::new(
251            ErrorKind::Other,
252            "Failed to register signal handler",
253        ))
254    }
255}
256
257#[derive(Debug)]
258struct Driver {
259    wakeup: PollEvented<UnixStream>,
260}
261
262impl Driver {
263    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
264        // Drain the data from the pipe and maintain interest in getting more
265        self.drain(cx);
266        // Broadcast any signals which were received
267        globals().broadcast();
268
269        Poll::Pending
270    }
271}
272
273impl Driver {
274    fn new() -> io::Result<Driver> {
275        // NB: We give each driver a "fresh" reciever file descriptor to avoid
276        // the issues described in alexcrichton/tokio-process#42.
277        //
278        // In the past we would reuse the actual receiver file descriptor and
279        // swallow any errors around double registration of the same descriptor.
280        // I'm not sure if the second (failed) registration simply doesn't end up
281        // receiving wake up notifications, or there could be some race condition
282        // when consuming readiness events, but having distinct descriptors for
283        // distinct PollEvented instances appears to mitigate this.
284        //
285        // Unfortunately we cannot just use a single global PollEvented instance
286        // either, since we can't compare Handles or assume they will always
287        // point to the exact same reactor.
288        let stream = globals().receiver.try_clone()?;
289        let wakeup = PollEvented::new(stream)?;
290
291        Ok(Driver { wakeup })
292    }
293
294    /// Drain all data in the global receiver, ensuring we'll get woken up when
295    /// there is a write on the other end.
296    ///
297    /// We do *NOT* use the existence of any read bytes as evidence a signal was
298    /// received since the `pending` flags would have already been set if that
299    /// was the case. See
300    /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
301    /// info.
302    fn drain(&mut self, cx: &mut Context<'_>) {
303        loop {
304            match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
305                Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
306                Poll::Ready(Ok(_)) => {}
307                Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
308                Poll::Pending => break,
309            }
310        }
311    }
312}
313
314/// A stream of events for receiving a particular type of OS signal.
315///
316/// In general signal handling on Unix is a pretty tricky topic, and this
317/// structure is no exception! There are some important limitations to keep in
318/// mind when using `Signal` streams:
319///
320/// * Signals handling in Unix already necessitates coalescing signals
321///   together sometimes. This `Signal` stream is also no exception here in
322///   that it will also coalesce signals. That is, even if the signal handler
323///   for this process runs multiple times, the `Signal` stream may only return
324///   one signal notification. Specifically, before `poll` is called, all
325///   signal notifications are coalesced into one item returned from `poll`.
326///   Once `poll` has been called, however, a further signal is guaranteed to
327///   be yielded as an item.
328///
329///   Put another way, any element pulled off the returned stream corresponds to
330///   *at least one* signal, but possibly more.
331///
332/// * Signal handling in general is relatively inefficient. Although some
333///   improvements are possible in this crate, it's recommended to not plan on
334///   having millions of signal channels open.
335///
336/// If you've got any questions about this feel free to open an issue on the
337/// repo! New approaches to alleviate some of these limitations are always
338/// appreciated!
339///
340/// # Caveats
341///
342/// The first time that a `Signal` instance is registered for a particular
343/// signal kind, an OS signal-handler is installed which replaces the default
344/// platform behavior when that signal is received, **for the duration of the
345/// entire process**.
346///
347/// For example, Unix systems will terminate a process by default when it
348/// receives SIGINT. But, when a `Signal` instance is created to listen for
349/// this signal, the next SIGINT that arrives will be translated to a stream
350/// event, and the process will continue to execute. **Even if this `Signal`
351/// instance is dropped, subsequent SIGINT deliveries will end up captured by
352/// Tokio, and the default platform behavior will NOT be reset**.
353///
354/// Thus, applications should take care to ensure the expected signal behavior
355/// occurs as expected after listening for specific signals.
356///
357/// # Examples
358///
359/// Wait for SIGHUP
360///
361/// ```rust,no_run
362/// use tokio::signal::unix::{signal, SignalKind};
363///
364/// #[tokio::main]
365/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
366///     // An infinite stream of hangup signals.
367///     let mut stream = signal(SignalKind::hangup())?;
368///
369///     // Print whenever a HUP signal is received
370///     loop {
371///         stream.recv().await;
372///         println!("got signal HUP");
373///     }
374/// }
375/// ```
376#[must_use = "streams do nothing unless polled"]
377#[derive(Debug)]
378pub struct Signal {
379    driver: Driver,
380    rx: Receiver<()>,
381}
382
383/// Creates a new stream which will receive notifications when the current
384/// process receives the specified signal `kind`.
385///
386/// This function will create a new stream which binds to the default reactor.
387/// The `Signal` stream is an infinite stream which will receive
388/// notifications whenever a signal is received. More documentation can be
389/// found on `Signal` itself, but to reiterate:
390///
391/// * Signals may be coalesced beyond what the kernel already does.
392/// * Once a signal handler is registered with the process the underlying
393///   libc signal handler is never unregistered.
394///
395/// A `Signal` stream can be created for a particular signal number
396/// multiple times. When a signal is received then all the associated
397/// channels will receive the signal notification.
398///
399/// # Errors
400///
401/// * If the lower-level C functions fail for some reason.
402/// * If the previous initialization of this specific signal failed.
403/// * If the signal is one of
404///   [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
405pub fn signal(kind: SignalKind) -> io::Result<Signal> {
406    let signal = kind.0;
407
408    // Turn the signal delivery on once we are ready for it
409    signal_enable(signal)?;
410
411    // Ensure there's a driver for our associated event loop processing
412    // signals.
413    let driver = Driver::new()?;
414
415    // One wakeup in a queue is enough, no need for us to buffer up any
416    // more.
417    let (tx, rx) = channel(1);
418    globals().register_listener(signal as EventId, tx);
419
420    Ok(Signal { driver, rx })
421}
422
423impl Signal {
424    /// Receive the next signal notification event.
425    ///
426    /// `None` is returned if no more events can be received by this stream.
427    ///
428    /// # Examples
429    ///
430    /// Wait for SIGHUP
431    ///
432    /// ```rust,no_run
433    /// use tokio::signal::unix::{signal, SignalKind};
434    ///
435    /// #[tokio::main]
436    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
437    ///     // An infinite stream of hangup signals.
438    ///     let mut stream = signal(SignalKind::hangup())?;
439    ///
440    ///     // Print whenever a HUP signal is received
441    ///     loop {
442    ///         stream.recv().await;
443    ///         println!("got signal HUP");
444    ///     }
445    /// }
446    /// ```
447    pub async fn recv(&mut self) -> Option<()> {
448        use crate::future::poll_fn;
449        poll_fn(|cx| self.poll_recv(cx)).await
450    }
451
452    /// Poll to receive the next signal notification event, outside of an
453    /// `async` context.
454    ///
455    /// `None` is returned if no more events can be received by this stream.
456    ///
457    /// # Examples
458    ///
459    /// Polling from a manually implemented future
460    ///
461    /// ```rust,no_run
462    /// use std::pin::Pin;
463    /// use std::future::Future;
464    /// use std::task::{Context, Poll};
465    /// use tokio::signal::unix::Signal;
466    ///
467    /// struct MyFuture {
468    ///     signal: Signal,
469    /// }
470    ///
471    /// impl Future for MyFuture {
472    ///     type Output = Option<()>;
473    ///
474    ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475    ///         println!("polling MyFuture");
476    ///         self.signal.poll_recv(cx)
477    ///     }
478    /// }
479    /// ```
480    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
481        let _ = self.driver.poll(cx);
482        self.rx.poll_recv(cx)
483    }
484}
485
486cfg_stream! {
487    impl crate::stream::Stream for Signal {
488        type Item = ();
489
490        fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
491            self.poll_recv(cx)
492        }
493    }
494}
495
496pub(crate) fn ctrl_c() -> io::Result<Signal> {
497    signal(SignalKind::interrupt())
498}
499
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    // Negative signal numbers must be rejected.
    #[test]
    fn signal_enable_error_on_invalid_input() {
        assert!(signal_enable(-1).is_err());
    }

    // Signals listed in `FORBIDDEN` must be rejected.
    #[test]
    fn signal_enable_error_on_forbidden_input() {
        let forbidden = signal_hook_registry::FORBIDDEN[0];
        assert!(signal_enable(forbidden).is_err());
    }
}