1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! ipfs-embed supports different configuration of the used async executor to
//! spawn its background tasks (network, garbage collection, etc.). Those can be
//! configured with the following feature flags:
//! * `async_global`: Uses the `async-global-executor` crate (with async-std).
//!   This is the default.
//! * `tokio`: Uses a user provided tokio >= 1.0 runtime to spawn its background
//!   tasks.  Note, that
//! for this to work `ipfs-embed` needs to be executed within the context of a
//! tokio runtime. ipfs-embed won't spawn any on its own.

use futures::{Future, FutureExt};
use pin_project::pin_project;
use std::{pin::Pin, task::Poll};

#[derive(Clone)]
pub enum Executor {
    #[cfg(feature = "tokio")]
    Tokio,
    #[cfg(feature = "async_global")]
    AsyncGlobal,
}
impl Executor {
    #[allow(unreachable_code)]
    pub fn new() -> Self {
        #[cfg(feature = "async_global")]
        return Self::AsyncGlobal;

        #[cfg(feature = "tokio")]
        return Self::Tokio;
    }
    pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(
        &self,
        future: F,
    ) -> JoinHandle<T> {
        match self {
            #[cfg(feature = "async_global")]
            Self::AsyncGlobal => {
                let task = async_global_executor::spawn(future);
                JoinHandle::AsyncGlobal(Some(task))
            }
            #[cfg(feature = "tokio")]
            Self::Tokio => {
                let task = tokio_crate::spawn(future);
                JoinHandle::Tokio(task)
            }
        }
    }

    pub fn spawn_blocking<Fun: FnOnce() -> T + Send + 'static, T: Send + 'static>(
        &self,
        f: Fun,
    ) -> JoinHandle<T> {
        match self {
            #[cfg(feature = "async_global")]
            Self::AsyncGlobal => {
                let task = async_global_executor::spawn(async_global_executor::spawn_blocking(f));
                JoinHandle::AsyncGlobal(Some(task))
            }
            #[cfg(feature = "tokio")]
            Self::Tokio => {
                let task = tokio_crate::task::spawn_blocking(f);
                JoinHandle::Tokio(task)
            }
        }
    }
}

impl Default for Executor {
    fn default() -> Self {
        Self::new()
    }
}

#[pin_project(project = EnumProj)]
pub enum JoinHandle<T> {
    #[cfg(feature = "tokio")]
    Tokio(tokio_crate::task::JoinHandle<T>),
    #[cfg(feature = "async_global")]
    AsyncGlobal(Option<async_global_executor::Task<T>>),
}

impl<T> JoinHandle<T> {
    #[allow(unused_mut)]
    pub fn detach(mut self) {
        #[cfg(feature = "async_global")]
        if let Self::AsyncGlobal(Some(t)) = self {
            t.detach()
        }
        // tokio task detaches when dropped
    }
    #[allow(unused_mut)]
    pub fn abort(mut self) {
        #[cfg(all(feature = "tokio", not(feature = "async_global")))]
        {
            let Self::Tokio(t) = self;
            t.abort()
        }
        // async-global-executor task cancels when drop
    }
}

impl<T> Future for JoinHandle<T> {
    type Output = anyhow::Result<T>;
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        match this {
            #[cfg(feature = "tokio")]
            EnumProj::Tokio(j) => match j.poll_unpin(cx) {
                Poll::Ready(t) => {
                    use anyhow::Context;
                    Poll::Ready(t.context("tokio::task::JoinHandle Error"))
                }
                Poll::Pending => Poll::Pending,
            },
            #[cfg(feature = "async_global")]
            EnumProj::AsyncGlobal(h) => {
                if let Some(handle) = h {
                    match handle.poll_unpin(cx) {
                        Poll::Ready(r) => Poll::Ready(Ok(r)),
                        _ => Poll::Pending,
                    }
                } else {
                    Poll::Ready(Err(anyhow::anyhow!("Future detached")))
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::{Config, DefaultParams, Ipfs};
    use libp2p::identity::ed25519::Keypair;
    use tempdir::TempDir;

    #[test]
    fn should_work_with_async_global_per_default() {
        use futures::executor::block_on;
        let tmp = TempDir::new("ipfs-embed").unwrap();
        block_on(Ipfs::<DefaultParams>::new(Config::new(
            tmp.path(),
            Keypair::generate(),
        )))
        .unwrap();
    }

    #[cfg(feature = "tokio")]
    #[test]
    #[should_panic(
        expected = "no reactor running, must be called from the context of a Tokio 1.x runtime"
    )]
    fn should_panic_without_a_tokio_runtime() {
        use futures::executor::block_on;
        let tmp = TempDir::new("ipfs-embed").unwrap();
        let _ = block_on(Ipfs::<DefaultParams>::new0(
            Config::new(tmp.path(), Keypair::generate()),
            crate::Executor::Tokio,
        ));
    }

    #[cfg(feature = "tokio")]
    #[test]
    fn should_not_panic_with_a_tokio_runtime() {
        let tmp = TempDir::new("ipfs-embed").unwrap();
        let rt = tokio_crate::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.block_on(Ipfs::<DefaultParams>::new0(
            Config::new(tmp.path(), Keypair::generate()),
            crate::Executor::Tokio,
        ))
        .unwrap();
    }
}