tracing_futures/
lib.rs

1//! Futures compatibility for [`tracing`].
2//!
3//! # Overview
4//!
5//! [`tracing`] is a framework for instrumenting Rust programs to collect
6//! structured, event-based diagnostic information. This crate provides utilities
7//! for using `tracing` to instrument asynchronous code written using futures and
8//! async/await.
9//!
10//! The crate provides the following traits:
11//!
12//! * [`Instrument`] allows a `tracing` [span] to be attached to a future, sink,
13//!   stream, or executor.
14//!
15//! * [`WithSubscriber`] allows a `tracing` [`Subscriber`] to be attached to a
16//!   future, sink, stream, or executor.
17//!
18//! *Compiler support: [requires `rustc` 1.42+][msrv]*
19//!
20//! [msrv]: #supported-rust-versions
21//!
22//! # Feature flags
23//!
24//! This crate provides a number of feature flags that enable compatibility
25//! features with other crates in the asynchronous ecosystem:
26//!
27//! - `tokio`: Enables compatibility with the `tokio` crate, including
28//!    [`Instrument`] and [`WithSubscriber`] implementations for
29//!    `tokio::executor::Executor`, `tokio::runtime::Runtime`, and
30//!    `tokio::runtime::current_thread`. Enabled by default.
31//! - `tokio-executor`: Enables compatibility with the `tokio-executor`
32//!    crate, including [`Instrument`] and [`WithSubscriber`]
33//!    implementations for types implementing `tokio_executor::Executor`.
34//!    This is intended primarily for use in crates which depend on
35//!    `tokio-executor` rather than `tokio`; in general the `tokio` feature
36//!    should be used instead.
37//! - `std-future`: Enables compatibility with `std::future::Future`.
38//! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`]
39//!   crate.
40//! - `futures-03`: Enables compatibility with version 0.3.x of the `futures`
41//!   crate's `Spawn` and `LocalSpawn` traits.
42//! - `tokio-alpha`: Enables compatibility with `tokio` 0.2's alpha releases,
43//!   including the `tokio` 0.2 `Executor` and `TypedExecutor` traits.
44//! - `std`: Depend on the Rust standard library.
45//!
46//!   `no_std` users may disable this feature with `default-features = false`:
47//!
48//!   ```toml
49//!   [dependencies]
50//!   tracing-futures = { version = "0.2.5", default-features = false }
51//!   ```
52//!
53//! The `tokio`, `std-future` and `std` features are enabled by default.
54//!
55//! [`tracing`]: https://crates.io/crates/tracing
56//! [span]: https://docs.rs/tracing/latest/tracing/span/index.html
57//! [`Subscriber`]: https://docs.rs/tracing/latest/tracing/subscriber/index.html
58//! [`Instrument`]: trait.Instrument.html
59//! [`WithSubscriber`]: trait.WithSubscriber.html
60//! [`futures`]: https://crates.io/crates/futures
61//!
62//! ## Supported Rust Versions
63//!
64//! Tracing is built against the latest stable release. The minimum supported
65//! version is 1.42. The current Tracing version is not guaranteed to build on
66//! Rust versions earlier than the minimum supported version.
67//!
68//! Tracing follows the same compiler support policies as the rest of the Tokio
69//! project. The current stable Rust compiler and the three most recent minor
70//! versions before it will always be supported. For example, if the current
71//! stable compiler version is 1.45, the minimum supported version will not be
72//! increased past 1.42, three minor versions prior. Increasing the minimum
73//! supported compiler version is not considered a semver breaking change as
74//! long as doing so complies with this policy.
75//!
76#![doc(html_root_url = "https://docs.rs/tracing-futures/0.2.5")]
77#![doc(
78    html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/logo-type.png",
79    issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/"
80)]
81#![warn(
82    missing_debug_implementations,
83    missing_docs,
84    rust_2018_idioms,
85    unreachable_pub,
86    bad_style,
87    const_err,
88    dead_code,
89    improper_ctypes,
90    non_shorthand_field_patterns,
91    no_mangle_generic_items,
92    overflowing_literals,
93    path_statements,
94    patterns_in_fns_without_body,
95    private_in_public,
96    unconditional_recursion,
97    unused,
98    unused_allocation,
99    unused_comparisons,
100    unused_parens,
101    while_true
102)]
103#![cfg_attr(not(feature = "std"), no_std)]
104#![cfg_attr(docsrs, feature(doc_cfg), deny(broken_intra_doc_links))]
105#[cfg(feature = "std-future")]
106use pin_project::pin_project;
107
108pub(crate) mod stdlib;
109
110#[cfg(feature = "std-future")]
111use crate::stdlib::{pin::Pin, task::Context};
112
113#[cfg(feature = "std")]
114use tracing::{dispatcher, Dispatch};
115
116use tracing::Span;
117
118/// Implementations for `Instrument`ed future executors.
119pub mod executor;
120
121/// Extension trait allowing futures, streams, sinks, and executors to be
122/// instrumented with a `tracing` [span].
123///
124/// [span]: https://docs.rs/tracing/latest/tracing/span/index.html
125pub trait Instrument: Sized {
126    /// Instruments this type with the provided `Span`, returning an
127    /// `Instrumented` wrapper.
128    ///
129    /// If the instrumented type is a future, stream, or sink, the attached `Span`
130    /// will be [entered] every time it is polled. If the instrumented type
131    /// is a future executor, every future spawned on that executor will be
132    /// instrumented by the attached `Span`.
133    ///
134    /// # Examples
135    ///
136    /// Instrumenting a future:
137    ///
138    // TODO: ignored until async-await is stable...
139    /// ```rust,ignore
140    /// use tracing_futures::Instrument;
141    ///
142    /// # async fn doc() {
143    /// let my_future = async {
144    ///     // ...
145    /// };
146    ///
147    /// my_future
148    ///     .instrument(tracing::info_span!("my_future"))
149    ///     .await
150    /// # }
151    /// ```
152    ///
153    /// [entered]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.enter
154    fn instrument(self, span: Span) -> Instrumented<Self> {
155        Instrumented { inner: self, span }
156    }
157
158    /// Instruments this type with the [current] `Span`, returning an
159    /// `Instrumented` wrapper.
160    ///
161    /// If the instrumented type is a future, stream, or sink, the attached `Span`
162    /// will be [entered] every time it is polled. If the instrumented type
163    /// is a future executor, every future spawned on that executor will be
164    /// instrumented by the attached `Span`.
165    ///
166    /// This can be used to propagate the current span when spawning a new future.
167    ///
168    /// # Examples
169    ///
170    // TODO: ignored until async-await is stable...
171    /// ```rust,ignore
172    /// use tracing_futures::Instrument;
173    ///
174    /// # async fn doc() {
175    /// let span = tracing::info_span!("my_span");
176    /// let _enter = span.enter();
177    ///
178    /// // ...
179    ///
180    /// let future = async {
181    ///     tracing::debug!("this event will occur inside `my_span`");
182    ///     // ...
183    /// };
184    /// tokio::spawn(future.in_current_span());
185    /// # }
186    /// ```
187    ///
188    /// [current]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.current
189    /// [entered]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.enter
190    #[inline]
191    fn in_current_span(self) -> Instrumented<Self> {
192        self.instrument(Span::current())
193    }
194}
195
196/// Extension trait allowing futures, streams, and sinks to be instrumented with
197/// a `tracing` [`Subscriber`].
198///
199/// [`Subscriber`]: https://docs.rs/tracing/latest/tracing/subscriber/trait.Subscriber.html
200#[cfg(feature = "std")]
201#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
202pub trait WithSubscriber: Sized {
203    /// Attaches the provided [`Subscriber`] to this type, returning a
204    /// `WithDispatch` wrapper.
205    ///
206    /// When the wrapped type is a future, stream, or sink, the attached
207    /// subscriber will be set as the [default] while it is being polled.
208    /// When the wrapped type is an executor, the subscriber will be set as the
209    /// default for any futures spawned on that executor.
210    ///
211    /// [`Subscriber`]: https://docs.rs/tracing/latest/tracing/subscriber/trait.Subscriber.html
212    /// [default]: https://docs.rs/tracing/latest/tracing/dispatcher/index.html#setting-the-default-subscriber
213    fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
214    where
215        S: Into<Dispatch>,
216    {
217        WithDispatch {
218            inner: self,
219            dispatch: subscriber.into(),
220        }
221    }
222
223    /// Attaches the current [default] [`Subscriber`] to this type, returning a
224    /// `WithDispatch` wrapper.
225    ///
226    /// When the wrapped type is a future, stream, or sink, the attached
227    /// subscriber will be set as the [default] while it is being polled.
228    /// When the wrapped type is an executor, the subscriber will be set as the
229    /// default for any futures spawned on that executor.
230    ///
231    /// This can be used to propagate the current dispatcher context when
232    /// spawning a new future.
233    ///
234    /// [`Subscriber`]: https://docs.rs/tracing/latest/tracing/subscriber/trait.Subscriber.html
235    /// [default]: https://docs.rs/tracing/latest/tracing/dispatcher/index.html#setting-the-default-subscriber
236    #[inline]
237    fn with_current_subscriber(self) -> WithDispatch<Self> {
238        WithDispatch {
239            inner: self,
240            dispatch: dispatcher::get_default(|default| default.clone()),
241        }
242    }
243}
244
245/// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
246#[cfg_attr(feature = "std-future", pin_project)]
247#[derive(Debug, Clone)]
248pub struct Instrumented<T> {
249    #[cfg(feature = "std-future")]
250    #[pin]
251    inner: T,
252    #[cfg(not(feature = "std-future"))]
253    inner: T,
254    span: Span,
255}
256
257/// A future, stream, sink, or executor that has been instrumented with a
258/// `tracing` subscriber.
259#[cfg(feature = "std")]
260#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
261#[cfg_attr(feature = "std-future", pin_project)]
262#[derive(Clone, Debug)]
263pub struct WithDispatch<T> {
264    // cfg_attr doesn't work inside structs, apparently...
265    #[cfg(feature = "std-future")]
266    #[pin]
267    inner: T,
268    #[cfg(not(feature = "std-future"))]
269    inner: T,
270    dispatch: Dispatch,
271}
272
273impl<T: Sized> Instrument for T {}
274
275#[cfg(feature = "std-future")]
276#[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
277impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for Instrumented<T> {
278    type Output = T::Output;
279
280    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
281        let this = self.project();
282        let _enter = this.span.enter();
283        this.inner.poll(cx)
284    }
285}
286
287#[cfg(feature = "futures-01")]
288#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
289impl<T: futures_01::Future> futures_01::Future for Instrumented<T> {
290    type Item = T::Item;
291    type Error = T::Error;
292
293    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
294        let _enter = self.span.enter();
295        self.inner.poll()
296    }
297}
298
299#[cfg(feature = "futures-01")]
300#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
301impl<T: futures_01::Stream> futures_01::Stream for Instrumented<T> {
302    type Item = T::Item;
303    type Error = T::Error;
304
305    fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> {
306        let _enter = self.span.enter();
307        self.inner.poll()
308    }
309}
310
311#[cfg(feature = "futures-01")]
312#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
313impl<T: futures_01::Sink> futures_01::Sink for Instrumented<T> {
314    type SinkItem = T::SinkItem;
315    type SinkError = T::SinkError;
316
317    fn start_send(
318        &mut self,
319        item: Self::SinkItem,
320    ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> {
321        let _enter = self.span.enter();
322        self.inner.start_send(item)
323    }
324
325    fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> {
326        let _enter = self.span.enter();
327        self.inner.poll_complete()
328    }
329}
330
331#[cfg(all(feature = "futures-03", feature = "std-future"))]
332#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
333impl<T: futures::Stream> futures::Stream for Instrumented<T> {
334    type Item = T::Item;
335
336    fn poll_next(
337        self: Pin<&mut Self>,
338        cx: &mut Context<'_>,
339    ) -> futures::task::Poll<Option<Self::Item>> {
340        let this = self.project();
341        let _enter = this.span.enter();
342        T::poll_next(this.inner, cx)
343    }
344}
345
346#[cfg(all(feature = "futures-03", feature = "std-future"))]
347#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
348impl<I, T: futures::Sink<I>> futures::Sink<I> for Instrumented<T>
349where
350    T: futures::Sink<I>,
351{
352    type Error = T::Error;
353
354    fn poll_ready(
355        self: Pin<&mut Self>,
356        cx: &mut Context<'_>,
357    ) -> futures::task::Poll<Result<(), Self::Error>> {
358        let this = self.project();
359        let _enter = this.span.enter();
360        T::poll_ready(this.inner, cx)
361    }
362
363    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
364        let this = self.project();
365        let _enter = this.span.enter();
366        T::start_send(this.inner, item)
367    }
368
369    fn poll_flush(
370        self: Pin<&mut Self>,
371        cx: &mut Context<'_>,
372    ) -> futures::task::Poll<Result<(), Self::Error>> {
373        let this = self.project();
374        let _enter = this.span.enter();
375        T::poll_flush(this.inner, cx)
376    }
377
378    fn poll_close(
379        self: Pin<&mut Self>,
380        cx: &mut Context<'_>,
381    ) -> futures::task::Poll<Result<(), Self::Error>> {
382        let this = self.project();
383        let _enter = this.span.enter();
384        T::poll_close(this.inner, cx)
385    }
386}
387
388impl<T> Instrumented<T> {
389    /// Borrows the `Span` that this type is instrumented by.
390    pub fn span(&self) -> &Span {
391        &self.span
392    }
393
394    /// Mutably borrows the `Span` that this type is instrumented by.
395    pub fn span_mut(&mut self) -> &mut Span {
396        &mut self.span
397    }
398
399    /// Borrows the wrapped type.
400    pub fn inner(&self) -> &T {
401        &self.inner
402    }
403
404    /// Mutably borrows the wrapped type.
405    pub fn inner_mut(&mut self) -> &mut T {
406        &mut self.inner
407    }
408
409    /// Get a pinned reference to the wrapped type.
410    #[cfg(feature = "std-future")]
411    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
412    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
413        self.project_ref().inner
414    }
415
416    /// Get a pinned mutable reference to the wrapped type.
417    #[cfg(feature = "std-future")]
418    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
419    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
420        self.project().inner
421    }
422
423    /// Consumes the `Instrumented`, returning the wrapped type.
424    ///
425    /// Note that this drops the span.
426    pub fn into_inner(self) -> T {
427        self.inner
428    }
429}
430
431#[cfg(feature = "std")]
432impl<T: Sized> WithSubscriber for T {}
433
434#[cfg(all(feature = "futures-01", feature = "std"))]
435#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))]
436impl<T: futures_01::Future> futures_01::Future for WithDispatch<T> {
437    type Item = T::Item;
438    type Error = T::Error;
439
440    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
441        let inner = &mut self.inner;
442        dispatcher::with_default(&self.dispatch, || inner.poll())
443    }
444}
445
446#[cfg(all(feature = "std-future", feature = "std"))]
447#[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))]
448impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for WithDispatch<T> {
449    type Output = T::Output;
450
451    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
452        let this = self.project();
453        let dispatch = this.dispatch;
454        let future = this.inner;
455        dispatcher::with_default(dispatch, || future.poll(cx))
456    }
457}
458
459#[cfg(feature = "std")]
460impl<T> WithDispatch<T> {
461    /// Wrap a future, stream, sink or executor with the same subscriber as this WithDispatch.
462    pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> {
463        WithDispatch {
464            dispatch: self.dispatch.clone(),
465            inner,
466        }
467    }
468
469    /// Borrows the `Dispatch` that this type is instrumented by.
470    pub fn dispatch(&self) -> &Dispatch {
471        &self.dispatch
472    }
473
474    /// Get a pinned reference to the wrapped type.
475    #[cfg(feature = "std-future")]
476    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
477    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
478        self.project_ref().inner
479    }
480
481    /// Get a pinned mutable reference to the wrapped type.
482    #[cfg(feature = "std-future")]
483    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
484    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
485        self.project().inner
486    }
487
488    /// Borrows the wrapped type.
489    pub fn inner(&self) -> &T {
490        &self.inner
491    }
492
493    /// Mutably borrows the wrapped type.
494    pub fn inner_mut(&mut self) -> &mut T {
495        &mut self.inner
496    }
497
498    /// Consumes the `WithDispatch`, returning the wrapped type.
499    pub fn into_inner(self) -> T {
500        self.inner
501    }
502}
503
504#[cfg(test)]
505pub(crate) use self::support as test_support;
506// This has to have the same name as the module in `tracing`.
507#[path = "../../tracing/tests/support/mod.rs"]
508#[cfg(test)]
509#[allow(unreachable_pub)]
510pub(crate) mod support;
511
512#[cfg(test)]
513mod tests {
514    use super::{test_support::*, *};
515
516    #[cfg(feature = "futures-01")]
517    mod futures_01_tests {
518        use futures_01::{future, stream, task, Async, Future, Stream};
519        use tracing::subscriber::with_default;
520
521        use super::*;
522
523        struct PollN<T, E> {
524            and_return: Option<Result<T, E>>,
525            finish_at: usize,
526            polls: usize,
527        }
528
529        impl PollN<(), ()> {
530            fn new_ok(finish_at: usize) -> Self {
531                Self {
532                    and_return: Some(Ok(())),
533                    finish_at,
534                    polls: 0,
535                }
536            }
537
538            fn new_err(finish_at: usize) -> Self {
539                Self {
540                    and_return: Some(Err(())),
541                    finish_at,
542                    polls: 0,
543                }
544            }
545        }
546
547        impl<T, E> futures_01::Future for PollN<T, E> {
548            type Item = T;
549            type Error = E;
550            fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
551                self.polls += 1;
552                if self.polls == self.finish_at {
553                    self.and_return
554                        .take()
555                        .expect("polled after ready")
556                        .map(Async::Ready)
557                } else {
558                    task::current().notify();
559                    Ok(Async::NotReady)
560                }
561            }
562        }
563
564        #[test]
565        fn future_enter_exit_is_reasonable() {
566            let (subscriber, handle) = subscriber::mock()
567                .enter(span::mock().named("foo"))
568                .exit(span::mock().named("foo"))
569                .enter(span::mock().named("foo"))
570                .exit(span::mock().named("foo"))
571                .drop_span(span::mock().named("foo"))
572                .done()
573                .run_with_handle();
574            with_default(subscriber, || {
575                PollN::new_ok(2)
576                    .instrument(tracing::trace_span!("foo"))
577                    .wait()
578                    .unwrap();
579            });
580            handle.assert_finished();
581        }
582
583        #[test]
584        fn future_error_ends_span() {
585            let (subscriber, handle) = subscriber::mock()
586                .enter(span::mock().named("foo"))
587                .exit(span::mock().named("foo"))
588                .enter(span::mock().named("foo"))
589                .exit(span::mock().named("foo"))
590                .drop_span(span::mock().named("foo"))
591                .done()
592                .run_with_handle();
593            with_default(subscriber, || {
594                PollN::new_err(2)
595                    .instrument(tracing::trace_span!("foo"))
596                    .wait()
597                    .unwrap_err();
598            });
599
600            handle.assert_finished();
601        }
602
603        #[test]
604        fn stream_enter_exit_is_reasonable() {
605            let (subscriber, handle) = subscriber::mock()
606                .enter(span::mock().named("foo"))
607                .exit(span::mock().named("foo"))
608                .enter(span::mock().named("foo"))
609                .exit(span::mock().named("foo"))
610                .enter(span::mock().named("foo"))
611                .exit(span::mock().named("foo"))
612                .enter(span::mock().named("foo"))
613                .exit(span::mock().named("foo"))
614                .drop_span(span::mock().named("foo"))
615                .run_with_handle();
616            with_default(subscriber, || {
617                stream::iter_ok::<_, ()>(&[1, 2, 3])
618                    .instrument(tracing::trace_span!("foo"))
619                    .for_each(|_| future::ok(()))
620                    .wait()
621                    .unwrap();
622            });
623            handle.assert_finished();
624        }
625
626        #[test]
627        fn span_follows_future_onto_threadpool() {
628            let (subscriber, handle) = subscriber::mock()
629                .enter(span::mock().named("a"))
630                .enter(span::mock().named("b"))
631                .exit(span::mock().named("b"))
632                .enter(span::mock().named("b"))
633                .exit(span::mock().named("b"))
634                .drop_span(span::mock().named("b"))
635                .exit(span::mock().named("a"))
636                .drop_span(span::mock().named("a"))
637                .done()
638                .run_with_handle();
639            let mut runtime = tokio::runtime::Runtime::new().unwrap();
640            with_default(subscriber, || {
641                tracing::trace_span!("a").in_scope(|| {
642                    let future = PollN::new_ok(2)
643                        .instrument(tracing::trace_span!("b"))
644                        .map(|_| {
645                            tracing::trace_span!("c").in_scope(|| {
646                                // "c" happens _outside_ of the instrumented future's
647                                // span, so we don't expect it.
648                            })
649                        });
650                    runtime.block_on(Box::new(future)).unwrap();
651                })
652            });
653            handle.assert_finished();
654        }
655    }
656
657    #[cfg(all(feature = "futures-03", feature = "std-future"))]
658    mod futures_03_tests {
659        use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt};
660        use tracing::subscriber::with_default;
661
662        use super::*;
663
664        #[test]
665        fn stream_enter_exit_is_reasonable() {
666            let (subscriber, handle) = subscriber::mock()
667                .enter(span::mock().named("foo"))
668                .exit(span::mock().named("foo"))
669                .enter(span::mock().named("foo"))
670                .exit(span::mock().named("foo"))
671                .enter(span::mock().named("foo"))
672                .exit(span::mock().named("foo"))
673                .enter(span::mock().named("foo"))
674                .exit(span::mock().named("foo"))
675                .drop_span(span::mock().named("foo"))
676                .run_with_handle();
677            with_default(subscriber, || {
678                Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo"))
679                    .for_each(|_| future::ready(()))
680                    .now_or_never()
681                    .unwrap();
682            });
683            handle.assert_finished();
684        }
685
686        #[test]
687        fn sink_enter_exit_is_reasonable() {
688            let (subscriber, handle) = subscriber::mock()
689                .enter(span::mock().named("foo"))
690                .exit(span::mock().named("foo"))
691                .enter(span::mock().named("foo"))
692                .exit(span::mock().named("foo"))
693                .enter(span::mock().named("foo"))
694                .exit(span::mock().named("foo"))
695                .drop_span(span::mock().named("foo"))
696                .run_with_handle();
697            with_default(subscriber, || {
698                Instrument::instrument(sink::drain(), tracing::trace_span!("foo"))
699                    .send(1u8)
700                    .now_or_never()
701                    .unwrap()
702                    .unwrap()
703            });
704            handle.assert_finished();
705        }
706    }
707}