1use crate::{Instrument, Instrumented, WithDispatch};
2use futures_01::{
3 future::{ExecuteError, Executor},
4 Future,
// Maps the error side of an `execute` result back onto the caller's original
// future type: the failed wrapper future is taken apart and its `inner`
// future is placed in a fresh `ExecuteError` carrying the same kind.
// Successful results pass through untouched.
macro_rules! deinstrument_err {
    ($e:expr) => {
        match $e {
            Ok(done) => Ok(done),
            Err(wrapped) => {
                // Read the kind before `into_future` consumes the error.
                let kind = wrapped.kind();
                Err(ExecuteError::new(kind, wrapped.into_future().inner))
            }
        }
    };
}
17impl<T, F> Executor<F> for Instrumented<T>
19 T: Executor<Instrumented<F>>,
20 F: Future<Item = (), Error = ()>,
22 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
23 let future = future.instrument(self.span.clone());
24 deinstrument_err!(self.inner.execute(future))
25 }
28impl<T, F> Executor<F> for WithDispatch<T>
30 T: Executor<WithDispatch<F>>,
31 F: Future<Item = (), Error = ()>,
33 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
34 let future = self.with_dispatch(future);
35 deinstrument_err!(self.inner.execute(future))
36 }
39#[cfg(feature = "tokio")]
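#[allow(unreachable_pub)] // NOTE(review): presumably silences a false positive on the glob re-export below; confirm and cite the tracking issue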
41pub use self::tokio::*;
43#[cfg(feature = "tokio")]
44mod tokio {
45 use crate::{Instrument, Instrumented, WithDispatch};
46 use futures_01::Future;
47 use tokio::{
48 executor::{Executor, SpawnError, TypedExecutor},
49 runtime::{current_thread, Runtime, TaskExecutor},
50 };
52 impl<T> Executor for Instrumented<T>
53 where
54 T: Executor,
55 {
56 fn spawn(
57 &mut self,
58 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
59 ) -> Result<(), SpawnError> {
60 // TODO: get rid of double box somehow?
61 let future = Box::new(future.instrument(self.span.clone()));
62 self.inner.spawn(future)
63 }
64 }
66 impl<T, F> TypedExecutor<F> for Instrumented<T>
67 where
68 T: TypedExecutor<Instrumented<F>>,
69 {
70 fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
71 self.inner.spawn(future.instrument(self.span.clone()))
72 }
74 fn status(&self) -> Result<(), SpawnError> {
75 self.inner.status()
76 }
77 }
79 impl Instrumented<Runtime> {
80 /// Spawn an instrumented future onto the Tokio runtime.
81 ///
82 /// This spawns the given future onto the runtime's executor, usually a
83 /// thread pool. The thread pool is then responsible for polling the
84 /// future until it completes.
85 ///
86 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
87 /// instrumenting the spawned future beforehand.
88 pub fn spawn<F>(&mut self, future: F) -> &mut Self
89 where
90 F: Future<Item = (), Error = ()> + Send + 'static,
91 {
92 let future = future.instrument(self.span.clone());
93 self.inner.spawn(future);
94 self
95 }
97 /// Run an instrumented future to completion on the Tokio runtime.
98 ///
99 /// This runs the given future on the runtime, blocking until it is
100 /// complete, and yielding its resolved result. Any tasks or timers which
101 /// the future spawns internally will be executed on the runtime.
102 ///
103 /// This method should not be called from an asynchronous context.
104 ///
105 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
106 /// instrumenting the spawned future beforehand.
107 ///
108 /// # Panics
109 ///
110 /// This function panics if the executor is at capacity, if the provided
111 /// future panics, or if called within an asynchronous execution context.
112 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
113 where
114 F: Send + 'static + Future<Item = R, Error = E>,
115 R: Send + 'static,
116 E: Send + 'static,
117 {
118 let future = future.instrument(self.span.clone());
119 self.inner.block_on(future)
120 }
122 /// Return an instrumented handle to the runtime's executor.
123 ///
124 /// The returned handle can be used to spawn tasks that run on this runtime.
125 ///
126 /// The instrumented handle functions identically to a
127 /// `tokio::runtime::TaskExecutor`, but instruments the spawned
128 /// futures prior to spawning them.
129 pub fn executor(&self) -> Instrumented<TaskExecutor> {
130 self.inner.executor().instrument(self.span.clone())
131 }
132 }
134 impl Instrumented<current_thread::Runtime> {
135 /// Spawn an instrumented future onto the single-threaded Tokio runtime.
136 ///
137 /// This method simply wraps a call to `current_thread::Runtime::spawn`,
138 /// instrumenting the spawned future beforehand.
139 pub fn spawn<F>(&mut self, future: F) -> &mut Self
140 where
141 F: Future<Item = (), Error = ()> + 'static,
142 {
143 let future = future.instrument(self.span.clone());
144 self.inner.spawn(future);
145 self
146 }
148 /// Instruments and runs the provided future, blocking the current thread
149 /// until the future completes.
150 ///
151 /// This function can be used to synchronously block the current thread
152 /// until the provided `future` has resolved either successfully or with an
153 /// error. The result of the future is then returned from this function
154 /// call.
155 ///
156 /// Note that this function will **also** execute any spawned futures on the
157 /// current thread, but will **not** block until these other spawned futures
158 /// have completed. Once the function returns, any uncompleted futures
159 /// remain pending in the `Runtime` instance. These futures will not run
160 /// until `block_on` or `run` is called again.
161 ///
162 /// The caller is responsible for ensuring that other spawned futures
163 /// complete execution by calling `block_on` or `run`.
164 ///
165 /// This method simply wraps a call to `current_thread::Runtime::block_on`,
166 /// instrumenting the spawned future beforehand.
167 ///
168 /// # Panics
169 ///
170 /// This function panics if the executor is at capacity, if the provided
171 /// future panics, or if called within an asynchronous execution context.
172 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
173 where
174 F: 'static + Future<Item = R, Error = E>,
175 R: 'static,
176 E: 'static,
177 {
178 let future = future.instrument(self.span.clone());
179 self.inner.block_on(future)
180 }
182 /// Get a new instrumented handle to spawn futures on the single-threaded
183 /// Tokio runtime
184 ///
185 /// Different to the runtime itself, the handle can be sent to different
186 /// threads.
187 ///
188 /// The instrumented handle functions identically to a
189 /// `tokio::runtime::current_thread::Handle`, but instruments the spawned
190 /// futures prior to spawning them.
191 pub fn handle(&self) -> Instrumented<current_thread::Handle> {
192 self.inner.handle().instrument(self.span.clone())
193 }
194 }
196 impl<T> Executor for WithDispatch<T>
197 where
198 T: Executor,
199 {
200 fn spawn(
201 &mut self,
202 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
203 ) -> Result<(), SpawnError> {
204 // TODO: get rid of double box?
205 let future = Box::new(self.with_dispatch(future));
206 self.inner.spawn(future)
207 }
208 }
210 impl<T, F> TypedExecutor<F> for WithDispatch<T>
211 where
212 T: TypedExecutor<WithDispatch<F>>,
213 {
214 fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
215 self.inner.spawn(self.with_dispatch(future))
216 }
218 fn status(&self) -> Result<(), SpawnError> {
219 self.inner.status()
220 }
221 }
223 impl WithDispatch<Runtime> {
224 /// Spawn a future onto the Tokio runtime, in the context of this
225 /// `WithDispatch`'s trace dispatcher.
226 ///
227 /// This spawns the given future onto the runtime's executor, usually a
228 /// thread pool. The thread pool is then responsible for polling the
229 /// future until it completes.
230 ///
231 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
232 /// instrumenting the spawned future beforehand.
233 pub fn spawn<F>(&mut self, future: F) -> &mut Self
234 where
235 F: Future<Item = (), Error = ()> + Send + 'static,
236 {
237 let future = self.with_dispatch(future);
238 self.inner.spawn(future);
239 self
240 }
242 /// Run a future to completion on the Tokio runtime, in the context of this
243 /// `WithDispatch`'s trace dispatcher.
244 ///
245 /// This runs the given future on the runtime, blocking until it is
246 /// complete, and yielding its resolved result. Any tasks or timers which
247 /// the future spawns internally will be executed on the runtime.
248 ///
249 /// This method should not be called from an asynchronous context.
250 ///
251 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
252 /// instrumenting the spawned future beforehand.
253 ///
254 /// # Panics
255 ///
256 /// This function panics if the executor is at capacity, if the provided
257 /// future panics, or if called within an asynchronous execution context.
258 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
259 where
260 F: Send + 'static + Future<Item = R, Error = E>,
261 R: Send + 'static,
262 E: Send + 'static,
263 {
264 let future = self.with_dispatch(future);
265 self.inner.block_on(future)
266 }
268 /// Return a handle to the runtime's executor, in the context of this
269 /// `WithDispatch`'s trace dispatcher.
270 ///
271 /// The returned handle can be used to spawn tasks that run on this runtime.
272 ///
273 /// The instrumented handle functions identically to a
274 /// `tokio::runtime::TaskExecutor`, but instruments the spawned
275 /// futures prior to spawning them.
276 pub fn executor(&self) -> WithDispatch<TaskExecutor> {
277 self.with_dispatch(self.inner.executor())
278 }
279 }
281 impl WithDispatch<current_thread::Runtime> {
282 /// Spawn a future onto the single-threaded Tokio runtime, in the context
283 /// of this `WithDispatch`'s trace dispatcher.
284 ///
285 /// This method simply wraps a call to `current_thread::Runtime::spawn`,
286 /// instrumenting the spawned future beforehand.
287 pub fn spawn<F>(&mut self, future: F) -> &mut Self
288 where
289 F: Future<Item = (), Error = ()> + 'static,
290 {
291 let future = self.with_dispatch(future);
292 self.inner.spawn(future);
293 self
294 }
296 /// Runs the provided future in the context of this `WithDispatch`'s trace
297 /// dispatcher, blocking the current thread until the future completes.
298 ///
299 /// This function can be used to synchronously block the current thread
300 /// until the provided `future` has resolved either successfully or with an
301 /// error. The result of the future is then returned from this function
302 /// call.
303 ///
304 /// Note that this function will **also** execute any spawned futures on the
305 /// current thread, but will **not** block until these other spawned futures
306 /// have completed. Once the function returns, any uncompleted futures
307 /// remain pending in the `Runtime` instance. These futures will not run
308 /// until `block_on` or `run` is called again.
309 ///
310 /// The caller is responsible for ensuring that other spawned futures
311 /// complete execution by calling `block_on` or `run`.
312 ///
313 /// This method simply wraps a call to `current_thread::Runtime::block_on`,
314 /// instrumenting the spawned future beforehand.
315 ///
316 /// # Panics
317 ///
318 /// This function panics if the executor is at capacity, if the provided
319 /// future panics, or if called within an asynchronous execution context.
320 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
321 where
322 F: 'static + Future<Item = R, Error = E>,
323 R: 'static,
324 E: 'static,
325 {
326 let future = self.with_dispatch(future);
327 self.inner.block_on(future)
328 }
330 /// Get a new handle to spawn futures on the single-threaded Tokio runtime,
331 /// in the context of this `WithDispatch`'s trace dispatcher.\
332 ///
333 /// Different to the runtime itself, the handle can be sent to different
334 /// threads.
335 ///
336 /// The instrumented handle functions identically to a
337 /// `tokio::runtime::current_thread::Handle`, but the spawned
338 /// futures are run in the context of the trace dispatcher.
339 pub fn handle(&self) -> WithDispatch<current_thread::Handle> {
340 self.with_dispatch(self.inner.handle())
341 }
342 }