1#![doc(html_root_url = "https://docs.rs/tracing-futures/0.2.5")]
77#![doc(
78 html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/logo-type.png",
79 issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/"
80)]
81#![warn(
82 missing_debug_implementations,
83 missing_docs,
84 rust_2018_idioms,
85 unreachable_pub,
86 bad_style,
87 const_err,
88 dead_code,
89 improper_ctypes,
90 non_shorthand_field_patterns,
91 no_mangle_generic_items,
92 overflowing_literals,
93 path_statements,
94 patterns_in_fns_without_body,
95 private_in_public,
96 unconditional_recursion,
97 unused,
98 unused_allocation,
99 unused_comparisons,
100 unused_parens,
101 while_true
102)]
103#![cfg_attr(not(feature = "std"), no_std)]
104#![cfg_attr(docsrs, feature(doc_cfg), deny(broken_intra_doc_links))]
105#[cfg(feature = "std-future")]
106use pin_project::pin_project;
107
108pub(crate) mod stdlib;
109
110#[cfg(feature = "std-future")]
111use crate::stdlib::{pin::Pin, task::Context};
112
113#[cfg(feature = "std")]
114use tracing::{dispatcher, Dispatch};
115
116use tracing::Span;
117
118pub mod executor;
120
121pub trait Instrument: Sized {
126 fn instrument(self, span: Span) -> Instrumented<Self> {
155 Instrumented { inner: self, span }
156 }
157
158 #[inline]
191 fn in_current_span(self) -> Instrumented<Self> {
192 self.instrument(Span::current())
193 }
194}
195
196#[cfg(feature = "std")]
201#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
202pub trait WithSubscriber: Sized {
203 fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
214 where
215 S: Into<Dispatch>,
216 {
217 WithDispatch {
218 inner: self,
219 dispatch: subscriber.into(),
220 }
221 }
222
223 #[inline]
237 fn with_current_subscriber(self) -> WithDispatch<Self> {
238 WithDispatch {
239 inner: self,
240 dispatch: dispatcher::get_default(|default| default.clone()),
241 }
242 }
243}
244
245#[cfg_attr(feature = "std-future", pin_project)]
247#[derive(Debug, Clone)]
248pub struct Instrumented<T> {
249 #[cfg(feature = "std-future")]
250 #[pin]
251 inner: T,
252 #[cfg(not(feature = "std-future"))]
253 inner: T,
254 span: Span,
255}
256
257#[cfg(feature = "std")]
260#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
261#[cfg_attr(feature = "std-future", pin_project)]
262#[derive(Clone, Debug)]
263pub struct WithDispatch<T> {
264 #[cfg(feature = "std-future")]
266 #[pin]
267 inner: T,
268 #[cfg(not(feature = "std-future"))]
269 inner: T,
270 dispatch: Dispatch,
271}
272
273impl<T: Sized> Instrument for T {}
274
275#[cfg(feature = "std-future")]
276#[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
277impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for Instrumented<T> {
278 type Output = T::Output;
279
280 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
281 let this = self.project();
282 let _enter = this.span.enter();
283 this.inner.poll(cx)
284 }
285}
286
287#[cfg(feature = "futures-01")]
288#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
289impl<T: futures_01::Future> futures_01::Future for Instrumented<T> {
290 type Item = T::Item;
291 type Error = T::Error;
292
293 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
294 let _enter = self.span.enter();
295 self.inner.poll()
296 }
297}
298
299#[cfg(feature = "futures-01")]
300#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
301impl<T: futures_01::Stream> futures_01::Stream for Instrumented<T> {
302 type Item = T::Item;
303 type Error = T::Error;
304
305 fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> {
306 let _enter = self.span.enter();
307 self.inner.poll()
308 }
309}
310
311#[cfg(feature = "futures-01")]
312#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
313impl<T: futures_01::Sink> futures_01::Sink for Instrumented<T> {
314 type SinkItem = T::SinkItem;
315 type SinkError = T::SinkError;
316
317 fn start_send(
318 &mut self,
319 item: Self::SinkItem,
320 ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> {
321 let _enter = self.span.enter();
322 self.inner.start_send(item)
323 }
324
325 fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> {
326 let _enter = self.span.enter();
327 self.inner.poll_complete()
328 }
329}
330
331#[cfg(all(feature = "futures-03", feature = "std-future"))]
332#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
333impl<T: futures::Stream> futures::Stream for Instrumented<T> {
334 type Item = T::Item;
335
336 fn poll_next(
337 self: Pin<&mut Self>,
338 cx: &mut Context<'_>,
339 ) -> futures::task::Poll<Option<Self::Item>> {
340 let this = self.project();
341 let _enter = this.span.enter();
342 T::poll_next(this.inner, cx)
343 }
344}
345
346#[cfg(all(feature = "futures-03", feature = "std-future"))]
347#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
348impl<I, T: futures::Sink<I>> futures::Sink<I> for Instrumented<T>
349where
350 T: futures::Sink<I>,
351{
352 type Error = T::Error;
353
354 fn poll_ready(
355 self: Pin<&mut Self>,
356 cx: &mut Context<'_>,
357 ) -> futures::task::Poll<Result<(), Self::Error>> {
358 let this = self.project();
359 let _enter = this.span.enter();
360 T::poll_ready(this.inner, cx)
361 }
362
363 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
364 let this = self.project();
365 let _enter = this.span.enter();
366 T::start_send(this.inner, item)
367 }
368
369 fn poll_flush(
370 self: Pin<&mut Self>,
371 cx: &mut Context<'_>,
372 ) -> futures::task::Poll<Result<(), Self::Error>> {
373 let this = self.project();
374 let _enter = this.span.enter();
375 T::poll_flush(this.inner, cx)
376 }
377
378 fn poll_close(
379 self: Pin<&mut Self>,
380 cx: &mut Context<'_>,
381 ) -> futures::task::Poll<Result<(), Self::Error>> {
382 let this = self.project();
383 let _enter = this.span.enter();
384 T::poll_close(this.inner, cx)
385 }
386}
387
388impl<T> Instrumented<T> {
389 pub fn span(&self) -> &Span {
391 &self.span
392 }
393
394 pub fn span_mut(&mut self) -> &mut Span {
396 &mut self.span
397 }
398
399 pub fn inner(&self) -> &T {
401 &self.inner
402 }
403
404 pub fn inner_mut(&mut self) -> &mut T {
406 &mut self.inner
407 }
408
409 #[cfg(feature = "std-future")]
411 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
412 pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
413 self.project_ref().inner
414 }
415
416 #[cfg(feature = "std-future")]
418 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
419 pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
420 self.project().inner
421 }
422
423 pub fn into_inner(self) -> T {
427 self.inner
428 }
429}
430
431#[cfg(feature = "std")]
432impl<T: Sized> WithSubscriber for T {}
433
434#[cfg(all(feature = "futures-01", feature = "std"))]
435#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))]
436impl<T: futures_01::Future> futures_01::Future for WithDispatch<T> {
437 type Item = T::Item;
438 type Error = T::Error;
439
440 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
441 let inner = &mut self.inner;
442 dispatcher::with_default(&self.dispatch, || inner.poll())
443 }
444}
445
446#[cfg(all(feature = "std-future", feature = "std"))]
447#[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))]
448impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for WithDispatch<T> {
449 type Output = T::Output;
450
451 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
452 let this = self.project();
453 let dispatch = this.dispatch;
454 let future = this.inner;
455 dispatcher::with_default(dispatch, || future.poll(cx))
456 }
457}
458
459#[cfg(feature = "std")]
460impl<T> WithDispatch<T> {
461 pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> {
463 WithDispatch {
464 dispatch: self.dispatch.clone(),
465 inner,
466 }
467 }
468
469 pub fn dispatch(&self) -> &Dispatch {
471 &self.dispatch
472 }
473
474 #[cfg(feature = "std-future")]
476 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
477 pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
478 self.project_ref().inner
479 }
480
481 #[cfg(feature = "std-future")]
483 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
484 pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
485 self.project().inner
486 }
487
488 pub fn inner(&self) -> &T {
490 &self.inner
491 }
492
493 pub fn inner_mut(&mut self) -> &mut T {
495 &mut self.inner
496 }
497
498 pub fn into_inner(self) -> T {
500 self.inner
501 }
502}
503
504#[cfg(test)]
505pub(crate) use self::support as test_support;
506#[path = "../../tracing/tests/support/mod.rs"]
508#[cfg(test)]
509#[allow(unreachable_pub)]
510pub(crate) mod support;
511
512#[cfg(test)]
513mod tests {
514 use super::{test_support::*, *};
515
516 #[cfg(feature = "futures-01")]
517 mod futures_01_tests {
518 use futures_01::{future, stream, task, Async, Future, Stream};
519 use tracing::subscriber::with_default;
520
521 use super::*;
522
523 struct PollN<T, E> {
524 and_return: Option<Result<T, E>>,
525 finish_at: usize,
526 polls: usize,
527 }
528
529 impl PollN<(), ()> {
530 fn new_ok(finish_at: usize) -> Self {
531 Self {
532 and_return: Some(Ok(())),
533 finish_at,
534 polls: 0,
535 }
536 }
537
538 fn new_err(finish_at: usize) -> Self {
539 Self {
540 and_return: Some(Err(())),
541 finish_at,
542 polls: 0,
543 }
544 }
545 }
546
547 impl<T, E> futures_01::Future for PollN<T, E> {
548 type Item = T;
549 type Error = E;
550 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
551 self.polls += 1;
552 if self.polls == self.finish_at {
553 self.and_return
554 .take()
555 .expect("polled after ready")
556 .map(Async::Ready)
557 } else {
558 task::current().notify();
559 Ok(Async::NotReady)
560 }
561 }
562 }
563
564 #[test]
565 fn future_enter_exit_is_reasonable() {
566 let (subscriber, handle) = subscriber::mock()
567 .enter(span::mock().named("foo"))
568 .exit(span::mock().named("foo"))
569 .enter(span::mock().named("foo"))
570 .exit(span::mock().named("foo"))
571 .drop_span(span::mock().named("foo"))
572 .done()
573 .run_with_handle();
574 with_default(subscriber, || {
575 PollN::new_ok(2)
576 .instrument(tracing::trace_span!("foo"))
577 .wait()
578 .unwrap();
579 });
580 handle.assert_finished();
581 }
582
583 #[test]
584 fn future_error_ends_span() {
585 let (subscriber, handle) = subscriber::mock()
586 .enter(span::mock().named("foo"))
587 .exit(span::mock().named("foo"))
588 .enter(span::mock().named("foo"))
589 .exit(span::mock().named("foo"))
590 .drop_span(span::mock().named("foo"))
591 .done()
592 .run_with_handle();
593 with_default(subscriber, || {
594 PollN::new_err(2)
595 .instrument(tracing::trace_span!("foo"))
596 .wait()
597 .unwrap_err();
598 });
599
600 handle.assert_finished();
601 }
602
603 #[test]
604 fn stream_enter_exit_is_reasonable() {
605 let (subscriber, handle) = subscriber::mock()
606 .enter(span::mock().named("foo"))
607 .exit(span::mock().named("foo"))
608 .enter(span::mock().named("foo"))
609 .exit(span::mock().named("foo"))
610 .enter(span::mock().named("foo"))
611 .exit(span::mock().named("foo"))
612 .enter(span::mock().named("foo"))
613 .exit(span::mock().named("foo"))
614 .drop_span(span::mock().named("foo"))
615 .run_with_handle();
616 with_default(subscriber, || {
617 stream::iter_ok::<_, ()>(&[1, 2, 3])
618 .instrument(tracing::trace_span!("foo"))
619 .for_each(|_| future::ok(()))
620 .wait()
621 .unwrap();
622 });
623 handle.assert_finished();
624 }
625
626 #[test]
627 fn span_follows_future_onto_threadpool() {
628 let (subscriber, handle) = subscriber::mock()
629 .enter(span::mock().named("a"))
630 .enter(span::mock().named("b"))
631 .exit(span::mock().named("b"))
632 .enter(span::mock().named("b"))
633 .exit(span::mock().named("b"))
634 .drop_span(span::mock().named("b"))
635 .exit(span::mock().named("a"))
636 .drop_span(span::mock().named("a"))
637 .done()
638 .run_with_handle();
639 let mut runtime = tokio::runtime::Runtime::new().unwrap();
640 with_default(subscriber, || {
641 tracing::trace_span!("a").in_scope(|| {
642 let future = PollN::new_ok(2)
643 .instrument(tracing::trace_span!("b"))
644 .map(|_| {
645 tracing::trace_span!("c").in_scope(|| {
646 })
649 });
650 runtime.block_on(Box::new(future)).unwrap();
651 })
652 });
653 handle.assert_finished();
654 }
655 }
656
657 #[cfg(all(feature = "futures-03", feature = "std-future"))]
658 mod futures_03_tests {
659 use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt};
660 use tracing::subscriber::with_default;
661
662 use super::*;
663
664 #[test]
665 fn stream_enter_exit_is_reasonable() {
666 let (subscriber, handle) = subscriber::mock()
667 .enter(span::mock().named("foo"))
668 .exit(span::mock().named("foo"))
669 .enter(span::mock().named("foo"))
670 .exit(span::mock().named("foo"))
671 .enter(span::mock().named("foo"))
672 .exit(span::mock().named("foo"))
673 .enter(span::mock().named("foo"))
674 .exit(span::mock().named("foo"))
675 .drop_span(span::mock().named("foo"))
676 .run_with_handle();
677 with_default(subscriber, || {
678 Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo"))
679 .for_each(|_| future::ready(()))
680 .now_or_never()
681 .unwrap();
682 });
683 handle.assert_finished();
684 }
685
686 #[test]
687 fn sink_enter_exit_is_reasonable() {
688 let (subscriber, handle) = subscriber::mock()
689 .enter(span::mock().named("foo"))
690 .exit(span::mock().named("foo"))
691 .enter(span::mock().named("foo"))
692 .exit(span::mock().named("foo"))
693 .enter(span::mock().named("foo"))
694 .exit(span::mock().named("foo"))
695 .drop_span(span::mock().named("foo"))
696 .run_with_handle();
697 with_default(subscriber, || {
698 Instrument::instrument(sink::drain(), tracing::trace_span!("foo"))
699 .send(1u8)
700 .now_or_never()
701 .unwrap()
702 .unwrap()
703 });
704 handle.assert_finished();
705 }
706 }
707}