tracing_futures/executor/
futures_01.rs

1use crate::{Instrument, Instrumented, WithDispatch};
2use futures_01::{
3    future::{ExecuteError, Executor},
4    Future,
5};
6
7macro_rules! deinstrument_err {
8    ($e:expr) => {
9        $e.map_err(|e| {
10            let kind = e.kind();
11            let future = e.into_future().inner;
12            ExecuteError::new(kind, future)
13        })
14    };
15}
16
17impl<T, F> Executor<F> for Instrumented<T>
18where
19    T: Executor<Instrumented<F>>,
20    F: Future<Item = (), Error = ()>,
21{
22    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
23        let future = future.instrument(self.span.clone());
24        deinstrument_err!(self.inner.execute(future))
25    }
26}
27
28impl<T, F> Executor<F> for WithDispatch<T>
29where
30    T: Executor<WithDispatch<F>>,
31    F: Future<Item = (), Error = ()>,
32{
33    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
34        let future = self.with_dispatch(future);
35        deinstrument_err!(self.inner.execute(future))
36    }
37}
38
39#[cfg(feature = "tokio")]
40#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41pub use self::tokio::*;
42
43#[cfg(feature = "tokio")]
44mod tokio {
45    use crate::{Instrument, Instrumented, WithDispatch};
46    use futures_01::Future;
47    use tokio::{
48        executor::{Executor, SpawnError, TypedExecutor},
49        runtime::{current_thread, Runtime, TaskExecutor},
50    };
51
52    impl<T> Executor for Instrumented<T>
53    where
54        T: Executor,
55    {
56        fn spawn(
57            &mut self,
58            future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
59        ) -> Result<(), SpawnError> {
60            // TODO: get rid of double box somehow?
61            let future = Box::new(future.instrument(self.span.clone()));
62            self.inner.spawn(future)
63        }
64    }
65
66    impl<T, F> TypedExecutor<F> for Instrumented<T>
67    where
68        T: TypedExecutor<Instrumented<F>>,
69    {
70        fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
71            self.inner.spawn(future.instrument(self.span.clone()))
72        }
73
74        fn status(&self) -> Result<(), SpawnError> {
75            self.inner.status()
76        }
77    }
78
79    impl Instrumented<Runtime> {
80        /// Spawn an instrumented future onto the Tokio runtime.
81        ///
82        /// This spawns the given future onto the runtime's executor, usually a
83        /// thread pool. The thread pool is then responsible for polling the
84        /// future until it completes.
85        ///
86        /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
87        /// instrumenting the spawned future beforehand.
88        pub fn spawn<F>(&mut self, future: F) -> &mut Self
89        where
90            F: Future<Item = (), Error = ()> + Send + 'static,
91        {
92            let future = future.instrument(self.span.clone());
93            self.inner.spawn(future);
94            self
95        }
96
97        /// Run an instrumented future to completion on the Tokio runtime.
98        ///
99        /// This runs the given future on the runtime, blocking until it is
100        /// complete, and yielding its resolved result. Any tasks or timers which
101        /// the future spawns internally will be executed on the runtime.
102        ///
103        /// This method should not be called from an asynchronous context.
104        ///
105        /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
106        /// instrumenting the spawned future beforehand.
107        ///
108        /// # Panics
109        ///
110        /// This function panics if the executor is at capacity, if the provided
111        /// future panics, or if called within an asynchronous execution context.
112        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
113        where
114            F: Send + 'static + Future<Item = R, Error = E>,
115            R: Send + 'static,
116            E: Send + 'static,
117        {
118            let future = future.instrument(self.span.clone());
119            self.inner.block_on(future)
120        }
121
122        /// Return an instrumented handle to the runtime's executor.
123        ///
124        /// The returned handle can be used to spawn tasks that run on this runtime.
125        ///
126        /// The instrumented handle functions identically to a
127        /// `tokio::runtime::TaskExecutor`, but instruments the spawned
128        /// futures prior to spawning them.
129        pub fn executor(&self) -> Instrumented<TaskExecutor> {
130            self.inner.executor().instrument(self.span.clone())
131        }
132    }
133
134    impl Instrumented<current_thread::Runtime> {
135        /// Spawn an instrumented future onto the single-threaded Tokio runtime.
136        ///
137        /// This method simply wraps a call to `current_thread::Runtime::spawn`,
138        /// instrumenting the spawned future beforehand.
139        pub fn spawn<F>(&mut self, future: F) -> &mut Self
140        where
141            F: Future<Item = (), Error = ()> + 'static,
142        {
143            let future = future.instrument(self.span.clone());
144            self.inner.spawn(future);
145            self
146        }
147
148        /// Instruments and runs the provided future, blocking the current thread
149        /// until the future completes.
150        ///
151        /// This function can be used to synchronously block the current thread
152        /// until the provided `future` has resolved either successfully or with an
153        /// error. The result of the future is then returned from this function
154        /// call.
155        ///
156        /// Note that this function will **also** execute any spawned futures on the
157        /// current thread, but will **not** block until these other spawned futures
158        /// have completed. Once the function returns, any uncompleted futures
159        /// remain pending in the `Runtime` instance. These futures will not run
160        /// until `block_on` or `run` is called again.
161        ///
162        /// The caller is responsible for ensuring that other spawned futures
163        /// complete execution by calling `block_on` or `run`.
164        ///
165        /// This method simply wraps a call to `current_thread::Runtime::block_on`,
166        /// instrumenting the spawned future beforehand.
167        ///
168        /// # Panics
169        ///
170        /// This function panics if the executor is at capacity, if the provided
171        /// future panics, or if called within an asynchronous execution context.
172        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
173        where
174            F: 'static + Future<Item = R, Error = E>,
175            R: 'static,
176            E: 'static,
177        {
178            let future = future.instrument(self.span.clone());
179            self.inner.block_on(future)
180        }
181
182        /// Get a new instrumented handle to spawn futures on the single-threaded
183        /// Tokio runtime
184        ///
185        /// Different to the runtime itself, the handle can be sent to different
186        /// threads.
187        ///
188        /// The instrumented handle functions identically to a
189        /// `tokio::runtime::current_thread::Handle`, but instruments the spawned
190        /// futures prior to spawning them.
191        pub fn handle(&self) -> Instrumented<current_thread::Handle> {
192            self.inner.handle().instrument(self.span.clone())
193        }
194    }
195
196    impl<T> Executor for WithDispatch<T>
197    where
198        T: Executor,
199    {
200        fn spawn(
201            &mut self,
202            future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
203        ) -> Result<(), SpawnError> {
204            // TODO: get rid of double box?
205            let future = Box::new(self.with_dispatch(future));
206            self.inner.spawn(future)
207        }
208    }
209
210    impl<T, F> TypedExecutor<F> for WithDispatch<T>
211    where
212        T: TypedExecutor<WithDispatch<F>>,
213    {
214        fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
215            self.inner.spawn(self.with_dispatch(future))
216        }
217
218        fn status(&self) -> Result<(), SpawnError> {
219            self.inner.status()
220        }
221    }
222
223    impl WithDispatch<Runtime> {
224        /// Spawn a future onto the Tokio runtime, in the context of this
225        /// `WithDispatch`'s trace dispatcher.
226        ///
227        /// This spawns the given future onto the runtime's executor, usually a
228        /// thread pool. The thread pool is then responsible for polling the
229        /// future until it completes.
230        ///
231        /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
232        /// instrumenting the spawned future beforehand.
233        pub fn spawn<F>(&mut self, future: F) -> &mut Self
234        where
235            F: Future<Item = (), Error = ()> + Send + 'static,
236        {
237            let future = self.with_dispatch(future);
238            self.inner.spawn(future);
239            self
240        }
241
242        /// Run a future to completion on the Tokio runtime, in the context of this
243        /// `WithDispatch`'s trace dispatcher.
244        ///
245        /// This runs the given future on the runtime, blocking until it is
246        /// complete, and yielding its resolved result. Any tasks or timers which
247        /// the future spawns internally will be executed on the runtime.
248        ///
249        /// This method should not be called from an asynchronous context.
250        ///
251        /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
252        /// instrumenting the spawned future beforehand.
253        ///
254        /// # Panics
255        ///
256        /// This function panics if the executor is at capacity, if the provided
257        /// future panics, or if called within an asynchronous execution context.
258        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
259        where
260            F: Send + 'static + Future<Item = R, Error = E>,
261            R: Send + 'static,
262            E: Send + 'static,
263        {
264            let future = self.with_dispatch(future);
265            self.inner.block_on(future)
266        }
267
268        /// Return a handle to the runtime's executor, in the context of this
269        /// `WithDispatch`'s trace dispatcher.
270        ///
271        /// The returned handle can be used to spawn tasks that run on this runtime.
272        ///
273        /// The instrumented handle functions identically to a
274        /// `tokio::runtime::TaskExecutor`, but instruments the spawned
275        /// futures prior to spawning them.
276        pub fn executor(&self) -> WithDispatch<TaskExecutor> {
277            self.with_dispatch(self.inner.executor())
278        }
279    }
280
281    impl WithDispatch<current_thread::Runtime> {
282        /// Spawn a future onto the single-threaded Tokio runtime, in the context
283        /// of this `WithDispatch`'s trace dispatcher.
284        ///
285        /// This method simply wraps a call to `current_thread::Runtime::spawn`,
286        /// instrumenting the spawned future beforehand.
287        pub fn spawn<F>(&mut self, future: F) -> &mut Self
288        where
289            F: Future<Item = (), Error = ()> + 'static,
290        {
291            let future = self.with_dispatch(future);
292            self.inner.spawn(future);
293            self
294        }
295
296        /// Runs the provided future in the context of this `WithDispatch`'s trace
297        /// dispatcher, blocking the current thread until the future completes.
298        ///
299        /// This function can be used to synchronously block the current thread
300        /// until the provided `future` has resolved either successfully or with an
301        /// error. The result of the future is then returned from this function
302        /// call.
303        ///
304        /// Note that this function will **also** execute any spawned futures on the
305        /// current thread, but will **not** block until these other spawned futures
306        /// have completed. Once the function returns, any uncompleted futures
307        /// remain pending in the `Runtime` instance. These futures will not run
308        /// until `block_on` or `run` is called again.
309        ///
310        /// The caller is responsible for ensuring that other spawned futures
311        /// complete execution by calling `block_on` or `run`.
312        ///
313        /// This method simply wraps a call to `current_thread::Runtime::block_on`,
314        /// instrumenting the spawned future beforehand.
315        ///
316        /// # Panics
317        ///
318        /// This function panics if the executor is at capacity, if the provided
319        /// future panics, or if called within an asynchronous execution context.
320        pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
321        where
322            F: 'static + Future<Item = R, Error = E>,
323            R: 'static,
324            E: 'static,
325        {
326            let future = self.with_dispatch(future);
327            self.inner.block_on(future)
328        }
329
330        /// Get a new handle to spawn futures on the single-threaded Tokio runtime,
331        /// in the context of this `WithDispatch`'s trace dispatcher.\
332        ///
333        /// Different to the runtime itself, the handle can be sent to different
334        /// threads.
335        ///
336        /// The instrumented handle functions identically to a
337        /// `tokio::runtime::current_thread::Handle`, but the spawned
338        /// futures are run in the context of the trace dispatcher.
339        pub fn handle(&self) -> WithDispatch<current_thread::Handle> {
340            self.with_dispatch(self.inner.handle())
341        }
342    }
343}