broker_tokio/signal/unix.rs
1//! Unix-specific types for signal handling.
2//!
3//! This module is only defined on Unix platforms and contains the primary
4//! `Signal` type for receiving notifications of signals.
5
6#![cfg(unix)]
7
8use crate::io::{AsyncRead, PollEvented};
9use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
10use crate::sync::mpsc::{channel, Receiver};
11
12use libc::c_int;
13use mio_uds::UnixStream;
14use std::io::{self, Error, ErrorKind, Write};
15use std::pin::Pin;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Once;
18use std::task::{Context, Poll};
19
/// Per-signal bookkeeping table; each slot is looked up by using the signal
/// number as the index (see `signal_enable`).
pub(crate) type OsStorage = Vec<SignalInfo>;
21
// Number of different unix signals, used to size the `OsStorage` table.
// (FreeBSD has 33)
const SIGNUM: usize = 33;
25
26impl Init for OsStorage {
27 fn init() -> Self {
28 (0..SIGNUM).map(|_| SignalInfo::default()).collect()
29 }
30}
31
32impl Storage for OsStorage {
33 fn event_info(&self, id: EventId) -> Option<&EventInfo> {
34 self.get(id).map(|si| &si.event_info)
35 }
36
37 fn for_each<'a, F>(&'a self, f: F)
38 where
39 F: FnMut(&'a EventInfo),
40 {
41 self.iter().map(|si| &si.event_info).for_each(f)
42 }
43}
44
/// The two connected ends of the stream pair used to wake signal drivers.
#[derive(Debug)]
pub(crate) struct OsExtraData {
    /// End written to by the signal handler (`action`) to request a wakeup.
    sender: UnixStream,
    /// End that each `Driver` clones and registers for read readiness.
    receiver: UnixStream,
}
50
51impl Init for OsExtraData {
52 fn init() -> Self {
53 let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream");
54
55 Self { sender, receiver }
56 }
57}
58
/// Represents the specific kind of signal to listen for.
///
/// This is a thin wrapper around the raw OS signal number; use the named
/// constructors for common signals or `SignalKind::from_raw` for any other
/// signal number.
#[derive(Debug, Clone, Copy)]
pub struct SignalKind(c_int);
62
63impl SignalKind {
64 /// Allows for listening to any valid OS signal.
65 ///
66 /// For example, this can be used for listening for platform-specific
67 /// signals.
68 /// ```rust,no_run
69 /// # use tokio::signal::unix::SignalKind;
70 /// # let signum = -1;
71 /// // let signum = libc::OS_SPECIFIC_SIGNAL;
72 /// let kind = SignalKind::from_raw(signum);
73 /// ```
74 pub fn from_raw(signum: c_int) -> Self {
75 Self(signum)
76 }
77
78 /// Represents the SIGALRM signal.
79 ///
80 /// On Unix systems this signal is sent when a real-time timer has expired.
81 /// By default, the process is terminated by this signal.
82 pub fn alarm() -> Self {
83 Self(libc::SIGALRM)
84 }
85
86 /// Represents the SIGCHLD signal.
87 ///
88 /// On Unix systems this signal is sent when the status of a child process
89 /// has changed. By default, this signal is ignored.
90 pub fn child() -> Self {
91 Self(libc::SIGCHLD)
92 }
93
94 /// Represents the SIGHUP signal.
95 ///
96 /// On Unix systems this signal is sent when the terminal is disconnected.
97 /// By default, the process is terminated by this signal.
98 pub fn hangup() -> Self {
99 Self(libc::SIGHUP)
100 }
101
102 /// Represents the SIGINFO signal.
103 ///
104 /// On Unix systems this signal is sent to request a status update from the
105 /// process. By default, this signal is ignored.
106 #[cfg(any(
107 target_os = "dragonfly",
108 target_os = "freebsd",
109 target_os = "macos",
110 target_os = "netbsd",
111 target_os = "openbsd"
112 ))]
113 pub fn info() -> Self {
114 Self(libc::SIGINFO)
115 }
116
117 /// Represents the SIGINT signal.
118 ///
119 /// On Unix systems this signal is sent to interrupt a program.
120 /// By default, the process is terminated by this signal.
121 pub fn interrupt() -> Self {
122 Self(libc::SIGINT)
123 }
124
125 /// Represents the SIGIO signal.
126 ///
127 /// On Unix systems this signal is sent when I/O operations are possible
128 /// on some file descriptor. By default, this signal is ignored.
129 pub fn io() -> Self {
130 Self(libc::SIGIO)
131 }
132
133 /// Represents the SIGPIPE signal.
134 ///
135 /// On Unix systems this signal is sent when the process attempts to write
136 /// to a pipe which has no reader. By default, the process is terminated by
137 /// this signal.
138 pub fn pipe() -> Self {
139 Self(libc::SIGPIPE)
140 }
141
142 /// Represents the SIGQUIT signal.
143 ///
144 /// On Unix systems this signal is sent to issue a shutdown of the
145 /// process, after which the OS will dump the process core.
146 /// By default, the process is terminated by this signal.
147 pub fn quit() -> Self {
148 Self(libc::SIGQUIT)
149 }
150
151 /// Represents the SIGTERM signal.
152 ///
153 /// On Unix systems this signal is sent to issue a shutdown of the
154 /// process. By default, the process is terminated by this signal.
155 pub fn terminate() -> Self {
156 Self(libc::SIGTERM)
157 }
158
159 /// Represents the SIGUSR1 signal.
160 ///
161 /// On Unix systems this is a user defined signal.
162 /// By default, the process is terminated by this signal.
163 pub fn user_defined1() -> Self {
164 Self(libc::SIGUSR1)
165 }
166
167 /// Represents the SIGUSR2 signal.
168 ///
169 /// On Unix systems this is a user defined signal.
170 /// By default, the process is terminated by this signal.
171 pub fn user_defined2() -> Self {
172 Self(libc::SIGUSR2)
173 }
174
175 /// Represents the SIGWINCH signal.
176 ///
177 /// On Unix systems this signal is sent when the terminal window is resized.
178 /// By default, this signal is ignored.
179 pub fn window_change() -> Self {
180 Self(libc::SIGWINCH)
181 }
182}
183
/// State tracked for a single signal number.
pub(crate) struct SignalInfo {
    /// Event state exposed to the registry through `Storage::event_info`.
    event_info: EventInfo,
    /// Guards the one-time attempt to install the OS-level handler.
    init: Once,
    /// Set to `true` only after the handler was installed successfully, so
    /// later callers can tell a failed one-time attempt from a successful one.
    initialized: AtomicBool,
}
189
190impl Default for SignalInfo {
191 fn default() -> SignalInfo {
192 SignalInfo {
193 event_info: Default::default(),
194 init: Once::new(),
195 initialized: AtomicBool::new(false),
196 }
197 }
198}
199
200/// Our global signal handler for all signals registered by this module.
201///
202/// The purpose of this signal handler is to primarily:
203///
204/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
205/// 2. Wake up driver tasks by writing a byte to a pipe
206///
207/// Those two operations shoudl both be async-signal safe.
208fn action(globals: Pin<&'static Globals>, signal: c_int) {
209 globals.record_event(signal as EventId);
210
211 // Send a wakeup, ignore any errors (anything reasonably possible is
212 // full pipe and then it will wake up anyway).
213 let mut sender = &globals.sender;
214 drop(sender.write(&[1]));
215}
216
217/// Enable this module to receive signal notifications for the `signal`
218/// provided.
219///
220/// This will register the signal handler if it hasn't already been registered,
221/// returning any error along the way if that fails.
222fn signal_enable(signal: c_int) -> io::Result<()> {
223 if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) {
224 return Err(Error::new(
225 ErrorKind::Other,
226 format!("Refusing to register signal {}", signal),
227 ));
228 }
229
230 let globals = globals();
231 let siginfo = match globals.storage().get(signal as EventId) {
232 Some(slot) => slot,
233 None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")),
234 };
235 let mut registered = Ok(());
236 siginfo.init.call_once(|| {
237 registered = unsafe {
238 signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ())
239 };
240 if registered.is_ok() {
241 siginfo.initialized.store(true, Ordering::Relaxed);
242 }
243 });
244 registered?;
245 // If the call_once failed, it won't be retried on the next attempt to register the signal. In
246 // such case it is not run, registered is still `Ok(())`, initialized is still false.
247 if siginfo.initialized.load(Ordering::Relaxed) {
248 Ok(())
249 } else {
250 Err(Error::new(
251 ErrorKind::Other,
252 "Failed to register signal handler",
253 ))
254 }
255}
256
/// Drives signal delivery for one `Signal` by watching the wakeup stream and
/// broadcasting recorded signals whenever it is polled.
#[derive(Debug)]
struct Driver {
    /// Event-loop registration of a clone of the global receiver stream.
    wakeup: PollEvented<UnixStream>,
}
261
262impl Driver {
263 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
264 // Drain the data from the pipe and maintain interest in getting more
265 self.drain(cx);
266 // Broadcast any signals which were received
267 globals().broadcast();
268
269 Poll::Pending
270 }
271}
272
273impl Driver {
274 fn new() -> io::Result<Driver> {
275 // NB: We give each driver a "fresh" reciever file descriptor to avoid
276 // the issues described in alexcrichton/tokio-process#42.
277 //
278 // In the past we would reuse the actual receiver file descriptor and
279 // swallow any errors around double registration of the same descriptor.
280 // I'm not sure if the second (failed) registration simply doesn't end up
281 // receiving wake up notifications, or there could be some race condition
282 // when consuming readiness events, but having distinct descriptors for
283 // distinct PollEvented instances appears to mitigate this.
284 //
285 // Unfortunately we cannot just use a single global PollEvented instance
286 // either, since we can't compare Handles or assume they will always
287 // point to the exact same reactor.
288 let stream = globals().receiver.try_clone()?;
289 let wakeup = PollEvented::new(stream)?;
290
291 Ok(Driver { wakeup })
292 }
293
294 /// Drain all data in the global receiver, ensuring we'll get woken up when
295 /// there is a write on the other end.
296 ///
297 /// We do *NOT* use the existence of any read bytes as evidence a signal was
298 /// received since the `pending` flags would have already been set if that
299 /// was the case. See
300 /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
301 /// info.
302 fn drain(&mut self, cx: &mut Context<'_>) {
303 loop {
304 match Pin::new(&mut self.wakeup).poll_read(cx, &mut [0; 128]) {
305 Poll::Ready(Ok(0)) => panic!("EOF on self-pipe"),
306 Poll::Ready(Ok(_)) => {}
307 Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
308 Poll::Pending => break,
309 }
310 }
311 }
312}
313
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
/// structure is no exception! There are some important limitations to keep in
/// mind when using `Signal` streams:
///
/// * Signal handling in Unix already necessitates coalescing signals
///   together sometimes. This `Signal` stream is no exception: it will also
///   coalesce signals. That is, even if the signal handler for this process
///   runs multiple times, the `Signal` stream may only return one signal
///   notification. Specifically, before `poll` is called, all signal
///   notifications are coalesced into one item returned from `poll`. Once
///   `poll` has been called, however, a further signal is guaranteed to be
///   yielded as an item.
///
///   Put another way, any element pulled off the returned stream corresponds
///   to *at least one* signal, but possibly more.
///
/// * Signal handling in general is relatively inefficient. Although some
///   improvements are possible in this crate, it's recommended to not plan on
///   having millions of signal channels open.
///
/// If you've got any questions about this feel free to open an issue on the
/// repo! New approaches to alleviate some of these limitations are always
/// appreciated!
///
/// # Caveats
///
/// The first time that a `Signal` instance is registered for a particular
/// signal kind, an OS signal-handler is installed which replaces the default
/// platform behavior when that signal is received, **for the duration of the
/// entire process**.
///
/// For example, Unix systems will terminate a process by default when it
/// receives SIGINT. But, when a `Signal` instance is created to listen for
/// this signal, the next SIGINT that arrives will be translated to a stream
/// event, and the process will continue to execute. **Even if this `Signal`
/// instance is dropped, subsequent SIGINT deliveries will end up captured by
/// Tokio, and the default platform behavior will NOT be reset**.
///
/// Thus, applications should take care to ensure that the expected signal
/// behavior occurs after listening for specific signals.
///
/// # Examples
///
/// Wait for SIGHUP
///
/// ```rust,no_run
/// use tokio::signal::unix::{signal, SignalKind};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // An infinite stream of hangup signals.
///     let mut stream = signal(SignalKind::hangup())?;
///
///     // Print whenever a HUP signal is received
///     loop {
///         stream.recv().await;
///         println!("got signal HUP");
///     }
/// }
/// ```
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
    /// Driver polled on every `poll_recv` call to drain wakeups and broadcast
    /// recorded signals.
    driver: Driver,
    /// Receiving half of the channel whose sender is registered as a listener
    /// for this signal.
    rx: Receiver<()>,
}
382
383/// Creates a new stream which will receive notifications when the current
384/// process receives the specified signal `kind`.
385///
386/// This function will create a new stream which binds to the default reactor.
387/// The `Signal` stream is an infinite stream which will receive
388/// notifications whenever a signal is received. More documentation can be
389/// found on `Signal` itself, but to reiterate:
390///
391/// * Signals may be coalesced beyond what the kernel already does.
392/// * Once a signal handler is registered with the process the underlying
393/// libc signal handler is never unregistered.
394///
395/// A `Signal` stream can be created for a particular signal number
396/// multiple times. When a signal is received then all the associated
397/// channels will receive the signal notification.
398///
399/// # Errors
400///
401/// * If the lower-level C functions fail for some reason.
402/// * If the previous initialization of this specific signal failed.
403/// * If the signal is one of
404/// [`signal_hook::FORBIDDEN`](https://docs.rs/signal-hook/*/signal_hook/fn.register.html#panics)
405pub fn signal(kind: SignalKind) -> io::Result<Signal> {
406 let signal = kind.0;
407
408 // Turn the signal delivery on once we are ready for it
409 signal_enable(signal)?;
410
411 // Ensure there's a driver for our associated event loop processing
412 // signals.
413 let driver = Driver::new()?;
414
415 // One wakeup in a queue is enough, no need for us to buffer up any
416 // more.
417 let (tx, rx) = channel(1);
418 globals().register_listener(signal as EventId, tx);
419
420 Ok(Signal { driver, rx })
421}
422
423impl Signal {
424 /// Receive the next signal notification event.
425 ///
426 /// `None` is returned if no more events can be received by this stream.
427 ///
428 /// # Examples
429 ///
430 /// Wait for SIGHUP
431 ///
432 /// ```rust,no_run
433 /// use tokio::signal::unix::{signal, SignalKind};
434 ///
435 /// #[tokio::main]
436 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
437 /// // An infinite stream of hangup signals.
438 /// let mut stream = signal(SignalKind::hangup())?;
439 ///
440 /// // Print whenever a HUP signal is received
441 /// loop {
442 /// stream.recv().await;
443 /// println!("got signal HUP");
444 /// }
445 /// }
446 /// ```
447 pub async fn recv(&mut self) -> Option<()> {
448 use crate::future::poll_fn;
449 poll_fn(|cx| self.poll_recv(cx)).await
450 }
451
452 /// Poll to receive the next signal notification event, outside of an
453 /// `async` context.
454 ///
455 /// `None` is returned if no more events can be received by this stream.
456 ///
457 /// # Examples
458 ///
459 /// Polling from a manually implemented future
460 ///
461 /// ```rust,no_run
462 /// use std::pin::Pin;
463 /// use std::future::Future;
464 /// use std::task::{Context, Poll};
465 /// use tokio::signal::unix::Signal;
466 ///
467 /// struct MyFuture {
468 /// signal: Signal,
469 /// }
470 ///
471 /// impl Future for MyFuture {
472 /// type Output = Option<()>;
473 ///
474 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
475 /// println!("polling MyFuture");
476 /// self.signal.poll_recv(cx)
477 /// }
478 /// }
479 /// ```
480 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
481 let _ = self.driver.poll(cx);
482 self.rx.poll_recv(cx)
483 }
484}
485
486cfg_stream! {
487 impl crate::stream::Stream for Signal {
488 type Item = ();
489
490 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
491 self.poll_recv(cx)
492 }
493 }
494}
495
496pub(crate) fn ctrl_c() -> io::Result<Signal> {
497 signal(SignalKind::interrupt())
498}
499
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// A negative signal number must be refused.
    #[test]
    fn signal_enable_error_on_invalid_input() {
        let outcome = signal_enable(-1);
        outcome.unwrap_err();
    }

    /// A signal listed in `signal_hook_registry::FORBIDDEN` must be refused.
    #[test]
    fn signal_enable_error_on_forbidden_input() {
        let forbidden = signal_hook_registry::FORBIDDEN[0];
        let outcome = signal_enable(forbidden);
        outcome.unwrap_err();
    }
}
513}