futures_util/compat/
executor.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use super::{Compat, Future01CompatExt};
use crate::{
    future::{FutureExt, TryFutureExt, UnitError},
    task::SpawnExt,
};
use futures_01::future::{ExecuteError as ExecuteError01, Executor as Executor01};
use futures_01::Future as Future01;
use futures_task::{FutureObj, Spawn as Spawn03, SpawnError as SpawnError03};

/// A future that can run on a futures 0.1
/// [`Executor`](futures_01::future::Executor).
///
/// Concretely, this is a futures 0.3 `FutureObj<'static, ()>` wrapped first in
/// `UnitError` and then in `Compat`. It is the type that
/// [`Executor01As03`] passes to the wrapped executor's `execute` method, which
/// is why executors must implement `Executor<Executor01Future>` to be adapted.
pub type Executor01Future = Compat<UnitError<FutureObj<'static, ()>>>;

/// Extension trait for futures 0.1 [`Executor`](futures_01::future::Executor).
///
/// It adds a single adapter method, [`compat`](Executor01CompatExt::compat),
/// and is implemented for every type that satisfies the supertrait bounds
/// (`Executor<Executor01Future> + Clone + Send + 'static`).
pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
    /// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
    /// futures 0.3 [`Spawn`](futures_task::Spawn).
    ///
    /// The executor is consumed and returned wrapped in an [`Executor01As03`].
    ///
    /// # Examples
    ///
    /// ```
    /// # if cfg!(miri) { return; } // Miri does not support epoll
    /// use futures::task::SpawnExt;
    /// use futures::future::{FutureExt, TryFutureExt};
    /// use futures_util::compat::Executor01CompatExt;
    /// use tokio::executor::DefaultExecutor;
    ///
    /// # let (tx, rx) = futures::channel::oneshot::channel();
    ///
    /// let spawner = DefaultExecutor::current().compat();
    /// let future03 = async move {
    ///     println!("Running on the pool");
    ///     spawner.spawn(async {
    ///         println!("Spawned!");
    ///         # tx.send(42).unwrap();
    ///     }).unwrap();
    /// };
    ///
    /// let future01 = future03.unit_error().boxed().compat();
    ///
    /// tokio::run(future01);
    /// # futures::executor::block_on(rx).unwrap();
    /// ```
    fn compat(self) -> Executor01As03<Self>
    where
        Self: Sized;
}

impl<Ex> Executor01CompatExt for Ex
where
    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
    /// Takes ownership of the futures 0.1 executor and stores it inside an
    /// [`Executor01As03`] adapter.
    fn compat(self) -> Executor01As03<Self> {
        let executor01 = self;
        Executor01As03 { executor01 }
    }
}

/// Converts a futures 0.1 [`Executor`](futures_01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_task::Spawn).
///
/// Values of this type are created by
/// [`Executor01CompatExt::compat`].
#[derive(Debug, Clone)]
pub struct Executor01As03<Ex> {
    // The wrapped futures 0.1 executor; spawned tasks are forwarded to its
    // `execute` method.
    executor01: Ex,
}

impl<Ex> Spawn03 for Executor01As03<Ex>
where
    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
    /// Adapts the futures 0.3 task into an [`Executor01Future`] and submits it
    /// to the wrapped futures 0.1 executor.
    ///
    /// # Errors
    ///
    /// Any error returned by the wrapped executor's `execute` call is reported
    /// as `SpawnError::shutdown()`; the original error is discarded.
    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> {
        // Give the task a `()` error type, then expose it as a 0.1 future.
        let future01 = future.unit_error().compat();

        match self.executor01.execute(future01) {
            Ok(()) => Ok(()),
            Err(_) => Err(SpawnError03::shutdown()),
        }
    }
}

#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
impl<Sp, Fut> Executor01<Fut> for Compat<Sp>
where
    for<'a> &'a Sp: Spawn03,
    Fut: Future01<Item = (), Error = ()> + Send + 'static,
{
    /// Runs a futures 0.1 future on the wrapped futures 0.3 spawner.
    ///
    /// The future is converted with `compat()` and its output is mapped to
    /// `()` before being spawned. This method always returns `Ok(())` when it
    /// returns at all.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped spawner fails to spawn the task.
    fn execute(&self, future: Fut) -> Result<(), ExecuteError01<Fut>> {
        let spawner = &self.inner;
        // Discard the `Result<(), ()>` output so the task resolves to `()`.
        let task = future.compat().map(|_| ());

        spawner.spawn(task).expect("unable to spawn future from Compat executor");
        Ok(())
    }
}