tokio_signal/
windows.rs

1//! Windows-specific types for signal handling.
2//!
3//! This module is only defined on Windows and contains the primary `Event` type
4//! for receiving notifications of events. These events are listened for via the
5//! `SetConsoleCtrlHandler` function which receives events of the type
6//! `CTRL_C_EVENT` and `CTRL_BREAK_EVENT`
7
8#![cfg(windows)]
9
10extern crate mio;
11extern crate winapi;
12
13use std::cell::RefCell;
14use std::io;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Once, ONCE_INIT};
17
18use self::winapi::shared::minwindef::*;
19use self::winapi::um::consoleapi::SetConsoleCtrlHandler;
20use self::winapi::um::wincon::*;
21use futures::future;
22use futures::stream::Fuse;
23use futures::sync::mpsc;
24use futures::sync::oneshot;
25use futures::{Async, Future, Poll, Stream};
26use mio::Ready;
27use tokio_reactor::{Handle, PollEvented};
28
29use IoFuture;
30
31static INIT: Once = ONCE_INIT;
32static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
33
/// Stream of events discovered via `SetConsoleCtrlHandler`.
///
/// This structure can be used to listen for events of the type `CTRL_C_EVENT`
/// and `CTRL_BREAK_EVENT`. The `Stream` trait is implemented for this struct
/// and will resolve for each notification received by the process. Note that
/// there are a few limitations with this as well:
///
/// * A notification to this process notifies *all* `Event` streams for that
///   event type.
/// * Notifications to an `Event` stream **are coalesced** if they aren't
///   processed quickly enough. This means that if two notifications are
///   received back-to-back, then the stream may only receive one item about the
///   two notifications.
pub struct Event {
    /// Per-stream registration. The driver task marks it readable when a
    /// matching console event has been recorded; `poll` resets it to empty.
    reg: PollEvented<MyRegistration>,
    /// Never sent on. The driver task holds the paired receiver and polls it
    /// to decide when this stream should be forgotten.
    _finished: oneshot::Sender<()>,
}
51
/// State shared between the console control handler and the rest of this
/// module, reachable through the `GLOBAL_STATE` pointer.
struct GlobalState {
    /// Readiness handle of the driver task's registration; `handler` sets it
    /// readable to wake the driver.
    ready: mio::SetReadiness,
    /// Sending half of the channel on which `Event::new` asks the driver task
    /// for a new stream.
    tx: mpsc::UnboundedSender<Message>,
    /// Pending flag for `CTRL_C_EVENT`.
    ctrl_c: GlobalEventState,
    /// Pending flag for `CTRL_BREAK_EVENT`.
    ctrl_break: GlobalEventState,
}
58
/// Per-event-type flag shared between `handler` and the driver task.
struct GlobalEventState {
    /// Set to `true` by `handler` when the event arrives and swapped back to
    /// `false` by the driver task when it fans the notification out.
    ready: AtomicBool,
}
62
/// Requests sent from `Event::new` to the driver task.
enum Message {
    /// Ask for a new stream for the given console event type; the driver
    /// answers on the oneshot channel with the created `Event` or the I/O
    /// error that prevented its creation.
    NewEvent(DWORD, oneshot::Sender<io::Result<Event>>),
}
66
/// Background future spawned by `global_init` that creates `Event` streams on
/// request and forwards console notifications to them.
struct DriverTask {
    /// Reactor handle used to register each newly created stream.
    handle: Handle,
    /// Registration made readable by `handler` whenever an event is recorded.
    reg: PollEvented<MyRegistration>,
    /// Incoming `Message`s from `Event::new`.
    rx: Fuse<mpsc::UnboundedReceiver<Message>>,
    /// Streams listening for `CTRL_C_EVENT`.
    ctrl_c: EventState,
    /// Streams listening for `CTRL_BREAK_EVENT`.
    ctrl_break: EventState,
}
74
/// The live streams for one console event type, as tracked by the driver task.
struct EventState {
    /// One entry per stream: the receiver paired with the stream's `_finished`
    /// sender (an entry is removed once polling it returns an error), and the
    /// `SetReadiness` used to mark that stream readable. The `RefCell` allows
    /// polling the receiver from inside `Vec::retain`, which only hands out
    /// shared references.
    tasks: Vec<(RefCell<oneshot::Receiver<()>>, mio::SetReadiness)>,
}
78
79impl Event {
80    /// Creates a new stream listening for the `CTRL_C_EVENT` events.
81    ///
82    /// This function will register a handler via `SetConsoleCtrlHandler` and
83    /// deliver notifications to the returned stream.
84    pub fn ctrl_c() -> IoFuture<Event> {
85        Event::ctrl_c_handle(&Handle::default())
86    }
87
88    /// Creates a new stream listening for the `CTRL_C_EVENT` events.
89    ///
90    /// This function will register a handler via `SetConsoleCtrlHandler` and
91    /// deliver notifications to the returned stream.
92    pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<Event> {
93        Event::new(CTRL_C_EVENT, handle)
94    }
95
96    /// Creates a new stream listening for the `CTRL_BREAK_EVENT` events.
97    ///
98    /// This function will register a handler via `SetConsoleCtrlHandler` and
99    /// deliver notifications to the returned stream.
100    pub fn ctrl_break() -> IoFuture<Event> {
101        Event::ctrl_break_handle(&Handle::default())
102    }
103
104    /// Creates a new stream listening for the `CTRL_BREAK_EVENT` events.
105    ///
106    /// This function will register a handler via `SetConsoleCtrlHandler` and
107    /// deliver notifications to the returned stream.
108    pub fn ctrl_break_handle(handle: &Handle) -> IoFuture<Event> {
109        Event::new(CTRL_BREAK_EVENT, handle)
110    }
111
112    fn new(signum: DWORD, handle: &Handle) -> IoFuture<Event> {
113        let handle = handle.clone();
114        let new_signal = future::poll_fn(move || {
115            let mut init = None;
116            INIT.call_once(|| {
117                init = Some(global_init(&handle));
118            });
119
120            if let Some(Err(e)) = init {
121                return Err(e);
122            }
123
124            let (tx, rx) = oneshot::channel();
125            let msg = Message::NewEvent(signum, tx);
126            let res = unsafe { (*GLOBAL_STATE).tx.clone().unbounded_send(msg) };
127            res.expect(
128                "failed to request a new signal stream, did the \
129                 first event loop go away?",
130            );
131            Ok(Async::Ready(rx.then(|r| r.unwrap())))
132        });
133
134        Box::new(new_signal.flatten())
135    }
136}
137
138impl Stream for Event {
139    type Item = ();
140    type Error = io::Error;
141
142    fn poll(&mut self) -> Poll<Option<()>, io::Error> {
143        if !self.reg.poll_read_ready(Ready::readable())?.is_ready() {
144            return Ok(Async::NotReady);
145        }
146        self.reg.clear_read_ready(Ready::readable())?;
147        self.reg
148            .get_ref()
149            .readiness
150            .set_readiness(mio::Ready::empty())
151            .expect("failed to set readiness");
152        Ok(Async::Ready(Some(())))
153    }
154}
155
156fn global_init(handle: &Handle) -> io::Result<()> {
157    let reg = MyRegistration::new();
158    let ready = reg.readiness.clone();
159
160    let (tx, rx) = mpsc::unbounded();
161    let reg = try!(PollEvented::new_with_handle(reg, handle));
162
163    unsafe {
164        let state = Box::new(GlobalState {
165            ready: ready,
166            ctrl_c: GlobalEventState {
167                ready: AtomicBool::new(false),
168            },
169            ctrl_break: GlobalEventState {
170                ready: AtomicBool::new(false),
171            },
172            tx: tx,
173        });
174        GLOBAL_STATE = Box::into_raw(state);
175
176        let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
177        if rc == 0 {
178            Box::from_raw(GLOBAL_STATE);
179            GLOBAL_STATE = 0 as *mut _;
180            return Err(io::Error::last_os_error());
181        }
182
183        ::tokio_executor::spawn(Box::new(DriverTask {
184            handle: handle.clone(),
185            rx: rx.fuse(),
186            reg: reg,
187            ctrl_c: EventState { tasks: Vec::new() },
188            ctrl_break: EventState { tasks: Vec::new() },
189        }));
190
191        Ok(())
192    }
193}
194
impl Future for DriverTask {
    type Item = ();
    type Error = ();

