1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use core::mem;
use sync::spsc::oneshot::{channel, Receiver, RecvError};
use thread::prelude::*;

/// A future for the result of a routine executed on another thread.
///
/// The routine reports its outcome through a one-shot channel; this type wraps
/// the receiving half of that channel and resolves to `R` on success or `E` on
/// failure.
///
/// This future can be created by an instance of [`Thread`].
///
/// [`Thread`]: ../trait.Thread.html
#[must_use]
pub struct RoutineFuture<R, E> {
  /// Receiving half of the one-shot channel on which the routine delivers its
  /// `Result<R, E>`.
  rx: Receiver<R, E>,
}

impl<R, E> RoutineFuture<R, E> {
  /// Schedules `generator` on `thread` and returns a future for its result.
  ///
  /// The generator is wrapped in a routine that is pushed onto
  /// `thread.routines()`. Every time the routine is resumed it resumes
  /// `generator` once. When the generator completes, its `Result<R, E>` is
  /// sent through a one-shot channel whose receiving half backs the returned
  /// future. If the sending half reports the channel as canceled, the routine
  /// finishes without resuming the generator again.
  pub(crate) fn new<T, G>(thread: &T, mut generator: G) -> Self
  where
    T: Thread,
    G: Generator<Yield = (), Return = Result<R, E>> + Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
  {
    let (tx, rx) = channel();
    let routine = move || {
      // The cancellation check comes first, so a canceled channel never causes
      // another resume of the generator.
      while !tx.is_canceled() {
        if let Complete(outcome) = generator.resume() {
          // The send result is discarded on purpose: there is nothing left to
          // do here if the value cannot be delivered.
          tx.send(outcome).ok();
          break;
        }
        // The generator only yielded; suspend until the routine is resumed
        // again.
        yield;
      }
    };
    thread.routines().push(routine);
    Self { rx }
  }

  /// Gracefully closes this future, so that no further messages can be sent
  /// to it.
  ///
  /// Delegates to `close` on the underlying receiver.
  #[inline(always)]
  pub fn close(&mut self) {
    self.rx.close();
  }
}

impl<R, E> Future for RoutineFuture<R, E> {
  type Item = R;
  type Error = E;

  /// Polls the underlying receiver for the routine's result.
  ///
  /// A successful poll result is passed through unchanged, and an error
  /// delivered by the routine (`RecvError::Complete`) is unwrapped into `E`.
  #[inline(always)]
  fn poll(&mut self) -> Poll<R, E> {
    match self.rx.poll() {
      Ok(state) => Ok(state),
      Err(RecvError::Complete(err)) => Err(err),
      // NOTE(review): this arm is treated as impossible and is undefined
      // behavior if it is ever reached. It relies on the receiver never
      // reporting `Canceled` for a routine-backed channel - TODO confirm this
      // also holds when the future is polled after `close()`.
      Err(RecvError::Canceled) => unsafe { mem::unreachable() },
    }
  }
}