tokio_core/reactor/mod.rs

//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
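//!
//! # Examples
//!
//! A minimal, compile-only sketch of driving a single future with a `Core`
//! (the `futures` crate is assumed to be available as a dependency):
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio_core;
//!
//! use futures::future;
//! use tokio_core::reactor::Core;
//!
//! fn main() {
//!     // Create the event loop and block the current thread on a future.
//!     let mut core = Core::new().unwrap();
//!     let answer = core.run(future::ok::<u32, ()>(42)).unwrap();
//!     assert_eq!(answer, 42);
//! }
//! ```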

use std::cell::RefCell;
use std::fmt;
use std::io;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};

use tokio;
use tokio::executor::current_thread::{CurrentThread, TaskExecutor};
use tokio_executor;
use tokio_executor::park::{Park, Unpark, ParkThread, UnparkThread};
use tokio_timer::timer::{self, Timer};

use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use mio;

mod poll_evented;
mod poll_evented2;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub(crate) use self::poll_evented2::PollEvented as PollEvented2;
pub use self::timeout::Timeout;
pub use self::interval::Interval;

static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Core);

/// An event loop.
///
/// The event loop is the main source of blocking in an application; it drives
/// all other I/O events and notifications. Each event loop can have multiple
/// handles pointing to it, and each handle can be used to create various I/O
/// objects bound to the event loop.
// TODO: expand this
pub struct Core {
    /// Uniquely identifies the reactor
    id: usize,

    /// Handle to the Tokio runtime
    rt: tokio::runtime::Runtime,

    /// Executes tasks
    executor: RefCell<CurrentThread<Timer<ParkThread>>>,

    /// Timer handle
    timer_handle: timer::Handle,

    /// Wakes up the thread when the `run` future is notified
    notify_future: Arc<MyNotify>,

    /// Wakes up the thread when a message is posted to `rx`
    notify_rx: Arc<MyNotify>,

    /// Send messages across threads to the core
    tx: mpsc::UnboundedSender<Message>,

    /// Receive messages
    rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,

    // Shared inner state
    inner: Rc<RefCell<Inner>>,
}

struct Inner {
    // Tasks that need to be spawned onto the executor.
    pending_spawn: Vec<Box<Future<Item = (), Error = ()>>>,
}

/// A unique ID for a `Core`
///
/// An ID by which different cores may be distinguished. It can be compared and
/// used as a key in a `HashMap`.
///
/// The ID is globally unique and never reused.
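///
/// # Examples
///
/// A minimal, compile-only sketch of using `CoreId` as a map key:
///
/// ```no_run
/// extern crate tokio_core;
///
/// use std::collections::HashMap;
/// use tokio_core::reactor::Core;
///
/// fn main() {
///     let core = Core::new().unwrap();
///     let mut names = HashMap::new();
///     names.insert(core.id(), "main event loop");
///     assert_eq!(names[&core.id()], "main event loop");
/// }
/// ```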
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct CoreId(usize);

/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct Remote {
    id: usize,
    tx: mpsc::UnboundedSender<Message>,
    new_handle: tokio::reactor::Handle,
    timer_handle: timer::Handle,
}

/// A non-sendable handle to an event loop, typically passed into functions that
/// create I/O objects to bind them to this event loop.
#[derive(Clone)]
pub struct Handle {
    remote: Remote,
    inner: Weak<RefCell<Inner>>,
    thread_pool: ::tokio::runtime::TaskExecutor,
}

enum Message {
    Run(Box<FnBox>),
}

// ===== impl Core =====

impl Core {
    /// Creates a new event loop, returning any error that happened during the
    /// creation.
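    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of constructing a core and checking that
    /// it was created successfully:
    ///
    /// ```no_run
    /// use tokio_core::reactor::Core;
    ///
    /// let core = Core::new().expect("failed to build the event loop");
    /// println!("created loop {:?}", core.id());
    /// ```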
    pub fn new() -> io::Result<Core> {
        // Create a new parker
        let timer = Timer::new(ParkThread::new());

        // Create notifiers
        let notify_future = Arc::new(MyNotify::new(timer.unpark()));
        let notify_rx = Arc::new(MyNotify::new(timer.unpark()));

        // New Tokio reactor + threadpool
        let rt = tokio::runtime::Runtime::new()?;

        let timer_handle = timer.handle();

        // Executor to run !Send futures
        let executor = RefCell::new(CurrentThread::new_with_park(timer));

        // Used to send messages across threads
        let (tx, rx) = mpsc::unbounded();

        // Wrap the rx half with a future context and refcell
        let rx = RefCell::new(executor::spawn(rx));

        let id = NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed);

        Ok(Core {
            id,
            rt,
            notify_future,
            notify_rx,
            tx,
            rx,
            executor,
            timer_handle,
            inner: Rc::new(RefCell::new(Inner {
                pending_spawn: vec![],
            })),
        })
    }

    /// Returns a handle to this event loop which cannot be sent across threads
    /// but can be used as a proxy to the event loop itself.
    ///
    /// Handles are cloneable and clones always refer to the same event loop.
    /// This handle is typically passed into functions that create I/O objects
    /// to bind them to this event loop.
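    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch showing that cloned handles refer to the
    /// same underlying event loop:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///
    ///     // Clones refer to the same event loop.
    ///     let handle2 = handle.clone();
    ///     handle2.spawn(future::ok(()));
    ///     assert_eq!(handle.id(), handle2.id());
    ///
    ///     // The spawned future only makes progress while the core is turned.
    ///     core.turn(None);
    /// }
    /// ```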
    pub fn handle(&self) -> Handle {
        Handle {
            remote: self.remote(),
            inner: Rc::downgrade(&self.inner),
            thread_pool: self.rt.executor().clone(),
        }
    }

    /// Returns a reference to the runtime backing the instance
    ///
    /// This provides access to the newer features of Tokio.
    pub fn runtime(&self) -> &tokio::runtime::Runtime {
        &self.rt
    }

    /// Generates a remote handle to this event loop which can be used to spawn
    /// tasks from other threads into this event loop.
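    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of handing a `Remote` to another thread,
    /// which then spawns work back onto this event loop:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use std::thread;
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let remote = core.remote();
    ///
    ///     thread::spawn(move || {
    ///         remote.spawn(|_handle| {
    ///             println!("running back on the event loop's thread");
    ///             future::ok::<(), ()>(())
    ///         });
    ///     });
    ///
    ///     // The spawned work runs while the core is being driven.
    ///     loop { core.turn(None) }
    /// }
    /// ```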
    pub fn remote(&self) -> Remote {
        Remote {
            id: self.id,
            tx: self.tx.clone(),
            new_handle: self.rt.reactor().clone(),
            timer_handle: self.timer_handle.clone(),
        }
    }

    /// Runs a future until completion, driving the event loop while we're
    /// otherwise waiting for the future to complete.
    ///
    /// This function will begin executing the event loop and will finish once
    /// the provided future is resolved. Note that the future argument here
    /// crucially requires neither the `'static` nor the `Send` bound. As a
    /// result the future will be "pinned" to not only this thread but also this
    /// stack frame.
    ///
    /// This function will return the value that the future resolves to once
    /// the future has finished. If the future never resolves then this function
    /// will never return. Any other futures spawned on this core may still be
    /// incomplete when this function returns.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
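    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of blocking on a future; the borrowed
    /// local demonstrates that `run` does not require `'static`:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let greeting = String::from("hello");
    ///
    ///     // The future borrows `greeting`, so it is neither `Send` nor `'static`.
    ///     let len = core.run(future::lazy(|| Ok::<_, ()>(greeting.len()))).unwrap();
    ///     assert_eq!(len, 5);
    /// }
    /// ```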
    pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
        where F: Future,
    {
        let mut task = executor::spawn(f);
        let handle1 = self.rt.reactor().clone();
        let handle2 = self.rt.reactor().clone();
        let mut executor1 = self.rt.executor().clone();
        let mut executor2 = self.rt.executor().clone();
        let timer_handle = self.timer_handle.clone();

        // Make sure the future will run at least once on enter
        self.notify_future.notify(0);

        loop {
            if self.notify_future.take() {
                let mut enter = tokio_executor::enter()
                    .ok().expect("cannot recursively call into `Core`");

                let notify = &self.notify_future;
                let mut current_thread = self.executor.borrow_mut();

                let res = try!(CURRENT_LOOP.set(self, || {
                    ::tokio_reactor::with_default(&handle1, &mut enter, |enter| {
                        tokio_executor::with_default(&mut executor1, enter, |enter| {
                            timer::with_default(&timer_handle, enter, |enter| {
                                current_thread.enter(enter)
                                    .block_on(future::lazy(|| {
                                        Ok::<_, ()>(task.poll_future_notify(notify, 0))
                                    })).unwrap()
                            })
                        })
                    })
                }));

                if let Async::Ready(e) = res {
                    return Ok(e)
                }
            }

            self.poll(None, &handle2, &mut executor2);
        }
    }

    /// Performs one iteration of the event loop, blocking on waiting for events
    /// for at most `max_wait` (forever if `None`).
    ///
    /// It only makes sense to call this method if you've previously spawned
    /// a future onto this event loop.
    ///
    /// `loop { core.turn(None) }` is equivalent to calling `run` with an
    /// empty future (one that never finishes).
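    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of turning the loop with a bounded wait so
    /// the caller can interleave its own work:
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use tokio_core::reactor::Core;
    ///
    /// let mut core = Core::new().unwrap();
    ///
    /// // Process pending events, waiting at most 10ms for new ones.
    /// core.turn(Some(Duration::from_millis(10)));
    /// ```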
    pub fn turn(&mut self, max_wait: Option<Duration>) {
        let handle = self.rt.reactor().clone();
        let mut executor = self.rt.executor().clone();
        self.poll(max_wait, &handle, &mut executor);
    }

    fn poll(&mut self, max_wait: Option<Duration>,
            handle: &tokio::reactor::Handle,
            sender: &mut tokio::runtime::TaskExecutor) {
        let mut enter = tokio_executor::enter()
            .ok().expect("cannot recursively call into `Core`");
        let timer_handle = self.timer_handle.clone();

        ::tokio_reactor::with_default(handle, &mut enter, |enter| {
            tokio_executor::with_default(sender, enter, |enter| {
                timer::with_default(&timer_handle, enter, |enter| {
                    let start = Instant::now();

                    // Process all the events that came in, dispatching appropriately
                    if self.notify_rx.take() {
                        CURRENT_LOOP.set(self, || self.consume_queue());
                    }

                    // Drain any futures pending spawn
                    {
                        let mut e = self.executor.borrow_mut();
                        let mut i = self.inner.borrow_mut();

                        for f in i.pending_spawn.drain(..) {
                            // Enter the executor via a lazy future so that
                            // `TaskExecutor::current()` points at this
                            // `CurrentThread` while the queued future is spawned.
                            e.enter(enter).block_on(future::lazy(|| {
                                TaskExecutor::current().spawn_local(f).unwrap();
                                Ok::<_, ()>(())
                            })).unwrap();
                        }
                    }

                    CURRENT_LOOP.set(self, || {
                        self.executor.borrow_mut()
                            .enter(enter)
                            .turn(max_wait)
                            .ok().expect("error in `CurrentThread::turn`");
                    });

                    let after_poll = Instant::now();
                    debug!("loop poll - {:?}", after_poll - start);
                    debug!("loop time - {:?}", after_poll);

                    debug!("loop process, {:?}", after_poll.elapsed());
                })
            });
        });
    }

    fn consume_queue(&self) {
        debug!("consuming notification queue");
        // TODO: can we do better than `.unwrap()` here?
        loop {
            let msg = self.rx.borrow_mut().poll_stream_notify(&self.notify_rx, 0).unwrap();
            match msg {
                Async::Ready(Some(msg)) => self.notify(msg),
                Async::NotReady |
                Async::Ready(None) => break,
            }
        }
    }

    fn notify(&self, msg: Message) {
        let Message::Run(r) = msg;
        r.call_box(self);
    }

    /// Get the ID of this loop
    pub fn id(&self) -> CoreId {
        CoreId(self.id)
    }
}

impl<F> Executor<F> for Core
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.handle().execute(future)
    }
}

impl fmt::Debug for Core {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Core")
         .field("id", &self.id())
         .finish()
    }
}

impl Remote {
    fn send(&self, msg: Message) {
        self.with_loop(|lp| {
            match lp {
                Some(lp) => {
                    // We want to make sure that all messages are received in
                    // order, so we need to consume pending messages before
                    // delivering this message to the core. The actual
                    // `consume_queue` function, however, can be somewhat slow
                    // right now: receiving on a channel will acquire a lock and
                    // block the current task.
                    //
                    // To speed this up, check the message queue's readiness as
                    // a sort of preflight check to see if we've actually got
                    // any messages. This should just involve some atomics; if
                    // it comes back false then we know for sure there are no
                    // pending messages, so we can immediately deliver our
                    // message.
                    if lp.notify_rx.take() {
                        lp.consume_queue();
                    }
                    lp.notify(msg);
                }
                None => {
                    match self.tx.unbounded_send(msg) {
                        Ok(()) => {}

                        // TODO: this error should punt upwards and we should
                        //       notify the caller that the message wasn't
                        //       received. This is tokio-core#17
                        Err(e) => drop(e),
                    }
                }
            }
        })
    }

    fn with_loop<F, R>(&self, f: F) -> R
        where F: FnOnce(Option<&Core>) -> R
    {
        if CURRENT_LOOP.is_set() {
            CURRENT_LOOP.with(|lp| {
                let same = lp.id == self.id;
                if same {
                    f(Some(lp))
                } else {
                    f(None)
                }
            })
        } else {
            f(None)
        }
    }

    /// Spawns a new future into the event loop this remote is associated with.
    ///
    /// This function takes a closure which is executed within the context of
    /// the I/O loop itself. The future returned by the closure will be
    /// scheduled on the event loop and run to completion.
    ///
    /// Note that while the closure, `F`, requires the `Send` bound as it might
    /// cross threads, the future `R` does not.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
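    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of spawning from another thread; the
    /// closure is `Send`, but the future it returns does not have to be:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use std::rc::Rc;
    /// use std::thread;
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let remote = core.remote();
    ///
    ///     thread::spawn(move || {
    ///         remote.spawn(|_handle| {
    ///             // The returned future may own non-`Send` data such as `Rc`.
    ///             let data = Rc::new(5);
    ///             future::lazy(move || {
    ///                 println!("value: {}", data);
    ///                 Ok::<(), ()>(())
    ///             })
    ///         });
    ///     });
    ///
    ///     // Drive the loop forever so spawned futures run.
    ///     core.run(future::empty::<(), ()>()).unwrap();
    /// }
    /// ```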
    pub fn spawn<F, R>(&self, f: F)
        where F: FnOnce(&Handle) -> R + Send + 'static,
              R: IntoFuture<Item=(), Error=()>,
              R::Future: 'static,
    {
        self.send(Message::Run(Box::new(|lp: &Core| {
            let f = f(&lp.handle());
            lp.handle().spawn(f.into_future());
        })));
    }

    /// Return the ID of the represented Core
    pub fn id(&self) -> CoreId {
        CoreId(self.id)
    }

    /// Attempts to "promote" this remote to a handle, if possible.
    ///
    /// This function is intended for structures which typically work through a
    /// `Remote` but want to optimize runtime when the remote doesn't actually
    /// leave the thread of the original reactor. This will attempt to return a
    /// handle if the `Remote` is on the same thread as the event loop and the
    /// event loop is running.
    ///
    /// If this `Remote` has moved to a different thread or if the event loop
    /// isn't currently running, then `None` may be returned. If you need to
    /// guarantee access to a `Handle`, then you can call this function and fall
    /// back to using `spawn` above if it returns `None`.
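    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of the "promote or fall back" pattern:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::{Core, Remote};
    ///
    /// fn schedule(remote: &Remote) {
    ///     match remote.handle() {
    ///         // Already on the event loop's thread: spawn directly.
    ///         Some(handle) => handle.spawn(future::ok(())),
    ///         // Otherwise send the work over to the event loop.
    ///         None => remote.spawn(|_| future::ok(())),
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     schedule(&core.remote());
    /// }
    /// ```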
    pub fn handle(&self) -> Option<Handle> {
        if CURRENT_LOOP.is_set() {
            CURRENT_LOOP.with(|lp| {
                let same = lp.id == self.id;
                if same {
                    Some(lp.handle())
                } else {
                    None
                }
            })
        } else {
            None
        }
    }
}

impl<F> Executor<F> for Remote
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(|_| future);
        Ok(())
    }
}

impl fmt::Debug for Remote {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Remote")
         .field("id", &self.id())
         .finish()
    }
}

impl Handle {
    /// Returns a reference to the new Tokio handle
    pub fn new_tokio_handle(&self) -> &::tokio::reactor::Handle {
        &self.remote.new_handle
    }

    /// Returns a reference to the underlying remote handle to the event loop.
    pub fn remote(&self) -> &Remote {
        &self.remote
    }

    /// Spawns a new future on the event loop this handle is associated with.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
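    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of spawning a `'static` (but not
    /// necessarily `Send`) future onto the loop:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use std::rc::Rc;
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///
    ///     // `Rc` is not `Send`, but that's fine for `Handle::spawn`.
    ///     let shared = Rc::new(1);
    ///     handle.spawn(future::lazy(move || {
    ///         println!("shared value: {}", shared);
    ///         Ok::<(), ()>(())
    ///     }));
    ///
    ///     core.turn(None);
    /// }
    /// ```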
    pub fn spawn<F>(&self, f: F)
        where F: Future<Item=(), Error=()> + 'static,
    {
        let inner = match self.inner.upgrade() {
            Some(inner) => inner,
            None => {
                return;
            }
        };

        // Try queuing the future on the core directly; this succeeds when the
        // inner state isn't currently borrowed (i.e. the loop isn't mid-poll).
        if let Ok(mut inner) = inner.try_borrow_mut() {
            inner.pending_spawn.push(Box::new(f));
            return;
        }

        // If that doesn't work, the executor is probably active, so spawn using
        // the global fn.
        let _ = TaskExecutor::current().spawn_local(Box::new(f));
    }

    /// Spawns a new future onto the threadpool
    ///
    /// # Panics
    ///
    /// This function panics if the spawn fails. Failure occurs if the executor
    /// is currently at capacity and is unable to spawn a new future.
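    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch of spawning a `Send` future onto the
    /// runtime's thread pool instead of the current-thread executor:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///
    ///     handle.spawn_send(future::lazy(|| {
    ///         println!("running on the thread pool");
    ///         Ok::<(), ()>(())
    ///     }));
    /// }
    /// ```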
    pub fn spawn_send<F>(&self, f: F)
        where F: Future<Item=(), Error=()> + Send + 'static,
    {
        self.thread_pool.spawn(f);
    }

    /// Spawns a closure on this event loop.
    ///
    /// This function is a convenience wrapper around the `spawn` function above
    /// for running a closure wrapped in `futures::lazy`. It will spawn the
    /// function `f` provided onto the event loop, and continue to run the
    /// future returned by `f` on the event loop as well.
    ///
    /// # Panics
    ///
    /// This method will **not** catch panics from polling the future `f`. If
    /// the future panics then it's the responsibility of the caller to catch
    /// that panic and handle it as appropriate.
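    ///
    /// # Examples
    ///
    /// A minimal, compile-only sketch; the closure runs on the event loop and
    /// its returned future is then driven there as well:
    ///
    /// ```no_run
    /// extern crate futures;
    /// extern crate tokio_core;
    ///
    /// use futures::future;
    /// use tokio_core::reactor::Core;
    ///
    /// fn main() {
    ///     let mut core = Core::new().unwrap();
    ///     let handle = core.handle();
    ///
    ///     handle.spawn_fn(|| {
    ///         println!("closure invoked on the event loop");
    ///         future::ok::<(), ()>(())
    ///     });
    ///
    ///     core.turn(None);
    /// }
    /// ```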
    pub fn spawn_fn<F, R>(&self, f: F)
        where F: FnOnce() -> R + 'static,
              R: IntoFuture<Item=(), Error=()> + 'static,
    {
        self.spawn(future::lazy(f))
    }

    /// Return the ID of the represented Core
    pub fn id(&self) -> CoreId {
        self.remote.id()
    }
}

impl<F> Executor<F> for Handle
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        self.spawn(future);
        Ok(())
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Handle")
         .field("id", &self.id())
         .finish()
    }
}

struct MyNotify {
    unpark: UnparkThread,
    notified: AtomicBool,
}

impl MyNotify {
    fn new(unpark: UnparkThread) -> Self {
        MyNotify {
            unpark,
            notified: AtomicBool::new(true),
        }
    }

    fn take(&self) -> bool {
        self.notified.swap(false, Ordering::SeqCst)
    }
}

impl Notify for MyNotify {
    fn notify(&self, _: usize) {
        self.notified.store(true, Ordering::SeqCst);
        self.unpark.unpark();
    }
}

trait FnBox: Send + 'static {
    fn call_box(self: Box<Self>, lp: &Core);
}

impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
    fn call_box(self: Box<Self>, lp: &Core) {
        (*self)(lp)
    }
}

const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;

fn ready2usize(ready: mio::Ready) -> usize {
    let mut bits = 0;
    if ready.is_readable() {
        bits |= READ;
    }
    if ready.is_writable() {
        bits |= WRITE;
    }
    bits | platform::ready2usize(ready)
}

fn usize2ready(bits: usize) -> mio::Ready {
    let mut ready = mio::Ready::empty();
    if bits & READ != 0 {
        ready.insert(mio::Ready::readable());
    }
    if bits & WRITE != 0 {
        ready.insert(mio::Ready::writable());
    }
    ready | platform::usize2ready(bits)
}
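
// A minimal sketch of how the readiness encoding round-trips; illustrative
// only, exercising just the portable READ/WRITE bits (the platform module may
// add HUP/ERROR/AIO bits on unix targets).
#[cfg(test)]
mod ready_bits_tests {
    use super::{ready2usize, usize2ready, READ, WRITE};
    use mio;

    #[test]
    fn read_write_round_trip() {
        let ready = mio::Ready::readable() | mio::Ready::writable();
        let bits = ready2usize(ready);
        assert_eq!(bits & READ, READ);
        assert_eq!(bits & WRITE, WRITE);
        assert_eq!(usize2ready(bits), ready);
    }
}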

#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    const HUP: usize = 1 << 2;
    const ERROR: usize = 1 << 3;
    const AIO: usize = 1 << 4;

    #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
    fn is_aio(ready: &Ready) -> bool {
        UnixReady::from(*ready).is_aio()
    }

    #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
    fn is_aio(_ready: &Ready) -> bool {
        false
    }

    pub fn ready2usize(ready: Ready) -> usize {
        let ready = UnixReady::from(ready);
        let mut bits = 0;
        if is_aio(&ready) {
            bits |= AIO;
        }
        if ready.is_error() {
            bits |= ERROR;
        }
        if ready.is_hup() {
            bits |= HUP;
        }
        bits
    }

    #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
              target_os = "macos"))]
    fn usize2ready_aio(ready: &mut UnixReady) {
        ready.insert(UnixReady::aio());
    }

    #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
                  target_os = "macos")))]
    fn usize2ready_aio(_ready: &mut UnixReady) {
        // AIO isn't available on this platform, so there's nothing to insert.
    }

    pub fn usize2ready(bits: usize) -> Ready {
        let mut ready = UnixReady::from(Ready::empty());
        if bits & AIO != 0 {
            usize2ready_aio(&mut ready);
        }
        if bits & HUP != 0 {
            ready.insert(UnixReady::hup());
        }
        if bits & ERROR != 0 {
            ready.insert(UnixReady::error());
        }
        ready.into()
    }
}

#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
    use mio::Ready;

    pub fn ready2usize(_r: Ready) -> usize {
        0
    }

    pub fn usize2ready(_r: usize) -> Ready {
        Ready::empty()
    }
}