tokio_signal/
unix.rs

1//! Unix-specific types for signal handling.
2//!
3//! This module is only defined on Unix platforms and contains the primary
4//! `Signal` type for receiving notifications of signals.
5
6#![cfg(unix)]
7
8pub extern crate libc;
9extern crate mio;
10extern crate mio_uds;
11extern crate signal_hook_registry;
12
13use std::io::prelude::*;
14use std::io::{self, Error, ErrorKind};
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Mutex, Once, ONCE_INIT};
17
18use self::libc::c_int;
19use self::mio_uds::UnixStream;
20use futures::future;
21use futures::sync::mpsc::{channel, Receiver, Sender};
22use futures::{Async, Future};
23use futures::{Poll, Stream};
24use tokio_io::IoFuture;
25use tokio_reactor::{Handle, PollEvented};
26
27pub use self::libc::{SIGALRM, SIGHUP, SIGPIPE, SIGQUIT, SIGTRAP};
28pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
29
30/// BSD-specific definitions
31#[cfg(any(
32    target_os = "dragonfly",
33    target_os = "freebsd",
34    target_os = "macos",
35    target_os = "netbsd",
36    target_os = "openbsd",
37))]
38pub mod bsd {
39    #[cfg(any(
40        target_os = "dragonfly",
41        target_os = "freebsd",
42        target_os = "macos",
43        target_os = "netbsd",
44        target_os = "openbsd"
45    ))]
46    pub use super::libc::SIGINFO;
47}
48
49// Number of different unix signals
50// (FreeBSD has 33)
51const SIGNUM: usize = 33;
52
53type SignalSender = Sender<c_int>;
54
55struct SignalInfo {
56    pending: AtomicBool,
57    // The ones interested in this signal
58    recipients: Mutex<Vec<Box<SignalSender>>>,
59
60    init: Once,
61    initialized: AtomicBool,
62}
63
64struct Globals {
65    sender: UnixStream,
66    receiver: UnixStream,
67    signals: Vec<SignalInfo>,
68}
69
70impl Globals {
71    /// Register a new `Signal` instance's channel sender.
72    /// Returns a `SignalId` which should be later used for deregistering
73    /// this sender.
74    fn register_signal_sender(signal: c_int, tx: SignalSender) -> SignalId {
75        let tx = Box::new(tx);
76        let id = SignalId::from(&tx);
77
78        let idx = signal as usize;
79        globals().signals[idx].recipients.lock().unwrap().push(tx);
80        id
81    }
82
83    /// Deregister a `Signal` instance's channel sender because the `Signal`
84    /// is no longer interested in receiving events (e.g. dropped).
85    fn deregister_signal_receiver(signal: c_int, id: SignalId) {
86        let idx = signal as usize;
87        let mut list = globals().signals[idx].recipients.lock().unwrap();
88        list.retain(|sender| SignalId::from(sender) != id);
89    }
90}
91
92/// A newtype which represents a unique identifier for each `Signal` instance.
93/// The id is derived by boxing the channel `Sender` associated with this instance
94/// and using its address in memory.
95#[derive(Debug, Copy, Clone, PartialEq, Eq)]
96struct SignalId(usize);
97
98impl<'a> From<&'a Box<SignalSender>> for SignalId {
99    fn from(tx: &'a Box<SignalSender>) -> Self {
100        SignalId(&**tx as *const _ as usize)
101    }
102}
103
104impl Default for SignalInfo {
105    fn default() -> SignalInfo {
106        SignalInfo {
107            pending: AtomicBool::new(false),
108            init: ONCE_INIT,
109            initialized: AtomicBool::new(false),
110            recipients: Mutex::new(Vec::new()),
111        }
112    }
113}
114
115static mut GLOBALS: *mut Globals = 0 as *mut Globals;
116
117fn globals() -> &'static Globals {
118    static INIT: Once = ONCE_INIT;
119
120    unsafe {
121        INIT.call_once(|| {
122            let (receiver, sender) = UnixStream::pair().unwrap();
123            let globals = Globals {
124                sender: sender,
125                receiver: receiver,
126                signals: (0..SIGNUM).map(|_| Default::default()).collect(),
127            };
128            GLOBALS = Box::into_raw(Box::new(globals));
129        });
130        &*GLOBALS
131    }
132}
133
134/// Our global signal handler for all signals registered by this module.
135///
136/// The purpose of this signal handler is to primarily:
137///
138/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
139/// 2. Wake up driver tasks by writing a byte to a pipe
140///
141/// Those two operations shoudl both be async-signal safe.
142fn action(slot: &SignalInfo, mut sender: &UnixStream) {
143    slot.pending.store(true, Ordering::SeqCst);
144
145    // Send a wakeup, ignore any errors (anything reasonably possible is
146    // full pipe and then it will wake up anyway).
147    drop(sender.write(&[1]));
148}
149
150/// Enable this module to receive signal notifications for the `signal`
151/// provided.
152///
153/// This will register the signal handler if it hasn't already been registered,
154/// returning any error along the way if that fails.
155fn signal_enable(signal: c_int) -> io::Result<()> {
156    if signal_hook_registry::FORBIDDEN.contains(&signal) {
157        return Err(Error::new(
158            ErrorKind::Other,
159            format!("Refusing to register signal {}", signal),
160        ));
161    }
162
163    let globals = globals();
164    let siginfo = match globals.signals.get(signal as usize) {
165        Some(slot) => slot,
166        None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
167    };
168    let mut registered = Ok(());
169    siginfo.init.call_once(|| {
170        registered = unsafe {
171            signal_hook_registry::register(signal, move || action(siginfo, &globals.sender))
172                .map(|_| ())
173        };
174        if registered.is_ok() {
175            siginfo.initialized.store(true, Ordering::Relaxed);
176        }
177    });
178    registered?;
179    // If the call_once failed, it won't be retried on the next attempt to register the signal. In
180    // such case it is not run, registered is still `Ok(())`, initialized is still false.
181    if siginfo.initialized.load(Ordering::Relaxed) {
182        Ok(())
183    } else {
184        Err(Error::new(
185            ErrorKind::Other,
186            "Failed to register signal handler",
187        ))
188    }
189}
190
191struct Driver {
192    wakeup: PollEvented<UnixStream>,
193}
194
195impl Future for Driver {
196    type Item = ();
197    type Error = ();
198
199    fn poll(&mut self) -> Poll<(), ()> {
200        // Drain the data from the pipe and maintain interest in getting more
201        self.drain();
202        // Broadcast any signals which were received
203        self.broadcast();
204
205        // This task just lives until the end of the event loop
206        Ok(Async::NotReady)
207    }
208}
209
210impl Driver {
211    fn new(handle: &Handle) -> io::Result<Driver> {
212        // NB: We give each driver a "fresh" reciever file descriptor to avoid
213        // the issues described in alexcrichton/tokio-process#42.
214        //
215        // In the past we would reuse the actual receiver file descriptor and
216        // swallow any errors around double registration of the same descriptor.
217        // I'm not sure if the second (failed) registration simply doesn't end up
218        // receiving wake up notifications, or there could be some race condition
219        // when consuming readiness events, but having distinct descriptors for
220        // distinct PollEvented instances appears to mitigate this.
221        //
222        // Unfortunately we cannot just use a single global PollEvented instance
223        // either, since we can't compare Handles or assume they will always
224        // point to the exact same reactor.
225        let stream = globals().receiver.try_clone()?;
226        let wakeup = PollEvented::new_with_handle(stream, handle)?;
227
228        Ok(Driver { wakeup: wakeup })
229    }
230
231    /// Drain all data in the global receiver, ensuring we'll get woken up when
232    /// there is a write on the other end.
233    ///
234    /// We do *NOT* use the existence of any read bytes as evidence a sigal was
235    /// received since the `pending` flags would have already been set if that
236    /// was the case. See #38 for more info.
237    fn drain(&mut self) {
238        loop {
239            match self.wakeup.read(&mut [0; 128]) {
240                Ok(0) => panic!("EOF on self-pipe"),
241                Ok(_) => {}
242                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
243                Err(e) => panic!("Bad read on self-pipe: {}", e),
244            }
245        }
246    }
247
248    /// Go through all the signals and broadcast everything.
249    ///
250    /// Driver tasks wake up for *any* signal and simply process all globally
251    /// registered signal streams, so each task is sort of cooperatively working
252    /// for all the rest as well.
253    fn broadcast(&self) {
254        for (sig, slot) in globals().signals.iter().enumerate() {
255            // Any signal of this kind arrived since we checked last?
256            if !slot.pending.swap(false, Ordering::SeqCst) {
257                continue;
258            }
259
260            let signum = sig as c_int;
261            let mut recipients = slot.recipients.lock().unwrap();
262
263            // Notify all waiters on this signal that the signal has been
264            // received. If we can't push a message into the queue then we don't
265            // worry about it as everything is coalesced anyway. If the channel
266            // has gone away then we can remove that slot.
267            for i in (0..recipients.len()).rev() {
268                match recipients[i].try_send(signum) {
269                    Ok(()) => {}
270                    Err(ref e) if e.is_disconnected() => {
271                        recipients.swap_remove(i);
272                    }
273
274                    // Channel is full, ignore the error since the
275                    // receiver has already been woken up
276                    Err(e) => {
277                        // Sanity check in case this error type ever gets
278                        // additional variants we have not considered.
279                        debug_assert!(e.is_full());
280                    }
281                }
282            }
283        }
284    }
285}
286
287/// An implementation of `Stream` for receiving a particular type of signal.
288///
289/// This structure implements the `Stream` trait and represents notifications
290/// of the current process receiving a particular signal. The signal being
291/// listened for is passed to `Signal::new`, and the same signal number is then
292/// yielded as each element for the stream.
293///
294/// In general signal handling on Unix is a pretty tricky topic, and this
295/// structure is no exception! There are some important limitations to keep in
296/// mind when using `Signal` streams:
297///
298/// * Signals handling in Unix already necessitates coalescing signals
299///   together sometimes. This `Signal` stream is also no exception here in
300///   that it will also coalesce signals. That is, even if the signal handler
301///   for this process runs multiple times, the `Signal` stream may only return
302///   one signal notification. Specifically, before `poll` is called, all
303///   signal notifications are coalesced into one item returned from `poll`.
304///   Once `poll` has been called, however, a further signal is guaranteed to
305///   be yielded as an item.
306///
307///   Put another way, any element pulled off the returned stream corresponds to
308///   *at least one* signal, but possibly more.
309///
310/// * Signal handling in general is relatively inefficient. Although some
311///   improvements are possible in this crate, it's recommended to not plan on
312///   having millions of signal channels open.
313///
314/// * Currently the "driver task" to process incoming signals never exits. This
315///   driver task runs in the background of the event loop provided, and
316///   in general you shouldn't need to worry about it.
317///
318/// If you've got any questions about this feel free to open an issue on the
319/// repo, though, as I'd love to chat about this! In other words, I'd love to
320/// alleviate some of these limitations if possible!
321pub struct Signal {
322    driver: Driver,
323    signal: c_int,
324    id: SignalId,
325    rx: Receiver<c_int>,
326}
327
328impl Signal {
329    /// Creates a new stream which will receive notifications when the current
330    /// process receives the signal `signal`.
331    ///
332    /// This function will create a new stream which binds to the default event
333    /// loop. This function returns a future which will
334    /// then resolve to the signal stream, if successful.
335    ///
336    /// The `Signal` stream is an infinite stream which will receive
337    /// notifications whenever a signal is received. More documentation can be
338    /// found on `Signal` itself, but to reiterate:
339    ///
340    /// * Signals may be coalesced beyond what the kernel already does.
341    /// * Once a signal handler is registered with the process the underlying
342    ///   libc signal handler is never unregistered.
343    ///
344    /// A `Signal` stream can be created for a particular signal number
345    /// multiple times. When a signal is received then all the associated
346    /// channels will receive the signal notification.
347    ///
348    /// # Errors
349    ///
350    /// * If the lower-level C functions fail for some reason.
351    /// * If the previous initialization of this specific signal failed.
352    /// * If the signal is one of
353    ///   [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
354    pub fn new(signal: c_int) -> IoFuture<Signal> {
355        Signal::with_handle(signal, &Handle::default())
356    }
357
358    /// Creates a new stream which will receive notifications when the current
359    /// process receives the signal `signal`.
360    ///
361    /// This function will create a new stream which may be based on the
362    /// event loop handle provided. This function returns a future which will
363    /// then resolve to the signal stream, if successful.
364    ///
365    /// The `Signal` stream is an infinite stream which will receive
366    /// notifications whenever a signal is received. More documentation can be
367    /// found on `Signal` itself, but to reiterate:
368    ///
369    /// * Signals may be coalesced beyond what the kernel already does.
370    /// * Once a signal handler is registered with the process the underlying
371    ///   libc signal handler is never unregistered.
372    ///
373    /// A `Signal` stream can be created for a particular signal number
374    /// multiple times. When a signal is received then all the associated
375    /// channels will receive the signal notification.
376    pub fn with_handle(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
377        let handle = handle.clone();
378        Box::new(future::lazy(move || {
379            let result = (|| {
380                // Turn the signal delivery on once we are ready for it
381                signal_enable(signal)?;
382
383                // Ensure there's a driver for our associated event loop processing
384                // signals.
385                let driver = Driver::new(&handle)?;
386
387                // One wakeup in a queue is enough, no need for us to buffer up any
388                // more. NB: channels always guarantee at least one slot per sender,
389                // so we don't need additional slots
390                let (tx, rx) = channel(0);
391                let id = Globals::register_signal_sender(signal, tx);
392                Ok(Signal {
393                    driver: driver,
394                    rx: rx,
395                    id: id,
396                    signal: signal,
397                })
398            })();
399            future::result(result)
400        }))
401    }
402}
403
404impl Stream for Signal {
405    type Item = c_int;
406    type Error = io::Error;
407
408    fn poll(&mut self) -> Poll<Option<c_int>, io::Error> {
409        self.driver.poll().unwrap();
410        // receivers don't generate errors
411        self.rx.poll().map_err(|_| panic!())
412    }
413}
414
415impl Drop for Signal {
416    fn drop(&mut self) {
417        Globals::deregister_signal_receiver(self.signal, self.id);
418    }
419}
420
421#[cfg(test)]
422mod tests {
423    extern crate tokio;
424
425    use super::*;
426
427    #[test]
428    fn dropped_signal_senders_are_cleaned_up() {
429        let mut rt =
430            self::tokio::runtime::current_thread::Runtime::new().expect("failed to init runtime");
431
432        let signum = libc::SIGUSR1;
433        let signal = rt
434            .block_on(Signal::new(signum))
435            .expect("failed to create signal");
436
437        {
438            let recipients = globals().signals[signum as usize]
439                .recipients
440                .lock()
441                .unwrap();
442            assert!(!recipients.is_empty());
443        }
444
445        drop(signal);
446
447        unsafe {
448            assert_eq!(libc::kill(libc::getpid(), signum), 0);
449        }
450
451        {
452            let recipients = globals().signals[signum as usize]
453                .recipients
454                .lock()
455                .unwrap();
456            assert!(recipients.is_empty());
457        }
458    }
459}