    /// Runs one round of driver bookkeeping and always returns `NotReady`,
    /// so the task never completes.
    ///
    /// # Panics
    ///
    /// Panics if `check_events` returns an I/O error.
    fn poll(&mut self) -> Poll<(), ()> {
        // Forget streams whose receiver now polls as an error.
        self.check_event_drops();
        // Create the streams that were requested through the channel.
        self.check_messages();
        // Forward any recorded console events to the remaining streams.
        self.check_events().unwrap();

        // TODO: when to finish this task?
        Ok(Async::NotReady)
    }
}
208
209impl DriverTask {
210    fn check_event_drops(&mut self) {
211        self.ctrl_c
212            .tasks
213            .retain(|task| !task.0.borrow_mut().poll().is_err());
214        self.ctrl_break
215            .tasks
216            .retain(|task| !task.0.borrow_mut().poll().is_err());
217    }
218
219    fn check_messages(&mut self) {
220        loop {
221            // Acquire the next message
222            let message = match self.rx.poll().unwrap() {
223                Async::Ready(Some(e)) => e,
224                Async::Ready(None) | Async::NotReady => break,
225            };
226            let (sig, complete) = match message {
227                Message::NewEvent(sig, complete) => (sig, complete),
228            };
229
230            let event = if sig == CTRL_C_EVENT {
231                &mut self.ctrl_c
232            } else {
233                &mut self.ctrl_break
234            };
235
236            // Acquire the (registration, set_readiness) pair by... assuming
237            // we're on the event loop (true because of the spawn above).
238            let reg = MyRegistration::new();
239            let ready = reg.readiness.clone();
240
241            let reg = match PollEvented::new_with_handle(reg, &self.handle) {
242                Ok(reg) => reg,
243                Err(e) => {
244                    drop(complete.send(Err(e)));
245                    continue;
246                }
247            };
248
249            // Create the `Event` to pass back and then also keep a handle to
250            // the `SetReadiness` for ourselves internally.
251            let (tx, rx) = oneshot::channel();
252            drop(complete.send(Ok(Event {
253                reg: reg,
254                _finished: tx,
255            })));
256            event.tasks.push((RefCell::new(rx), ready));
257        }
258    }
259
260    fn check_events(&mut self) -> io::Result<()> {
261        if self.reg.poll_read_ready(Ready::readable())?.is_not_ready() {
262            return Ok(());
263        }
264        self.reg.clear_read_ready(Ready::readable())?;
265        self.reg
266            .get_ref()
267            .readiness
268            .set_readiness(mio::Ready::empty())
269            .expect("failed to set readiness");
270
271        if unsafe { (*GLOBAL_STATE).ctrl_c.ready.swap(false, Ordering::SeqCst) } {
272            for task in self.ctrl_c.tasks.iter() {
273                task.1.set_readiness(mio::Ready::readable()).unwrap();
274            }
275        }
276        if unsafe {
277            (*GLOBAL_STATE)
278                .ctrl_break
279                .ready
280                .swap(false, Ordering::SeqCst)
281        } {
282            for task in self.ctrl_break.tasks.iter() {
283                task.1.set_readiness(mio::Ready::readable()).unwrap();
284            }
285        }
286        Ok(())
287    }
288}
289
290unsafe extern "system" fn handler(ty: DWORD) -> BOOL {
291    let event = match ty {
292        CTRL_C_EVENT => &(*GLOBAL_STATE).ctrl_c,
293        CTRL_BREAK_EVENT => &(*GLOBAL_STATE).ctrl_break,
294        _ => return FALSE,
295    };
296    if event.ready.swap(true, Ordering::SeqCst) {
297        FALSE
298    } else {
299        drop((*GLOBAL_STATE).ready.set_readiness(mio::Ready::readable()));
300        // TODO(1000): this will report that we handled a CTRL_BREAK_EVENT when
301        //       in fact we may not have any streams actually created for that
302        //       event.
303        TRUE
304    }
305}
306
/// A `mio::Registration` bundled with the `SetReadiness` created alongside it,
/// so the pair can be wrapped in a `PollEvented` while the readiness handle
/// stays reachable through `get_ref()`.
struct MyRegistration {
    /// The half that is registered with the reactor (see the `Evented` impl).
    registration: mio::Registration,
    /// The half used to set or reset the readiness of `registration`.
    readiness: mio::SetReadiness,
}
311
312impl MyRegistration {
313    fn new() -> Self {
314        let (registration, readiness) = mio::Registration::new2();
315
316        Self {
317            registration,
318            readiness,
319        }
320    }
321}
322
323impl mio::Evented for MyRegistration {
324    fn register(
325        &self,
326        poll: &mio::Poll,
327        token: mio::Token,
328        events: mio::Ready,
329        opts: mio::PollOpt,
330    ) -> io::Result<()> {
331        self.registration.register(poll, token, events, opts)
332    }
333
334    fn reregister(
335        &self,
336        poll: &mio::Poll,
337        token: mio::Token,
338        events: mio::Ready,
339        opts: mio::PollOpt,
340    ) -> io::Result<()> {
341        self.registration.reregister(poll, token, events, opts)
342    }
343
344    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
345        mio::Evented::deregister(&self.registration, poll)
346    }
347}
348
#[cfg(test)]
mod tests {
    extern crate tokio;

