console_subscriber/
lib.rs

1#![doc = include_str!("../README.md")]
2use console_api as proto;
3use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
4use serde::Serialize;
5use std::{
6    cell::RefCell,
7    fmt,
8    net::{IpAddr, Ipv4Addr},
9    sync::{
10        atomic::{AtomicUsize, Ordering},
11        Arc,
12    },
13    time::{Duration, Instant},
14};
15use thread_local::ThreadLocal;
16#[cfg(unix)]
17use tokio::net::UnixListener;
18use tokio::sync::{mpsc, oneshot};
19#[cfg(unix)]
20use tokio_stream::wrappers::UnixListenerStream;
21use tracing_core::{
22    span::{self, Id},
23    subscriber::{self, Subscriber},
24    Metadata,
25};
26use tracing_subscriber::{
27    layer::Context,
28    registry::{Extensions, LookupSpan},
29    Layer,
30};
31
32mod aggregator;
33mod attribute;
34mod builder;
35mod callsites;
36mod record;
37mod stack;
38mod stats;
39pub(crate) mod sync;
40mod visitors;
41
42pub use aggregator::Aggregator;
43pub use builder::{Builder, ServerAddr};
44use callsites::Callsites;
45use record::Recorder;
46use stack::SpanStack;
47use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};
48
49pub use builder::{init, spawn};
50
51use crate::visitors::{PollOpVisitor, StateUpdateVisitor};
52
53/// A [`ConsoleLayer`] is a [`tracing_subscriber::Layer`] that records [`tracing`]
54/// spans and events emitted by the async runtime.
55///
56/// Runtimes emit [`tracing`] spans and events that represent specific operations
57/// that occur in asynchronous Rust programs, such as spawning tasks and waker
58/// operations. The `ConsoleLayer` collects and aggregates these events, and the
59/// resulting diagnostic data is exported to clients by the corresponding gRPC
60/// [`Server`] instance.
61///
62/// [`tracing`]: https://docs.rs/tracing
63pub struct ConsoleLayer {
64    current_spans: ThreadLocal<RefCell<SpanStack>>,
65    tx: mpsc::Sender<Event>,
66    shared: Arc<Shared>,
67    /// When the channel capacity goes under this number, a flush in the aggregator
68    /// will be triggered.
69    flush_under_capacity: usize,
70
71    /// Set of callsites for spans representing spawned tasks.
72    ///
73    /// For task spans, each runtime these will have like, 1-5 callsites in it, max, so
74    /// 8 should be plenty. If several runtimes are in use, we may have to spill
75    /// over into the backup hashmap, but it's unlikely.
76    spawn_callsites: Callsites<8>,
77
78    /// Set of callsites for events representing waker operations.
79    ///
80    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
81    /// there's only one async runtime library in use, but if there are multiple,
82    /// they might all have their own sets of waker ops.
83    waker_callsites: Callsites<16>,
84
85    /// Set of callsites for spans representing resources
86    ///
87    /// TODO: Take some time to determine more reasonable numbers
88    resource_callsites: Callsites<32>,
89
90    /// Set of callsites for spans representing async operations on resources
91    ///
92    /// TODO: Take some time to determine more reasonable numbers
93    async_op_callsites: Callsites<32>,
94
95    /// Set of callsites for spans representing async op poll operations
96    ///
97    /// TODO: Take some time to determine more reasonable numbers
98    async_op_poll_callsites: Callsites<32>,
99
100    /// Set of callsites for events representing poll operation invocations on resources
101    ///
102    /// TODO: Take some time to determine more reasonable numbers
103    poll_op_callsites: Callsites<32>,
104
105    /// Set of callsites for events representing state attribute state updates on resources
106    ///
107    /// TODO: Take some time to determine more reasonable numbers
108    resource_state_update_callsites: Callsites<32>,
109
110    /// Set of callsites for events representing state attribute state updates on async resource ops
111    ///
112    /// TODO: Take some time to determine more reasonable numbers
113    async_op_state_update_callsites: Callsites<32>,
114
115    /// A sink to record all events to a file.
116    recorder: Option<Recorder>,
117
118    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
119    /// timestamp that can be sent over the wire or recorded to JSON.
120    base_time: stats::TimeAnchor,
121
122    /// Maximum value for the poll time histogram.
123    ///
124    /// By default, this is one second.
125    max_poll_duration_nanos: u64,
126
127    /// Maximum value for the scheduled time histogram.
128    ///
129    /// By default, this is one second.
130    max_scheduled_duration_nanos: u64,
131}
132
133/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
134///
135/// Client applications, such as the [`tokio-console` CLI][cli] connect to the gRPC
136/// server, and stream data about the runtime's history (such as a list of the
137/// currently active tasks, or statistics summarizing polling times). A [`Server`] also
138/// interprets commands from a client application, such a request to focus in on
139/// a specific task, and translates that into a stream of details specific to
140/// that task.
141///
142/// [wire]: https://docs.rs/console-api
143/// [cli]: https://crates.io/crates/tokio-console
144pub struct Server {
145    subscribe: mpsc::Sender<Command>,
146    addr: ServerAddr,
147    aggregator: Option<Aggregator>,
148    client_buffer: usize,
149}
150
151pub(crate) trait ToProto {
152    type Output;
153    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
154}
155
156/// State shared between the `ConsoleLayer` and the `Aggregator` task.
157#[derive(Debug, Default)]
158struct Shared {
159    /// Used to notify the aggregator task when the event buffer should be
160    /// flushed.
161    flush: aggregator::Flush,
162
163    /// A counter of how many task events were dropped because the event buffer
164    /// was at capacity.
165    dropped_tasks: AtomicUsize,
166
167    /// A counter of how many async op events were dropped because the event buffer
168    /// was at capacity.
169    dropped_async_ops: AtomicUsize,
170
171    /// A counter of how many resource events were dropped because the event buffer
172    /// was at capacity.
173    dropped_resources: AtomicUsize,
174}
175
176struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);
177
178enum Command {
179    Instrument(Watch<proto::instrument::Update>),
180    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
181    Pause,
182    Resume,
183}
184
185struct WatchRequest<T> {
186    id: Id,
187    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
188    buffer: usize,
189}
190
191#[derive(Debug)]
192enum Event {
193    Metadata(&'static Metadata<'static>),
194    Spawn {
195        id: span::Id,
196        metadata: &'static Metadata<'static>,
197        stats: Arc<stats::TaskStats>,
198        fields: Vec<proto::Field>,
199        location: Option<proto::Location>,
200    },
201    Resource {
202        id: span::Id,
203        parent_id: Option<span::Id>,
204        metadata: &'static Metadata<'static>,
205        concrete_type: String,
206        kind: resource::Kind,
207        location: Option<proto::Location>,
208        is_internal: bool,
209        stats: Arc<stats::ResourceStats>,
210    },
211    PollOp {
212        metadata: &'static Metadata<'static>,
213        resource_id: span::Id,
214        op_name: String,
215        async_op_id: span::Id,
216        task_id: span::Id,
217        is_ready: bool,
218    },
219    AsyncResourceOp {
220        id: span::Id,
221        parent_id: Option<span::Id>,
222        resource_id: span::Id,
223        metadata: &'static Metadata<'static>,
224        source: String,
225
226        stats: Arc<stats::AsyncOpStats>,
227    },
228}
229
230#[derive(Clone, Debug, Copy, Serialize)]
231enum WakeOp {
232    Wake { self_wake: bool },
233    WakeByRef { self_wake: bool },
234    Clone,
235    Drop,
236}
237
238impl ConsoleLayer {
239    /// Returns a `ConsoleLayer` built with the default settings.
240    ///
241    /// Note: these defaults do *not* include values provided via the
242    /// environment variables specified in [`Builder::with_default_env`].
243    ///
244    /// See also [`Builder::build`].
245    pub fn new() -> (Self, Server) {
246        Self::builder().build()
247    }
248
249    /// Returns a [`Builder`] for configuring a `ConsoleLayer`.
250    ///
251    /// Note that the returned builder does *not* include values provided via
252    /// the environment variables specified in [`Builder::with_default_env`].
253    /// To extract those, you can call that method on the returned builder.
254    pub fn builder() -> Builder {
255        Builder::default()
256    }
257
258    fn build(config: Builder) -> (Self, Server) {
259        // The `cfg` value *appears* to be a constant to clippy, but it changes
260        // depending on the build-time configuration...
261        #![allow(clippy::assertions_on_constants)]
262        assert!(
263            cfg!(any(tokio_unstable, console_without_tokio_unstable)),
264            "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
265        );
266
267        let base_time = stats::TimeAnchor::new();
268        tracing::debug!(
269            config.event_buffer_capacity,
270            config.client_buffer_capacity,
271            ?config.publish_interval,
272            ?config.retention,
273            ?config.server_addr,
274            ?config.recording_path,
275            ?config.filter_env_var,
276            ?config.poll_duration_max,
277            ?config.scheduled_duration_max,
278            ?base_time,
279            "configured console subscriber"
280        );
281
282        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
283        let (subscribe, rpcs) = mpsc::channel(256);
284        let shared = Arc::new(Shared::default());
285        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
286        // Conservatively, start to trigger a flush when half the channel is full.
287        // This tries to reduce the chance of losing events to a full channel.
288        let flush_under_capacity = config.event_buffer_capacity / 2;
289        let recorder = config
290            .recording_path
291            .as_ref()
292            .map(|path| Recorder::new(path).expect("creating recorder"));
293        let server = Server {
294            aggregator: Some(aggregator),
295            addr: config.server_addr,
296            subscribe,
297            client_buffer: config.client_buffer_capacity,
298        };
299        let layer = Self {
300            current_spans: ThreadLocal::new(),
301            tx,
302            shared,
303            flush_under_capacity,
304            spawn_callsites: Callsites::default(),
305            waker_callsites: Callsites::default(),
306            resource_callsites: Callsites::default(),
307            async_op_callsites: Callsites::default(),
308            async_op_poll_callsites: Callsites::default(),
309            poll_op_callsites: Callsites::default(),
310            resource_state_update_callsites: Callsites::default(),
311            async_op_state_update_callsites: Callsites::default(),
312            recorder,
313            base_time,
314            max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
315            max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
316        };
317        (layer, server)
318    }
319}
320
321impl ConsoleLayer {
322    /// Default maximum capacity for the channel of events sent from a
323    /// [`ConsoleLayer`] to a [`Server`].
324    ///
325    /// When this capacity is exhausted, additional events will be dropped.
326    /// Decreasing this value will reduce memory usage, but may result in
327    /// events being dropped more frequently.
328    ///
329    /// See also [`Builder::event_buffer_capacity`].
330    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;
331    /// Default maximum capacity for th echannel of events sent from a
332    /// [`Server`] to each subscribed client.
333    ///
334    /// When this capacity is exhausted, the client is assumed to be inactive,
335    /// and may be disconnected.
336    ///
337    /// See also [`Builder::client_buffer_capacity`].
338    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;
339
340    /// Default frequency for publishing events to clients.
341    ///
342    /// Note that methods like [`init`][`crate::init`] and [`spawn`][`crate::spawn`] will take the value
343    /// from the `TOKIO_CONSOLE_PUBLISH_INTERVAL` [environment variable] before falling
344    /// back on this default.
345    ///
346    /// See also [`Builder::publish_interval`].
347    ///
348    /// [environment variable]: `Builder::with_default_env`
349    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);
350
351    /// By default, completed spans are retained for one hour.
352    ///
353    /// Note that methods like [`init`][`crate::init`] and
354    /// [`spawn`][`crate::spawn`] will take the value from the
355    /// `TOKIO_CONSOLE_RETENTION` [environment variable] before falling back on
356    /// this default.
357    ///
358    /// See also [`Builder::retention`].
359    ///
360    /// [environment variable]: `Builder::with_default_env`
361    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);
362
363    /// The default maximum value for task poll duration histograms.
364    ///
365    /// Any poll duration exceeding this will be clamped to this value. By
366    /// default, the maximum poll duration is one second.
367    ///
368    /// See also [`Builder::poll_duration_histogram_max`].
369    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);
370
371    /// The default maximum value for the task scheduled duration histogram.
372    ///
373    /// Any scheduled duration (the time from a task being woken until it is next
374    /// polled) exceeding this will be clamped to this value. By default, the
375    /// maximum scheduled duration is one second.
376    ///
377    /// See also [`Builder::scheduled_duration_histogram_max`].
378    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);
379
380    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
381        self.spawn_callsites.contains(meta)
382    }
383
384    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
385        self.resource_callsites.contains(meta)
386    }
387
388    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
389        self.async_op_callsites.contains(meta)
390    }
391
392    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
393    where
394        S: Subscriber + for<'a> LookupSpan<'a>,
395    {
396        cx.span(id)
397            .map(|span| self.is_spawn(span.metadata()))
398            .unwrap_or(false)
399    }
400
401    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
402    where
403        S: Subscriber + for<'a> LookupSpan<'a>,
404    {
405        cx.span(id)
406            .map(|span| self.is_resource(span.metadata()))
407            .unwrap_or(false)
408    }
409
410    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
411    where
412        S: Subscriber + for<'a> LookupSpan<'a>,
413    {
414        cx.span(id)
415            .map(|span| self.is_async_op(span.metadata()))
416            .unwrap_or(false)
417    }
418
419    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
420    where
421        P: Fn(&span::Id) -> bool,
422    {
423        stack
424            .stack()
425            .iter()
426            .rev()
427            .find(|id| p(id.id()))
428            .map(|id| id.id())
429            .cloned()
430    }
431
432    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
433        self.send_stats(dropped, move || (event, ())).is_some()
434    }
435
436    fn send_stats<S>(
437        &self,
438        dropped: &AtomicUsize,
439        mk_event: impl FnOnce() -> (Event, S),
440    ) -> Option<S> {
441        use mpsc::error::TrySendError;
442
443        // Return whether or not we actually sent the event.
444        let sent = match self.tx.try_reserve() {
445            Ok(permit) => {
446                let (event, stats) = mk_event();
447                permit.send(event);
448                Some(stats)
449            }
450            Err(TrySendError::Closed(_)) => {
451                // we should warn here eventually, but nop for now because we
452                // can't trigger tracing events...
453                None
454            }
455            Err(TrySendError::Full(_)) => {
456                // this shouldn't happen, since we trigger a flush when
457                // approaching the high water line...but if the executor wait
458                // time is very high, maybe the aggregator task hasn't been
459                // polled yet. so... eek?!
460                dropped.fetch_add(1, Ordering::Release);
461                None
462            }
463        };
464
465        let capacity = self.tx.capacity();
466        if capacity <= self.flush_under_capacity {
467            self.shared.flush.trigger();
468        }
469
470        sent
471    }
472
473    fn record(&self, event: impl FnOnce() -> record::Event) {
474        if let Some(ref recorder) = self.recorder {
475            recorder.record(event());
476        }
477    }
478
479    fn state_update<S>(
480        &self,
481        id: &Id,
482        event: &tracing::Event<'_>,
483        ctx: &Context<'_, S>,
484        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
485    ) where
486        S: Subscriber + for<'a> LookupSpan<'a>,
487    {
488        let meta_id = event.metadata().into();
489        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
490        event.record(&mut state_update_visitor);
491
492        let update = match state_update_visitor.result() {
493            Some(update) => update,
494            None => return,
495        };
496
497        let span = match ctx.span(id) {
498            Some(span) => span,
499            // XXX(eliza): no span exists for a resource ID, we should maybe
500            // record an error here...
501            None => return,
502        };
503
504        let exts = span.extensions();
505        let stats = match get_stats(&exts) {
506            Some(stats) => stats,
507            // XXX(eliza): a resource span was not a resource??? this is a bug
508            None => return,
509        };
510
511        stats.update_attribute(id, &update);
512
513        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
514            let exts = parent.extensions();
515            if let Some(stats) = get_stats(&exts) {
516                if stats.inherit_child_attributes {
517                    stats.update_attribute(id, &update);
518                }
519            }
520        }
521    }
522}
523
524impl<S> Layer<S> for ConsoleLayer
525where
526    S: Subscriber + for<'a> LookupSpan<'a>,
527{
528    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
529        if !meta.is_span() && !meta.is_event() {
530            return subscriber::Interest::never();
531        }
532
533        let dropped = match (meta.name(), meta.target()) {
534            ("runtime.spawn", _) | ("task", "tokio::task") => {
535                self.spawn_callsites.insert(meta);
536                &self.shared.dropped_tasks
537            }
538            (_, "runtime::waker") | (_, "tokio::task::waker") => {
539                self.waker_callsites.insert(meta);
540                &self.shared.dropped_tasks
541            }
542            (ResourceVisitor::RES_SPAN_NAME, _) => {
543                self.resource_callsites.insert(meta);
544                &self.shared.dropped_resources
545            }
546            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
547                self.async_op_callsites.insert(meta);
548                &self.shared.dropped_async_ops
549            }
550            ("runtime.resource.async_op.poll", _) => {
551                self.async_op_poll_callsites.insert(meta);
552                &self.shared.dropped_async_ops
553            }
554            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
555                self.poll_op_callsites.insert(meta);
556                &self.shared.dropped_async_ops
557            }
558            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
559                self.resource_state_update_callsites.insert(meta);
560                &self.shared.dropped_resources
561            }
562            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
563                self.async_op_state_update_callsites.insert(meta);
564                &self.shared.dropped_async_ops
565            }
566            (_, _) => &self.shared.dropped_tasks,
567        };
568
569        self.send_metadata(dropped, Event::Metadata(meta));
570        subscriber::Interest::always()
571    }
572
573    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
574        let metadata = attrs.metadata();
575        if self.is_spawn(metadata) {
576            let at = Instant::now();
577            let mut task_visitor = TaskVisitor::new(metadata.into());
578            attrs.record(&mut task_visitor);
579            let (fields, location) = task_visitor.result();
580            self.record(|| record::Event::Spawn {
581                id: id.into_u64(),
582                at: self.base_time.to_system_time(at),
583                fields: record::SerializeFields(fields.clone()),
584            });
585            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
586                let stats = Arc::new(stats::TaskStats::new(
587                    self.max_poll_duration_nanos,
588                    self.max_scheduled_duration_nanos,
589                    at,
590                ));
591                let event = Event::Spawn {
592                    id: id.clone(),
593                    stats: stats.clone(),
594                    metadata,
595                    fields,
596                    location,
597                };
598                (event, stats)
599            }) {
600                ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
601            }
602            return;
603        }
604
605        if self.is_resource(metadata) {
606            let at = Instant::now();
607            let mut resource_visitor = ResourceVisitor::default();
608            attrs.record(&mut resource_visitor);
609            if let Some(result) = resource_visitor.result() {
610                let ResourceVisitorResult {
611                    concrete_type,
612                    kind,
613                    location,
614                    is_internal,
615                    inherit_child_attrs,
616                } = result;
617                let parent_id = self.current_spans.get().and_then(|stack| {
618                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
619                });
620                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
621                    let stats = Arc::new(stats::ResourceStats::new(
622                        at,
623                        inherit_child_attrs,
624                        parent_id.clone(),
625                    ));
626                    let event = Event::Resource {
627                        id: id.clone(),
628                        parent_id,
629                        metadata,
630                        concrete_type,
631                        kind,
632                        location,
633                        is_internal,
634                        stats: stats.clone(),
635                    };
636                    (event, stats)
637                }) {
638                    ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
639                }
640            }
641            return;
642        }
643
644        if self.is_async_op(metadata) {
645            let at = Instant::now();
646            let mut async_op_visitor = AsyncOpVisitor::default();
647            attrs.record(&mut async_op_visitor);
648            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
649                let resource_id = self.current_spans.get().and_then(|stack| {
650                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
651                });
652
653                let parent_id = self.current_spans.get().and_then(|stack| {
654                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
655                });
656
657                if let Some(resource_id) = resource_id {
658                    if let Some(stats) =
659                        self.send_stats(&self.shared.dropped_async_ops, move || {
660                            let stats = Arc::new(stats::AsyncOpStats::new(
661                                at,
662                                inherit_child_attrs,
663                                parent_id.clone(),
664                            ));
665                            let event = Event::AsyncResourceOp {
666                                id: id.clone(),
667                                parent_id,
668                                resource_id,
669                                metadata,
670                                source,
671                                stats: stats.clone(),
672                            };
673                            (event, stats)
674                        })
675                    {
676                        ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
677                    }
678                }
679            }
680        }
681    }
682
683    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
684        let metadata = event.metadata();
685        if self.waker_callsites.contains(metadata) {
686            let at = Instant::now();
687            let mut visitor = WakerVisitor::default();
688            event.record(&mut visitor);
689            // XXX (eliza): ew...
690            if let Some((id, mut op)) = visitor.result() {
691                if let Some(span) = ctx.span(&id) {
692                    let exts = span.extensions();
693                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
694                        if op.is_wake() {
695                            // Are we currently inside the task's span? If so, the task
696                            // has woken itself.
697
698                            let self_wake = self
699                                .current_spans
700                                .get()
701                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
702                                .unwrap_or(false);
703                            op = op.self_wake(self_wake);
704                        }
705
706                        stats.record_wake_op(op, at);
707                        self.record(|| record::Event::Waker {
708                            id: id.into_u64(),
709                            at: self.base_time.to_system_time(at),
710                            op,
711                        });
712                    }
713                }
714            }
715            return;
716        }
717
718        if self.poll_op_callsites.contains(metadata) {
719            let resource_id = self.current_spans.get().and_then(|stack| {
720                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
721            });
722            // poll op event should have a resource span parent
723            if let Some(resource_id) = resource_id {
724                let mut poll_op_visitor = PollOpVisitor::default();
725                event.record(&mut poll_op_visitor);
726                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
727                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
728                        let stack = stack.borrow();
729                        let task_id =
730                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
731                        let async_op_id =
732                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
733                        Some((task_id, async_op_id))
734                    });
735                    // poll op event should be emitted in the context of an async op and task spans
736                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
737                        if let Some(span) = ctx.span(&async_op_id) {
738                            let exts = span.extensions();
739                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
740                                stats.set_task_id(&task_id);
741                            }
742                        }
743
744                        self.send_stats(&self.shared.dropped_async_ops, || {
745                            let event = Event::PollOp {
746                                metadata,
747                                op_name,
748                                resource_id,
749                                async_op_id,
750                                task_id,
751                                is_ready,
752                            };
753                            (event, ())
754                        });
755
756                        // TODO: JSON recorder doesn't care about poll ops.
757                    }
758                }
759            }
760            return;
761        }
762
763        if self.resource_state_update_callsites.contains(metadata) {
764            // state update event should have a resource span parent
765            let resource_id = self.current_spans.get().and_then(|stack| {
766                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
767            });
768            if let Some(id) = resource_id {
769                self.state_update(&id, event, &ctx, |exts| {
770                    exts.get::<Arc<stats::ResourceStats>>()
771                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
772                });
773            }
774
775            return;
776        }
777
778        if self.async_op_state_update_callsites.contains(metadata) {
779            let async_op_id = self.current_spans.get().and_then(|stack| {
780                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
781            });
782            if let Some(id) = async_op_id {
783                self.state_update(&id, event, &ctx, |exts| {
784                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
785                    Some(&async_op.stats)
786                });
787            }
788        }
789    }
790
791    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
792        if let Some(span) = cx.span(id) {
793            let now = Instant::now();
794            let exts = span.extensions();
795            // if the span we are entering is a task or async op, record the
796            // poll stats.
797            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
798                stats.start_poll(now);
799            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
800                stats.start_poll(now);
801            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
802                // otherwise, is the span a resource? in that case, we also want
803                // to enter it, although we don't care about recording poll
804                // stats.
805            } else {
806                return;
807            };
808
809            self.current_spans
810                .get_or_default()
811                .borrow_mut()
812                .push(id.clone());
813
814            self.record(|| record::Event::Enter {
815                id: id.into_u64(),
816                at: self.base_time.to_system_time(now),
817            });
818        }
819    }
820
821    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
822        if let Some(span) = cx.span(id) {
823            let exts = span.extensions();
824            let now = Instant::now();
825            // if the span we are entering is a task or async op, record the
826            // poll stats.
827            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
828                stats.end_poll(now);
829            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
830                stats.end_poll(now);
831            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
832                // otherwise, is the span a resource? in that case, we also want
833                // to enter it, although we don't care about recording poll
834                // stats.
835            } else {
836                return;
837            };
838
839            self.current_spans.get_or_default().borrow_mut().pop(id);
840
841            self.record(|| record::Event::Exit {
842                id: id.into_u64(),
843                at: self.base_time.to_system_time(now),
844            });
845        }
846    }
847
848    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
849        if let Some(span) = cx.span(&id) {
850            let now = Instant::now();
851            let exts = span.extensions();
852            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
853                stats.drop_task(now);
854            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
855                stats.drop_async_op(now);
856            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
857                stats.drop_resource(now);
858            }
859            self.record(|| record::Event::Close {
860                id: id.into_u64(),
861                at: self.base_time.to_system_time(now),
862            });
863        }
864    }
865}
866
867impl fmt::Debug for ConsoleLayer {
868    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869        f.debug_struct("ConsoleLayer")
870            // mpsc::Sender debug impl is not very useful
871            .field("tx", &format_args!("<...>"))
872            .field("tx.capacity", &self.tx.capacity())
873            .field("shared", &self.shared)
874            .field("spawn_callsites", &self.spawn_callsites)
875            .field("waker_callsites", &self.waker_callsites)
876            .finish()
877    }
878}
879
880impl Server {
    // XXX(eliza): why is `SocketAddr::new` not `const`???
    /// A [`Server`] by default binds the IPv4 address 127.0.0.1 to service
    /// remote procedure calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
894
    /// A [`Server`] by default binds port 6669 to service remote procedure
    /// calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// Port 6669 is a restricted port on Chrome, so a different port has to
    /// be used when the server is reached from a browser over gRPC-Web.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PORT: u16 = 6669;
907
908    /// Starts the gRPC service with the default gRPC settings.
909    ///
910    /// To configure gRPC server settings before starting the server, use
911    /// [`serve_with`] instead. This method is equivalent to calling [`serve_with`]
912    /// and providing the default gRPC server settings:
913    ///
914    /// ```rust
915    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
916    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
917    /// server.serve_with(tonic::transport::Server::default()).await
918    /// # }
919    /// ```
920    /// [`serve_with`]: Server::serve_with
921    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
922        self.serve_with(tonic::transport::Server::default()).await
923    }
924
925    /// Starts the gRPC service with the given [`tonic`] gRPC transport server
926    /// `builder`.
927    ///
928    /// The `builder` parameter may be used to configure gRPC-specific settings
929    /// prior to starting the server.
930    ///
931    /// This spawns both the server task and the event aggregation worker
932    /// task on the current async runtime.
933    ///
934    /// [`tonic`]: https://docs.rs/tonic/
935    pub async fn serve_with(
936        self,
937        mut builder: tonic::transport::Server,
938    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
939        let addr = self.addr.clone();
940        let ServerParts {
941            instrument_server,
942            aggregator,
943        } = self.into_parts();
944        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
945        let router = builder.add_service(instrument_server);
946        let res = match addr {
947            ServerAddr::Tcp(addr) => {
948                let serve = router.serve(addr);
949                spawn_named(serve, "console::serve").await
950            }
951            #[cfg(unix)]
952            ServerAddr::Unix(path) => {
953                let incoming = UnixListener::bind(path)?;
954                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
955                spawn_named(serve, "console::serve").await
956            }
957        };
958        aggregate.abort();
959        res?.map_err(Into::into)
960    }
961
962    /// Starts the gRPC service with the default gRPC settings and gRPC-Web
963    /// support.
964    ///
965    /// # Examples
966    ///
967    /// To serve the instrument server with gRPC-Web support with the default
968    /// settings:
969    ///
970    /// ```rust
971    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
972    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
973    /// server.serve_with_grpc_web(tonic::transport::Server::default()).await
974    /// # }
975    /// ```
976    ///
977    /// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the
978    /// following code:
979    ///
980    /// ```rust
981    /// # use std::{thread, time::Duration};
982    /// #
983    /// use console_subscriber::{ConsoleLayer, ServerParts};
984    /// use tonic_web::GrpcWebLayer;
985    /// use tower_http::cors::{CorsLayer, AllowOrigin};
986    /// use http::header::HeaderName;
987    /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
988    /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
989    /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
990    /// #    ["grpc-status", "grpc-message", "grpc-status-details-bin"];
991    /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
992    /// #    "x-grpc-web",
993    /// #    "content-type",
994    /// #    "x-user-agent",
995    /// #    "grpc-timeout",
996    /// #    "user-agent",
997    /// # ];
998    ///
999    /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
1000    /// # thread::Builder::new()
1001    /// #    .name("subscriber".into())
1002    /// #    .spawn(move || {
1003    /// // Customize the CORS configuration.
1004    /// let cors = CorsLayer::new()
1005    ///     .allow_origin(AllowOrigin::mirror_request())
1006    ///     .allow_credentials(true)
1007    ///     .max_age(DEFAULT_MAX_AGE)
1008    ///     .expose_headers(
1009    ///         DEFAULT_EXPOSED_HEADERS
1010    ///             .iter()
1011    ///             .cloned()
1012    ///             .map(HeaderName::from_static)
1013    ///             .collect::<Vec<HeaderName>>(),
1014    ///     )
1015    ///     .allow_headers(
1016    ///         DEFAULT_ALLOW_HEADERS
1017    ///             .iter()
1018    ///             .cloned()
1019    ///             .map(HeaderName::from_static)
1020    ///             .collect::<Vec<HeaderName>>(),
1021    ///     );
1022    /// #       let runtime = tokio::runtime::Builder::new_current_thread()
1023    /// #           .enable_all()
1024    /// #           .build()
1025    /// #           .expect("console subscriber runtime initialization failed");
1026    /// #       runtime.block_on(async move {
1027    ///
1028    /// let ServerParts {
1029    ///     instrument_server,
1030    ///     aggregator,
1031    ///     ..
1032    /// } = server.into_parts();
1033    /// tokio::spawn(aggregator.run());
1034    ///
1035    /// // Serve the instrument server with gRPC-Web support and the CORS configuration.
1036    /// let router = tonic::transport::Server::builder()
1037    ///     .accept_http1(true)
1038    ///     .layer(cors)
1039    ///     .layer(GrpcWebLayer::new())
1040    ///     .add_service(instrument_server);
1041    /// let serve = router.serve(std::net::SocketAddr::new(
1042    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1043    ///     // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
1044    ///     9999,
1045    /// ));
1046    ///
1047    /// // Finally, spawn the server.
1048    /// serve.await.expect("console subscriber server failed");
1049    /// #       });
1050    /// #   })
1051    /// #   .expect("console subscriber could not spawn thread");
1052    /// # tracing_subscriber::registry().with(console_layer).init();
1053    /// ```
1054    ///
1055    /// For a comprehensive understanding and complete code example,
1056    /// please refer to the `grpc-web` example in the examples directory.
1057    ///
1058    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1059    #[cfg(feature = "grpc-web")]
1060    pub async fn serve_with_grpc_web(
1061        self,
1062        builder: tonic::transport::Server,
1063    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
1064        let addr = self.addr.clone();
1065        let ServerParts {
1066            instrument_server,
1067            aggregator,
1068        } = self.into_parts();
1069        let router = builder
1070            .accept_http1(true)
1071            .add_service(tonic_web::enable(instrument_server));
1072        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
1073        let res = match addr {
1074            ServerAddr::Tcp(addr) => {
1075                let serve = router.serve(addr);
1076                spawn_named(serve, "console::serve").await
1077            }
1078            #[cfg(unix)]
1079            ServerAddr::Unix(path) => {
1080                let incoming = UnixListener::bind(path)?;
1081                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
1082                spawn_named(serve, "console::serve").await
1083            }
1084        };
1085        aggregate.abort();
1086        res?.map_err(Into::into)
1087    }
1088
1089    /// Returns the parts needed to spawn a gRPC server and the aggregator that
1090    /// supplies it.
1091    ///
1092    /// Note that a server spawned in this way will disregard any value set by
1093    /// [`Builder::server_addr`], as the user becomes responsible for defining
1094    /// the address when calling [`Router::serve`].
1095    ///
1096    /// Additionally, the user of this API must ensure that the [`Aggregator`]
1097    /// is running for as long as the gRPC server is. If the server stops
1098    /// running, the aggregator task can be aborted.
1099    ///
1100    /// # Examples
1101    ///
1102    /// The parts can be used to serve the instrument server together with
1103    /// other endpoints from the same gRPC server.
1104    ///
1105    /// ```
1106    /// use console_subscriber::{ConsoleLayer, ServerParts};
1107    ///
1108    /// # let runtime = tokio::runtime::Builder::new_current_thread()
1109    /// #     .enable_all()
1110    /// #     .build()
1111    /// #     .unwrap();
1112    /// # runtime.block_on(async {
1113    /// let (console_layer, server) = ConsoleLayer::builder().build();
1114    /// let ServerParts {
1115    ///     instrument_server,
1116    ///     aggregator,
1117    ///     ..
1118    /// } = server.into_parts();
1119    ///
1120    /// let aggregator_handle = tokio::spawn(aggregator.run());
1121    /// let router = tonic::transport::Server::builder()
1122    ///     //.add_service(some_other_service)
1123    ///     .add_service(instrument_server);
1124    /// let serve = router.serve(std::net::SocketAddr::new(
1125    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1126    ///     6669,
1127    /// ));
1128    ///
1129    /// // Finally, spawn the server.
1130    /// tokio::spawn(serve);
1131    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
1132    /// # drop(console_layer);
1133    /// # let mut aggregator_handle = aggregator_handle;
1134    /// # aggregator_handle.abort();
1135    /// # });
1136    /// ```
1137    ///
1138    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1139    pub fn into_parts(mut self) -> ServerParts {
1140        let aggregator = self
1141            .aggregator
1142            .take()
1143            .expect("cannot start server multiple times");
1144
1145        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);
1146
1147        ServerParts {
1148            instrument_server,
1149            aggregator,
1150        }
1151    }
1152}
1153
/// The parts of a [`Server`], as returned by [`Server::into_parts`].
///
/// This struct may contain further parts in the future, and as such is marked
/// as `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The `aggregator` is a future which should be running as long as the server is.
/// Generally, this future should be spawned onto an appropriate runtime and then
/// aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
    /// The instrument server.
    ///
    /// See the documentation for [`InstrumentServer`] for details.
    pub instrument_server: InstrumentServer<Server>,

    /// The aggregator.
    ///
    /// Responsible for collecting and preparing traces for the instrument server
    /// to send to its clients.
    ///
    /// The aggregator should be [`run`] when the instrument server is started.
    /// If the server stops running for any reason, the aggregator task can be
    /// aborted.
    ///
    /// [`run`]: fn@crate::Aggregator::run
    pub aggregator: Aggregator,
}
1186
1187#[tonic::async_trait]
1188impl proto::instrument::instrument_server::Instrument for Server {
1189    type WatchUpdatesStream =
1190        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
1191    type WatchTaskDetailsStream =
1192        tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
1193    async fn watch_updates(
1194        &self,
1195        req: tonic::Request<proto::instrument::InstrumentRequest>,
1196    ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
1197        match req.remote_addr() {
1198            Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
1199            None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
1200        }
1201        let permit = self.subscribe.reserve().await.map_err(|_| {
1202            tonic::Status::internal("cannot start new watch, aggregation task is not running")
1203        })?;
1204        let (tx, rx) = mpsc::channel(self.client_buffer);
1205        permit.send(Command::Instrument(Watch(tx)));
1206        tracing::debug!("watch started");
1207        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1208        Ok(tonic::Response::new(stream))
1209    }
1210
1211    async fn watch_task_details(
1212        &self,
1213        req: tonic::Request<proto::instrument::TaskDetailsRequest>,
1214    ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
1215        let task_id = req
1216            .into_inner()
1217            .id
1218            .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
1219            .id;
1220
1221        // `tracing` reserves span ID 0 for niche optimization for `Option<Id>`.
1222        let id = std::num::NonZeroU64::new(task_id)
1223            .map(Id::from_non_zero_u64)
1224            .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;
1225
1226        let permit = self.subscribe.reserve().await.map_err(|_| {
1227            tonic::Status::internal("cannot start new watch, aggregation task is not running")
1228        })?;
1229
1230        // Check with the aggregator task to request a stream if the task exists.
1231        let (stream_sender, stream_recv) = oneshot::channel();
1232        permit.send(Command::WatchTaskDetail(WatchRequest {
1233            id,
1234            stream_sender,
1235            buffer: self.client_buffer,
1236        }));
1237        // If the aggregator drops the sender, the task doesn't exist.
1238        let rx = stream_recv.await.map_err(|_| {
1239            tracing::warn!(id = ?task_id, "requested task not found");
1240            tonic::Status::not_found("task not found")
1241        })?;
1242
1243        tracing::debug!(id = ?task_id, "task details watch started");
1244        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1245        Ok(tonic::Response::new(stream))
1246    }
1247
1248    async fn pause(
1249        &self,
1250        _req: tonic::Request<proto::instrument::PauseRequest>,
1251    ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
1252        self.subscribe.send(Command::Pause).await.map_err(|_| {
1253            tonic::Status::internal("cannot pause, aggregation task is not running")
1254        })?;
1255        Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
1256    }
1257
1258    async fn resume(
1259        &self,
1260        _req: tonic::Request<proto::instrument::ResumeRequest>,
1261    ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
1262        self.subscribe.send(Command::Resume).await.map_err(|_| {
1263            tonic::Status::internal("cannot resume, aggregation task is not running")
1264        })?;
1265        Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
1266    }
1267}
1268
1269impl WakeOp {
1270    /// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
1271    fn is_wake(self) -> bool {
1272        matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
1273    }
1274
1275    fn self_wake(self, self_wake: bool) -> Self {
1276        match self {
1277            Self::Wake { .. } => Self::Wake { self_wake },
1278            Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
1279            x => x,
1280        }
1281    }
1282}
1283
1284#[track_caller]
1285pub(crate) fn spawn_named<T>(
1286    task: impl std::future::Future<Output = T> + Send + 'static,
1287    _name: &str,
1288) -> tokio::task::JoinHandle<T>
1289where
1290    T: Send + 'static,
1291{
1292    #[cfg(tokio_unstable)]
1293    return tokio::task::Builder::new().name(_name).spawn(task).unwrap();
1294
1295    #[cfg(not(tokio_unstable))]
1296    tokio::spawn(task)
1297}