1use std::cell::RefCell;
8use std::fmt;
9use std::io;
10use std::rc::{Rc, Weak};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
13use std::time::{Instant, Duration};
14
15use tokio;
16use tokio::executor::current_thread::{CurrentThread, TaskExecutor};
17use tokio_executor;
18use tokio_executor::park::{Park, Unpark, ParkThread, UnparkThread};
19use tokio_timer::timer::{self, Timer};
20
21use futures::{Future, IntoFuture, Async};
22use futures::future::{self, Executor, ExecuteError};
23use futures::executor::{self, Spawn, Notify};
24use futures::sync::mpsc;
25use mio;
26
27mod poll_evented;
28mod poll_evented2;
29mod timeout;
30mod interval;
31pub use self::poll_evented::PollEvented;
32pub(crate) use self::poll_evented2::PollEvented as PollEvented2;
33pub use self::timeout::Timeout;
34pub use self::interval::Interval;
35
36static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
37scoped_thread_local!(static CURRENT_LOOP: Core);
38
39pub struct Core {
47 id: usize,
49
50 rt: tokio::runtime::Runtime,
52
53 executor: RefCell<CurrentThread<Timer<ParkThread>>>,
55
56 timer_handle: timer::Handle,
58
59 notify_future: Arc<MyNotify>,
61
62 notify_rx: Arc<MyNotify>,
64
65 tx: mpsc::UnboundedSender<Message>,
67
68 rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
70
71 inner: Rc<RefCell<Inner>>,
73}
74
75struct Inner {
76 pending_spawn: Vec<Box<Future<Item = (), Error = ()>>>,
78}
79
/// Opaque identifier of a `Core`.
///
/// Returned by the `id` methods of `Core`, `Remote` and `Handle`; two values
/// compare equal exactly when the wrapped loop ids are equal.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct CoreId(usize);
88
89#[derive(Clone)]
95pub struct Remote {
96 id: usize,
97 tx: mpsc::UnboundedSender<Message>,
98 new_handle: tokio::reactor::Handle,
99 timer_handle: timer::Handle,
100}
101
102#[derive(Clone)]
105pub struct Handle {
106 remote: Remote,
107 inner: Weak<RefCell<Inner>>,
108 thread_pool: ::tokio::runtime::TaskExecutor,
109}
110
111enum Message {
112 Run(Box<FnBox>),
113}
114
115impl Core {
118 pub fn new() -> io::Result<Core> {
121 let timer = Timer::new(ParkThread::new());
123
124 let notify_future = Arc::new(MyNotify::new(timer.unpark()));
126 let notify_rx = Arc::new(MyNotify::new(timer.unpark()));
127
128 let rt = tokio::runtime::Runtime::new()?;
130
131 let timer_handle = timer.handle();
132
133 let executor = RefCell::new(CurrentThread::new_with_park(timer));
135
136 let (tx, rx) = mpsc::unbounded();
138
139 let rx = RefCell::new(executor::spawn(rx));
141
142 let id = NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed);
143
144 Ok(Core {
145 id,
146 rt,
147 notify_future,
148 notify_rx,
149 tx,
150 rx,
151 executor,
152 timer_handle,
153 inner: Rc::new(RefCell::new(Inner {
154 pending_spawn: vec![],
155 })),
156 })
157 }
158
159 pub fn handle(&self) -> Handle {
166 Handle {
167 remote: self.remote(),
168 inner: Rc::downgrade(&self.inner),
169 thread_pool: self.rt.executor().clone(),
170 }
171 }
172
173 pub fn runtime(&self) -> &tokio::runtime::Runtime {
177 &self.rt
178 }
179
180 pub fn remote(&self) -> Remote {
183 Remote {
184 id: self.id,
185 tx: self.tx.clone(),
186 new_handle: self.rt.reactor().clone(),
187 timer_handle: self.timer_handle.clone()
188 }
189 }
190
191 pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
211 where F: Future,
212 {
213 let mut task = executor::spawn(f);
214 let handle1 = self.rt.reactor().clone();
215 let handle2 = self.rt.reactor().clone();
216 let mut executor1 = self.rt.executor().clone();
217 let mut executor2 = self.rt.executor().clone();
218 let timer_handle = self.timer_handle.clone();
219
220 self.notify_future.notify(0);
222
223 loop {
224 if self.notify_future.take() {
225 let mut enter = tokio_executor::enter()
226 .ok().expect("cannot recursively call into `Core`");
227
228 let notify = &self.notify_future;
229 let mut current_thread = self.executor.borrow_mut();
230
231 let res = try!(CURRENT_LOOP.set(self, || {
232 ::tokio_reactor::with_default(&handle1, &mut enter, |enter| {
233 tokio_executor::with_default(&mut executor1, enter, |enter| {
234 timer::with_default(&timer_handle, enter, |enter| {
235 current_thread.enter(enter)
236 .block_on(future::lazy(|| {
237 Ok::<_, ()>(task.poll_future_notify(notify, 0))
238 })).unwrap()
239 })
240 })
241 })
242 }));
243
244 if let Async::Ready(e) = res {
245 return Ok(e)
246 }
247 }
248
249 self.poll(None, &handle2, &mut executor2);
250 }
251 }
252
253 pub fn turn(&mut self, max_wait: Option<Duration>) {
262 let handle = self.rt.reactor().clone();
263 let mut executor = self.rt.executor().clone();
264 self.poll(max_wait, &handle, &mut executor);
265 }
266
267 fn poll(&mut self, max_wait: Option<Duration>,
268 handle: &tokio::reactor::Handle,
269 sender: &mut tokio::runtime::TaskExecutor) {
270 let mut enter = tokio_executor::enter()
271 .ok().expect("cannot recursively call into `Core`");
272 let timer_handle = self.timer_handle.clone();
273
274 ::tokio_reactor::with_default(handle, &mut enter, |enter| {
275 tokio_executor::with_default(sender, enter, |enter| {
276 timer::with_default(&timer_handle, enter, |enter| {
277 let start = Instant::now();
278
279 if self.notify_rx.take() {
281 CURRENT_LOOP.set(self, || self.consume_queue());
282 }
283
284 {
286 let mut e = self.executor.borrow_mut();
287 let mut i = self.inner.borrow_mut();
288
289 for f in i.pending_spawn.drain(..) {
290 e.enter(enter).block_on(future::lazy(|| {
292 TaskExecutor::current().spawn_local(f).unwrap();
293 Ok::<_, ()>(())
294 })).unwrap();
295 }
296 }
297
298 CURRENT_LOOP.set(self, || {
299 self.executor.borrow_mut()
300 .enter(enter)
301 .turn(max_wait)
302 .ok().expect("error in `CurrentThread::turn`");
303 });
304
305 let after_poll = Instant::now();
306 debug!("loop poll - {:?}", after_poll - start);
307 debug!("loop time - {:?}", after_poll);
308
309 debug!("loop process, {:?}", after_poll.elapsed());
310 })
311 });
312 });
313 }
314
315 fn consume_queue(&self) {
316 debug!("consuming notification queue");
317 loop {
319 let msg = self.rx.borrow_mut().poll_stream_notify(&self.notify_rx, 0).unwrap();
320 match msg {
321 Async::Ready(Some(msg)) => self.notify(msg),
322 Async::NotReady |
323 Async::Ready(None) => break,
324 }
325 }
326 }
327
328 fn notify(&self, msg: Message) {
329 let Message::Run(r) = msg;
330 r.call_box(self);
331 }
332
333 pub fn id(&self) -> CoreId {
335 CoreId(self.id)
336 }
337}
338
339impl<F> Executor<F> for Core
340 where F: Future<Item = (), Error = ()> + 'static,
341{
342 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
343 self.handle().execute(future)
344 }
345}
346
347impl fmt::Debug for Core {
348 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
349 f.debug_struct("Core")
350 .field("id", &self.id())
351 .finish()
352 }
353}
354
355impl Remote {
356 fn send(&self, msg: Message) {
357 self.with_loop(|lp| {
358 match lp {
359 Some(lp) => {
360 if lp.notify_rx.take() {
374 lp.consume_queue();
375 }
376 lp.notify(msg);
377 }
378 None => {
379 match self.tx.unbounded_send(msg) {
380 Ok(()) => {}
381
382 Err(e) => drop(e),
386 }
387 }
388 }
389 })
390 }
391
392 fn with_loop<F, R>(&self, f: F) -> R
393 where F: FnOnce(Option<&Core>) -> R
394 {
395 if CURRENT_LOOP.is_set() {
396 CURRENT_LOOP.with(|lp| {
397 let same = lp.id == self.id;
398 if same {
399 f(Some(lp))
400 } else {
401 f(None)
402 }
403 })
404 } else {
405 f(None)
406 }
407 }
408
409 pub fn spawn<F, R>(&self, f: F)
424 where F: FnOnce(&Handle) -> R + Send + 'static,
425 R: IntoFuture<Item=(), Error=()>,
426 R::Future: 'static,
427 {
428 self.send(Message::Run(Box::new(|lp: &Core| {
429 let f = f(&lp.handle());
430 lp.handle().spawn(f.into_future());
431 })));
432 }
433
434 pub fn id(&self) -> CoreId {
436 CoreId(self.id)
437 }
438
439 pub fn handle(&self) -> Option<Handle> {
452 if CURRENT_LOOP.is_set() {
453 CURRENT_LOOP.with(|lp| {
454 let same = lp.id == self.id;
455 if same {
456 Some(lp.handle())
457 } else {
458 None
459 }
460 })
461 } else {
462 None
463 }
464 }
465}
466
467impl<F> Executor<F> for Remote
468 where F: Future<Item = (), Error = ()> + Send + 'static,
469{
470 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
471 self.spawn(|_| future);
472 Ok(())
473 }
474}
475
476impl fmt::Debug for Remote {
477 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
478 f.debug_struct("Remote")
479 .field("id", &self.id())
480 .finish()
481 }
482}
483
484impl Handle {
485 pub fn new_tokio_handle(&self) -> &::tokio::reactor::Handle {
487 &self.remote.new_handle
488 }
489
490 pub fn remote(&self) -> &Remote {
492 &self.remote
493 }
494
495 pub fn spawn<F>(&self, f: F)
503 where F: Future<Item=(), Error=()> + 'static,
504 {
505 let inner = match self.inner.upgrade() {
506 Some(inner) => inner,
507 None => {
508 return;
509 }
510 };
511
512 if let Ok(mut inner) = inner.try_borrow_mut() {
514 inner.pending_spawn.push(Box::new(f));
515 return;
516 }
517
518 let _ = TaskExecutor::current().spawn_local(Box::new(f));
521 }
522
523 pub fn spawn_send<F>(&self, f: F)
530 where F: Future<Item=(), Error=()> + Send + 'static,
531 {
532 self.thread_pool.spawn(f);
533 }
534
535 pub fn spawn_fn<F, R>(&self, f: F)
548 where F: FnOnce() -> R + 'static,
549 R: IntoFuture<Item=(), Error=()> + 'static,
550 {
551 self.spawn(future::lazy(f))
552 }
553
554 pub fn id(&self) -> CoreId {
556 self.remote.id()
557 }
558}
559
560impl<F> Executor<F> for Handle
561 where F: Future<Item = (), Error = ()> + 'static,
562{
563 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
564 self.spawn(future);
565 Ok(())
566 }
567}
568
569impl fmt::Debug for Handle {
570 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
571 f.debug_struct("Handle")
572 .field("id", &self.id())
573 .finish()
574 }
575}
576
577struct MyNotify {
578 unpark: UnparkThread,
579 notified: AtomicBool,
580}
581
582impl MyNotify {
583 fn new(unpark: UnparkThread) -> Self {
584 MyNotify {
585 unpark,
586 notified: AtomicBool::new(true),
587 }
588 }
589
590 fn take(&self) -> bool {
591 self.notified.swap(false, Ordering::SeqCst)
592 }
593}
594
595impl Notify for MyNotify {
596 fn notify(&self, _: usize) {
597 self.notified.store(true, Ordering::SeqCst);
598 self.unpark.unpark();
599 }
600}
601
602trait FnBox: Send + 'static {
603 fn call_box(self: Box<Self>, lp: &Core);
604}
605
606impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
607 fn call_box(self: Box<Self>, lp: &Core) {
608 (*self)(lp)
609 }
610}
611
// Bit assignments used by `ready2usize` / `usize2ready` for the
// platform-independent readiness kinds.
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
614
615fn ready2usize(ready: mio::Ready) -> usize {
616 let mut bits = 0;
617 if ready.is_readable() {
618 bits |= READ;
619 }
620 if ready.is_writable() {
621 bits |= WRITE;
622 }
623 bits | platform::ready2usize(ready)
624}
625
626fn usize2ready(bits: usize) -> mio::Ready {
627 let mut ready = mio::Ready::empty();
628 if bits & READ != 0 {
629 ready.insert(mio::Ready::readable());
630 }
631 if bits & WRITE != 0 {
632 ready.insert(mio::Ready::writable());
633 }
634 ready | platform::usize2ready(bits)
635}
636
637#[cfg(all(unix, not(target_os = "fuchsia")))]
638mod platform {
639 use mio::Ready;
640 use mio::unix::UnixReady;
641
642 const HUP: usize = 1 << 2;
643 const ERROR: usize = 1 << 3;
644 const AIO: usize = 1 << 4;
645
646 #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
647 fn is_aio(ready: &Ready) -> bool {
648 UnixReady::from(*ready).is_aio()
649 }
650
651 #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
652 fn is_aio(_ready: &Ready) -> bool {
653 false
654 }
655
656 pub fn ready2usize(ready: Ready) -> usize {
657 let ready = UnixReady::from(ready);
658 let mut bits = 0;
659 if is_aio(&ready) {
660 bits |= AIO;
661 }
662 if ready.is_error() {
663 bits |= ERROR;
664 }
665 if ready.is_hup() {
666 bits |= HUP;
667 }
668 bits
669 }
670
671 #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
672 target_os = "macos"))]
673 fn usize2ready_aio(ready: &mut UnixReady) {
674 ready.insert(UnixReady::aio());
675 }
676
677 #[cfg(not(any(target_os = "dragonfly",
678 target_os = "freebsd", target_os = "ios", target_os = "macos")))]
679 fn usize2ready_aio(_ready: &mut UnixReady) {
680 }
682
683 pub fn usize2ready(bits: usize) -> Ready {
684 let mut ready = UnixReady::from(Ready::empty());
685 if bits & AIO != 0 {
686 usize2ready_aio(&mut ready);
687 }
688 if bits & HUP != 0 {
689 ready.insert(UnixReady::hup());
690 }
691 if bits & ERROR != 0 {
692 ready.insert(UnixReady::error());
693 }
694 ready.into()
695 }
696}
697
/// Windows / Fuchsia variant: there are no platform-specific readiness bits.
#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
    use mio::Ready;

    /// Always contributes no bits.
    pub fn ready2usize(_r: Ready) -> usize {
        0
    }

    /// Always contributes an empty readiness set.
    pub fn usize2ready(_r: usize) -> Ready {
        Ready::empty()
    }
}