tokio_signal/unix.rs
1//! Unix-specific types for signal handling.
2//!
3//! This module is only defined on Unix platforms and contains the primary
4//! `Signal` type for receiving notifications of signals.
5
6#![cfg(unix)]
7
8pub extern crate libc;
9extern crate mio;
10extern crate mio_uds;
11extern crate signal_hook_registry;
12
13use std::io::prelude::*;
14use std::io::{self, Error, ErrorKind};
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Mutex, Once, ONCE_INIT};
17
18use self::libc::c_int;
19use self::mio_uds::UnixStream;
20use futures::future;
21use futures::sync::mpsc::{channel, Receiver, Sender};
22use futures::{Async, Future};
23use futures::{Poll, Stream};
24use tokio_io::IoFuture;
25use tokio_reactor::{Handle, PollEvented};
26
27pub use self::libc::{SIGALRM, SIGHUP, SIGPIPE, SIGQUIT, SIGTRAP};
28pub use self::libc::{SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
29
/// Signal definitions that only exist on BSD-derived platforms.
#[cfg(any(
    target_os = "dragonfly",
    target_os = "freebsd",
    target_os = "macos",
    target_os = "netbsd",
    target_os = "openbsd",
))]
pub mod bsd {
    // The module-level `cfg` already restricts this item to the BSD family,
    // so the re-export needs no `cfg` of its own.
    pub use super::libc::SIGINFO;
}
48
/// Number of slots in the per-signal table.
///
/// Signal numbers are used directly as indices into that table, so only
/// signals strictly below this value can be registered.
/// (FreeBSD has 33)
const SIGNUM: usize = 33;
52
53type SignalSender = Sender<c_int>;
54
55struct SignalInfo {
56 pending: AtomicBool,
57 // The ones interested in this signal
58 recipients: Mutex<Vec<Box<SignalSender>>>,
59
60 init: Once,
61 initialized: AtomicBool,
62}
63
64struct Globals {
65 sender: UnixStream,
66 receiver: UnixStream,
67 signals: Vec<SignalInfo>,
68}
69
70impl Globals {
71 /// Register a new `Signal` instance's channel sender.
72 /// Returns a `SignalId` which should be later used for deregistering
73 /// this sender.
74 fn register_signal_sender(signal: c_int, tx: SignalSender) -> SignalId {
75 let tx = Box::new(tx);
76 let id = SignalId::from(&tx);
77
78 let idx = signal as usize;
79 globals().signals[idx].recipients.lock().unwrap().push(tx);
80 id
81 }
82
83 /// Deregister a `Signal` instance's channel sender because the `Signal`
84 /// is no longer interested in receiving events (e.g. dropped).
85 fn deregister_signal_receiver(signal: c_int, id: SignalId) {
86 let idx = signal as usize;
87 let mut list = globals().signals[idx].recipients.lock().unwrap();
88 list.retain(|sender| SignalId::from(sender) != id);
89 }
90}
91
/// Unique identifier of a `Signal` instance.
///
/// The wrapped value is the memory address of the boxed channel `Sender`
/// that belongs to the instance.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct SignalId(usize);
97
98impl<'a> From<&'a Box<SignalSender>> for SignalId {
99 fn from(tx: &'a Box<SignalSender>) -> Self {
100 SignalId(&**tx as *const _ as usize)
101 }
102}
103
104impl Default for SignalInfo {
105 fn default() -> SignalInfo {
106 SignalInfo {
107 pending: AtomicBool::new(false),
108 init: ONCE_INIT,
109 initialized: AtomicBool::new(false),
110 recipients: Mutex::new(Vec::new()),
111 }
112 }
113}
114
115static mut GLOBALS: *mut Globals = 0 as *mut Globals;
116
117fn globals() -> &'static Globals {
118 static INIT: Once = ONCE_INIT;
119
120 unsafe {
121 INIT.call_once(|| {
122 let (receiver, sender) = UnixStream::pair().unwrap();
123 let globals = Globals {
124 sender: sender,
125 receiver: receiver,
126 signals: (0..SIGNUM).map(|_| Default::default()).collect(),
127 };
128 GLOBALS = Box::into_raw(Box::new(globals));
129 });
130 &*GLOBALS
131 }
132}
133
134/// Our global signal handler for all signals registered by this module.
135///
136/// The purpose of this signal handler is to primarily:
137///
138/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
139/// 2. Wake up driver tasks by writing a byte to a pipe
140///
141/// Those two operations shoudl both be async-signal safe.
142fn action(slot: &SignalInfo, mut sender: &UnixStream) {
143 slot.pending.store(true, Ordering::SeqCst);
144
145 // Send a wakeup, ignore any errors (anything reasonably possible is
146 // full pipe and then it will wake up anyway).
147 drop(sender.write(&[1]));
148}
149
150/// Enable this module to receive signal notifications for the `signal`
151/// provided.
152///
153/// This will register the signal handler if it hasn't already been registered,
154/// returning any error along the way if that fails.
155fn signal_enable(signal: c_int) -> io::Result<()> {
156 if signal_hook_registry::FORBIDDEN.contains(&signal) {
157 return Err(Error::new(
158 ErrorKind::Other,
159 format!("Refusing to register signal {}", signal),
160 ));
161 }
162
163 let globals = globals();
164 let siginfo = match globals.signals.get(signal as usize) {
165 Some(slot) => slot,
166 None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
167 };
168 let mut registered = Ok(());
169 siginfo.init.call_once(|| {
170 registered = unsafe {
171 signal_hook_registry::register(signal, move || action(siginfo, &globals.sender))
172 .map(|_| ())
173 };
174 if registered.is_ok() {
175 siginfo.initialized.store(true, Ordering::Relaxed);
176 }
177 });
178 registered?;
179 // If the call_once failed, it won't be retried on the next attempt to register the signal. In
180 // such case it is not run, registered is still `Ok(())`, initialized is still false.
181 if siginfo.initialized.load(Ordering::Relaxed) {
182 Ok(())
183 } else {
184 Err(Error::new(
185 ErrorKind::Other,
186 "Failed to register signal handler",
187 ))
188 }
189}
190
191struct Driver {
192 wakeup: PollEvented<UnixStream>,
193}
194
195impl Future for Driver {
196 type Item = ();
197 type Error = ();
198
199 fn poll(&mut self) -> Poll<(), ()> {
200 // Drain the data from the pipe and maintain interest in getting more
201 self.drain();
202 // Broadcast any signals which were received
203 self.broadcast();
204
205 // This task just lives until the end of the event loop
206 Ok(Async::NotReady)
207 }
208}
209
210impl Driver {
211 fn new(handle: &Handle) -> io::Result<Driver> {
212 // NB: We give each driver a "fresh" reciever file descriptor to avoid
213 // the issues described in alexcrichton/tokio-process#42.
214 //
215 // In the past we would reuse the actual receiver file descriptor and
216 // swallow any errors around double registration of the same descriptor.
217 // I'm not sure if the second (failed) registration simply doesn't end up
218 // receiving wake up notifications, or there could be some race condition
219 // when consuming readiness events, but having distinct descriptors for
220 // distinct PollEvented instances appears to mitigate this.
221 //
222 // Unfortunately we cannot just use a single global PollEvented instance
223 // either, since we can't compare Handles or assume they will always
224 // point to the exact same reactor.
225 let stream = globals().receiver.try_clone()?;
226 let wakeup = PollEvented::new_with_handle(stream, handle)?;
227
228 Ok(Driver { wakeup: wakeup })
229 }
230
231 /// Drain all data in the global receiver, ensuring we'll get woken up when
232 /// there is a write on the other end.
233 ///
234 /// We do *NOT* use the existence of any read bytes as evidence a sigal was
235 /// received since the `pending` flags would have already been set if that
236 /// was the case. See #38 for more info.
237 fn drain(&mut self) {
238 loop {
239 match self.wakeup.read(&mut [0; 128]) {
240 Ok(0) => panic!("EOF on self-pipe"),
241 Ok(_) => {}
242 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
243 Err(e) => panic!("Bad read on self-pipe: {}", e),
244 }
245 }
246 }
247
248 /// Go through all the signals and broadcast everything.
249 ///
250 /// Driver tasks wake up for *any* signal and simply process all globally
251 /// registered signal streams, so each task is sort of cooperatively working
252 /// for all the rest as well.
253 fn broadcast(&self) {
254 for (sig, slot) in globals().signals.iter().enumerate() {
255 // Any signal of this kind arrived since we checked last?
256 if !slot.pending.swap(false, Ordering::SeqCst) {
257 continue;
258 }
259
260 let signum = sig as c_int;
261 let mut recipients = slot.recipients.lock().unwrap();
262
263 // Notify all waiters on this signal that the signal has been
264 // received. If we can't push a message into the queue then we don't
265 // worry about it as everything is coalesced anyway. If the channel
266 // has gone away then we can remove that slot.
267 for i in (0..recipients.len()).rev() {
268 match recipients[i].try_send(signum) {
269 Ok(()) => {}
270 Err(ref e) if e.is_disconnected() => {
271 recipients.swap_remove(i);
272 }
273
274 // Channel is full, ignore the error since the
275 // receiver has already been woken up
276 Err(e) => {
277 // Sanity check in case this error type ever gets
278 // additional variants we have not considered.
279 debug_assert!(e.is_full());
280 }
281 }
282 }
283 }
284 }
285}
286
287/// An implementation of `Stream` for receiving a particular type of signal.
288///
289/// This structure implements the `Stream` trait and represents notifications
290/// of the current process receiving a particular signal. The signal being
291/// listened for is passed to `Signal::new`, and the same signal number is then
292/// yielded as each element for the stream.
293///
294/// In general signal handling on Unix is a pretty tricky topic, and this
295/// structure is no exception! There are some important limitations to keep in
296/// mind when using `Signal` streams:
297///
298/// * Signals handling in Unix already necessitates coalescing signals
299/// together sometimes. This `Signal` stream is also no exception here in
300/// that it will also coalesce signals. That is, even if the signal handler
301/// for this process runs multiple times, the `Signal` stream may only return
302/// one signal notification. Specifically, before `poll` is called, all
303/// signal notifications are coalesced into one item returned from `poll`.
304/// Once `poll` has been called, however, a further signal is guaranteed to
305/// be yielded as an item.
306///
307/// Put another way, any element pulled off the returned stream corresponds to
308/// *at least one* signal, but possibly more.
309///
310/// * Signal handling in general is relatively inefficient. Although some
311/// improvements are possible in this crate, it's recommended to not plan on
312/// having millions of signal channels open.
313///
314/// * Currently the "driver task" to process incoming signals never exits. This
315/// driver task runs in the background of the event loop provided, and
316/// in general you shouldn't need to worry about it.
317///
318/// If you've got any questions about this feel free to open an issue on the
319/// repo, though, as I'd love to chat about this! In other words, I'd love to
320/// alleviate some of these limitations if possible!
321pub struct Signal {
322 driver: Driver,
323 signal: c_int,
324 id: SignalId,
325 rx: Receiver<c_int>,
326}
327
328impl Signal {
329 /// Creates a new stream which will receive notifications when the current
330 /// process receives the signal `signal`.
331 ///
332 /// This function will create a new stream which binds to the default event
333 /// loop. This function returns a future which will
334 /// then resolve to the signal stream, if successful.
335 ///
336 /// The `Signal` stream is an infinite stream which will receive
337 /// notifications whenever a signal is received. More documentation can be
338 /// found on `Signal` itself, but to reiterate:
339 ///
340 /// * Signals may be coalesced beyond what the kernel already does.
341 /// * Once a signal handler is registered with the process the underlying
342 /// libc signal handler is never unregistered.
343 ///
344 /// A `Signal` stream can be created for a particular signal number
345 /// multiple times. When a signal is received then all the associated
346 /// channels will receive the signal notification.
347 ///
348 /// # Errors
349 ///
350 /// * If the lower-level C functions fail for some reason.
351 /// * If the previous initialization of this specific signal failed.
352 /// * If the signal is one of
353 /// [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
354 pub fn new(signal: c_int) -> IoFuture<Signal> {
355 Signal::with_handle(signal, &Handle::default())
356 }
357
358 /// Creates a new stream which will receive notifications when the current
359 /// process receives the signal `signal`.
360 ///
361 /// This function will create a new stream which may be based on the
362 /// event loop handle provided. This function returns a future which will
363 /// then resolve to the signal stream, if successful.
364 ///
365 /// The `Signal` stream is an infinite stream which will receive
366 /// notifications whenever a signal is received. More documentation can be
367 /// found on `Signal` itself, but to reiterate:
368 ///
369 /// * Signals may be coalesced beyond what the kernel already does.
370 /// * Once a signal handler is registered with the process the underlying
371 /// libc signal handler is never unregistered.
372 ///
373 /// A `Signal` stream can be created for a particular signal number
374 /// multiple times. When a signal is received then all the associated
375 /// channels will receive the signal notification.
376 pub fn with_handle(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
377 let handle = handle.clone();
378 Box::new(future::lazy(move || {
379 let result = (|| {
380 // Turn the signal delivery on once we are ready for it
381 signal_enable(signal)?;
382
383 // Ensure there's a driver for our associated event loop processing
384 // signals.
385 let driver = Driver::new(&handle)?;
386
387 // One wakeup in a queue is enough, no need for us to buffer up any
388 // more. NB: channels always guarantee at least one slot per sender,
389 // so we don't need additional slots
390 let (tx, rx) = channel(0);
391 let id = Globals::register_signal_sender(signal, tx);
392 Ok(Signal {
393 driver: driver,
394 rx: rx,
395 id: id,
396 signal: signal,
397 })
398 })();
399 future::result(result)
400 }))
401 }
402}
403
404impl Stream for Signal {
405 type Item = c_int;
406 type Error = io::Error;
407
408 fn poll(&mut self) -> Poll<Option<c_int>, io::Error> {
409 self.driver.poll().unwrap();
410 // receivers don't generate errors
411 self.rx.poll().map_err(|_| panic!())
412 }
413}
414
415impl Drop for Signal {
416 fn drop(&mut self) {
417 Globals::deregister_signal_receiver(self.signal, self.id);
418 }
419}
420
#[cfg(test)]
mod tests {
    extern crate tokio;

    use super::*;

    /// Reports whether any sender is currently registered for `signum`.
    fn has_recipients(signum: libc::c_int) -> bool {
        let recipients = globals().signals[signum as usize]
            .recipients
            .lock()
            .unwrap();
        !recipients.is_empty()
    }

    #[test]
    fn dropped_signal_senders_are_cleaned_up() {
        let mut rt =
            self::tokio::runtime::current_thread::Runtime::new().expect("failed to init runtime");

        let signum = libc::SIGUSR1;
        let signal = rt
            .block_on(Signal::new(signum))
            .expect("failed to create signal");

        // Creating the stream registers its sender.
        assert!(has_recipients(signum));

        drop(signal);

        // Deliver the signal once more now that the stream is gone.
        unsafe {
            assert_eq!(libc::kill(libc::getpid(), signum), 0);
        }

        // Dropping the stream must have removed its sender again.
        assert!(!has_recipients(signum));
    }
}