1#![cfg(windows)]
9
10extern crate mio;
11extern crate winapi;
12
13use std::cell::RefCell;
14use std::io;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Once, ONCE_INIT};
17
18use self::winapi::shared::minwindef::*;
19use self::winapi::um::consoleapi::SetConsoleCtrlHandler;
20use self::winapi::um::wincon::*;
21use futures::future;
22use futures::stream::Fuse;
23use futures::sync::mpsc;
24use futures::sync::oneshot;
25use futures::{Async, Future, Poll, Stream};
26use mio::Ready;
27use tokio_reactor::{Handle, PollEvented};
28
29use IoFuture;
30
31static INIT: Once = ONCE_INIT;
32static mut GLOBAL_STATE: *mut GlobalState = 0 as *mut _;
33
34pub struct Event {
48 reg: PollEvented<MyRegistration>,
49 _finished: oneshot::Sender<()>,
50}
51
52struct GlobalState {
53 ready: mio::SetReadiness,
54 tx: mpsc::UnboundedSender<Message>,
55 ctrl_c: GlobalEventState,
56 ctrl_break: GlobalEventState,
57}
58
59struct GlobalEventState {
60 ready: AtomicBool,
61}
62
63enum Message {
64 NewEvent(DWORD, oneshot::Sender<io::Result<Event>>),
65}
66
67struct DriverTask {
68 handle: Handle,
69 reg: PollEvented<MyRegistration>,
70 rx: Fuse<mpsc::UnboundedReceiver<Message>>,
71 ctrl_c: EventState,
72 ctrl_break: EventState,
73}
74
75struct EventState {
76 tasks: Vec<(RefCell<oneshot::Receiver<()>>, mio::SetReadiness)>,
77}
78
79impl Event {
80 pub fn ctrl_c() -> IoFuture<Event> {
85 Event::ctrl_c_handle(&Handle::default())
86 }
87
88 pub fn ctrl_c_handle(handle: &Handle) -> IoFuture<Event> {
93 Event::new(CTRL_C_EVENT, handle)
94 }
95
96 pub fn ctrl_break() -> IoFuture<Event> {
101 Event::ctrl_break_handle(&Handle::default())
102 }
103
104 pub fn ctrl_break_handle(handle: &Handle) -> IoFuture<Event> {
109 Event::new(CTRL_BREAK_EVENT, handle)
110 }
111
112 fn new(signum: DWORD, handle: &Handle) -> IoFuture<Event> {
113 let handle = handle.clone();
114 let new_signal = future::poll_fn(move || {
115 let mut init = None;
116 INIT.call_once(|| {
117 init = Some(global_init(&handle));
118 });
119
120 if let Some(Err(e)) = init {
121 return Err(e);
122 }
123
124 let (tx, rx) = oneshot::channel();
125 let msg = Message::NewEvent(signum, tx);
126 let res = unsafe { (*GLOBAL_STATE).tx.clone().unbounded_send(msg) };
127 res.expect(
128 "failed to request a new signal stream, did the \
129 first event loop go away?",
130 );
131 Ok(Async::Ready(rx.then(|r| r.unwrap())))
132 });
133
134 Box::new(new_signal.flatten())
135 }
136}
137
138impl Stream for Event {
139 type Item = ();
140 type Error = io::Error;
141
142 fn poll(&mut self) -> Poll<Option<()>, io::Error> {
143 if !self.reg.poll_read_ready(Ready::readable())?.is_ready() {
144 return Ok(Async::NotReady);
145 }
146 self.reg.clear_read_ready(Ready::readable())?;
147 self.reg
148 .get_ref()
149 .readiness
150 .set_readiness(mio::Ready::empty())
151 .expect("failed to set readiness");
152 Ok(Async::Ready(Some(())))
153 }
154}
155
156fn global_init(handle: &Handle) -> io::Result<()> {
157 let reg = MyRegistration::new();
158 let ready = reg.readiness.clone();
159
160 let (tx, rx) = mpsc::unbounded();
161 let reg = try!(PollEvented::new_with_handle(reg, handle));
162
163 unsafe {
164 let state = Box::new(GlobalState {
165 ready: ready,
166 ctrl_c: GlobalEventState {
167 ready: AtomicBool::new(false),
168 },
169 ctrl_break: GlobalEventState {
170 ready: AtomicBool::new(false),
171 },
172 tx: tx,
173 });
174 GLOBAL_STATE = Box::into_raw(state);
175
176 let rc = SetConsoleCtrlHandler(Some(handler), TRUE);
177 if rc == 0 {
178 Box::from_raw(GLOBAL_STATE);
179 GLOBAL_STATE = 0 as *mut _;
180 return Err(io::Error::last_os_error());
181 }
182
183 ::tokio_executor::spawn(Box::new(DriverTask {
184 handle: handle.clone(),
185 rx: rx.fuse(),
186 reg: reg,
187 ctrl_c: EventState { tasks: Vec::new() },
188 ctrl_break: EventState { tasks: Vec::new() },
189 }));
190
191 Ok(())
192 }
193}
194
195impl Future for DriverTask {
196 type Item = ();
197 type Error = ();
198
199 fn poll(&mut self) -> Poll<(), ()> {
200 self.check_event_drops();
201 self.check_messages();
202 self.check_events().unwrap();
203
204 Ok(Async::NotReady)
206 }
207}
208
209impl DriverTask {
210 fn check_event_drops(&mut self) {
211 self.ctrl_c
212 .tasks
213 .retain(|task| !task.0.borrow_mut().poll().is_err());
214 self.ctrl_break
215 .tasks
216 .retain(|task| !task.0.borrow_mut().poll().is_err());
217 }
218
219 fn check_messages(&mut self) {
220 loop {
221 let message = match self.rx.poll().unwrap() {
223 Async::Ready(Some(e)) => e,
224 Async::Ready(None) | Async::NotReady => break,
225 };
226 let (sig, complete) = match message {
227 Message::NewEvent(sig, complete) => (sig, complete),
228 };
229
230 let event = if sig == CTRL_C_EVENT {
231 &mut self.ctrl_c
232 } else {
233 &mut self.ctrl_break
234 };
235
236 let reg = MyRegistration::new();
239 let ready = reg.readiness.clone();
240
241 let reg = match PollEvented::new_with_handle(reg, &self.handle) {
242 Ok(reg) => reg,
243 Err(e) => {
244 drop(complete.send(Err(e)));
245 continue;
246 }
247 };
248
249 let (tx, rx) = oneshot::channel();
252 drop(complete.send(Ok(Event {
253 reg: reg,
254 _finished: tx,
255 })));
256 event.tasks.push((RefCell::new(rx), ready));
257 }
258 }
259
260 fn check_events(&mut self) -> io::Result<()> {
261 if self.reg.poll_read_ready(Ready::readable())?.is_not_ready() {
262 return Ok(());
263 }
264 self.reg.clear_read_ready(Ready::readable())?;
265 self.reg
266 .get_ref()
267 .readiness
268 .set_readiness(mio::Ready::empty())
269 .expect("failed to set readiness");
270
271 if unsafe { (*GLOBAL_STATE).ctrl_c.ready.swap(false, Ordering::SeqCst) } {
272 for task in self.ctrl_c.tasks.iter() {
273 task.1.set_readiness(mio::Ready::readable()).unwrap();
274 }
275 }
276 if unsafe {
277 (*GLOBAL_STATE)
278 .ctrl_break
279 .ready
280 .swap(false, Ordering::SeqCst)
281 } {
282 for task in self.ctrl_break.tasks.iter() {
283 task.1.set_readiness(mio::Ready::readable()).unwrap();
284 }
285 }
286 Ok(())
287 }
288}
289
290unsafe extern "system" fn handler(ty: DWORD) -> BOOL {
291 let event = match ty {
292 CTRL_C_EVENT => &(*GLOBAL_STATE).ctrl_c,
293 CTRL_BREAK_EVENT => &(*GLOBAL_STATE).ctrl_break,
294 _ => return FALSE,
295 };
296 if event.ready.swap(true, Ordering::SeqCst) {
297 FALSE
298 } else {
299 drop((*GLOBAL_STATE).ready.set_readiness(mio::Ready::readable()));
300 TRUE
304 }
305}
306
307struct MyRegistration {
308 registration: mio::Registration,
309 readiness: mio::SetReadiness,
310}
311
312impl MyRegistration {
313 fn new() -> Self {
314 let (registration, readiness) = mio::Registration::new2();
315
316 Self {
317 registration,
318 readiness,
319 }
320 }
321}
322
323impl mio::Evented for MyRegistration {
324 fn register(
325 &self,
326 poll: &mio::Poll,
327 token: mio::Token,
328 events: mio::Ready,
329 opts: mio::PollOpt,
330 ) -> io::Result<()> {
331 self.registration.register(poll, token, events, opts)
332 }
333
334 fn reregister(
335 &self,
336 poll: &mio::Poll,
337 token: mio::Token,
338 events: mio::Ready,
339 opts: mio::PollOpt,
340 ) -> io::Result<()> {
341 self.registration.reregister(poll, token, events, opts)
342 }
343
344 fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
345 mio::Evented::deregister(&self.registration, poll)
346 }
347}
348
349#[cfg(test)]
350mod tests {
351 extern crate tokio;
352
353 use self::tokio::runtime::current_thread;
354 use self::tokio::timer::Timeout;
355 use super::*;
356 use std::time::Duration;
357
358 fn with_timeout<F: Future>(future: F) -> impl Future<Item = F::Item, Error = F::Error> {
359 Timeout::new(future, Duration::from_secs(1)).map_err(|e| {
360 if e.is_timer() {
361 panic!("failed to register timer");
362 } else if e.is_elapsed() {
363 panic!("timed out")
364 } else {
365 e.into_inner().expect("missing inner error")
366 }
367 })
368 }
369
370 #[test]
371 fn ctrl_c_and_ctrl_break() {
372 let mut rt = current_thread::Runtime::new().unwrap();
375 let event_ctrl_c = rt
376 .block_on(with_timeout(Event::ctrl_c()))
377 .expect("failed to run future");
378
379 unsafe {
383 super::handler(CTRL_C_EVENT);
384 }
385
386 rt.block_on(with_timeout(event_ctrl_c.into_future()))
387 .ok()
388 .expect("failed to run event");
389
390 let event_ctrl_break = rt
391 .block_on(with_timeout(Event::ctrl_break()))
392 .expect("failed to run future");
393 unsafe {
394 super::handler(CTRL_BREAK_EVENT);
395 }
396
397 rt.block_on(with_timeout(event_ctrl_break.into_future()))
398 .ok()
399 .expect("failed to run event");
400 }
401}