    use self::tokio::runtime::current_thread;
    use self::tokio::timer::Timeout;
    use super::*;
    use std::time::Duration;

    /// Wraps `future` in a one-second timeout. Timer failures and an elapsed
    /// deadline panic; an error from `future` itself is passed through.
    fn with_timeout<F: Future>(future: F) -> impl Future<Item = F::Item, Error = F::Error> {
        let limit = Duration::from_secs(1);
        Timeout::new(future, limit).map_err(|err| {
            if err.is_timer() {
                panic!("failed to register timer");
            }
            if err.is_elapsed() {
                panic!("timed out")
            }
            err.into_inner().expect("missing inner error")
        })
    }

    #[test]
    fn ctrl_c_and_ctrl_break() {
        // FIXME(1000): combining into one test due to a restriction where the
        // first event loop cannot go away
        let mut rt = current_thread::Runtime::new().unwrap();

        let ctrl_c_stream = rt
            .block_on(with_timeout(Event::ctrl_c()))
            .expect("failed to run future");

        // Windows doesn't have a good programmatic way of sending events
        // like sending signals on Unix, so we'll stub out the actual OS
        // integration and test that our handling works.
        unsafe {
            super::handler(CTRL_C_EVENT);
        }

        let first_ctrl_c = with_timeout(ctrl_c_stream.into_future());
        rt.block_on(first_ctrl_c).ok().expect("failed to run event");

        let ctrl_break_stream = rt
            .block_on(with_timeout(Event::ctrl_break()))
            .expect("failed to run future");
        unsafe {
            super::handler(CTRL_BREAK_EVENT);
        }

        let first_ctrl_break = with_timeout(ctrl_break_stream.into_future());
        rt.block_on(first_ctrl_break)
            .ok()
            .expect("failed to run event");
    }
}
401}