console_subscriber/lib.rs
#![doc = include_str!("../README.md")]
use console_api as proto;
use proto::{instrument::instrument_server::InstrumentServer, resources::resource};
use serde::Serialize;
use std::{
    cell::RefCell,
    fmt,
    net::{IpAddr, Ipv4Addr},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
    span::{self, Id},
    subscriber::{self, Subscriber},
    Metadata,
};
use tracing_subscriber::{
    layer::Context,
    registry::{Extensions, LookupSpan},
    Layer,
};

mod aggregator;
mod attribute;
mod builder;
mod callsites;
mod record;
mod stack;
mod stats;
pub(crate) mod sync;
mod visitors;

pub use aggregator::Aggregator;
pub use builder::{Builder, ServerAddr};
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};

pub use builder::{init, spawn};

use crate::visitors::{PollOpVisitor, StateUpdateVisitor};

/// A [`ConsoleLayer`] is a [`tracing_subscriber::Layer`] that records [`tracing`]
/// spans and events emitted by the async runtime.
///
/// Runtimes emit [`tracing`] spans and events that represent specific operations
/// that occur in asynchronous Rust programs, such as spawning tasks and waker
/// operations. The `ConsoleLayer` collects and aggregates these events, and the
/// resulting diagnostic data is exported to clients by the corresponding gRPC
/// [`Server`] instance.
///
/// [`tracing`]: https://docs.rs/tracing
pub struct ConsoleLayer {
    current_spans: ThreadLocal<RefCell<SpanStack>>,
    tx: mpsc::Sender<Event>,
    shared: Arc<Shared>,
    /// When the channel capacity goes under this number, a flush in the aggregator
    /// will be triggered.
    flush_under_capacity: usize,

    /// Set of callsites for spans representing spawned tasks.
    ///
    /// For task spans, each runtime will have only a handful of callsites
    /// (roughly 1-5 each, at most), so 8 should be plenty. If several runtimes
    /// are in use, we may have to spill over into the backup hashmap, but it's
    /// unlikely.
    spawn_callsites: Callsites<8>,

    /// Set of callsites for events representing waker operations.
    ///
    /// 16 is probably a reasonable number of waker ops; it's a bit generous if
    /// there's only one async runtime library in use, but if there are multiple,
    /// they might all have their own sets of waker ops.
    waker_callsites: Callsites<16>,

    /// Set of callsites for spans representing resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_callsites: Callsites<32>,

    /// Set of callsites for spans representing async operations on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_callsites: Callsites<32>,

    /// Set of callsites for spans representing async op poll operations.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_poll_callsites: Callsites<32>,

    /// Set of callsites for events representing poll operation invocations on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    poll_op_callsites: Callsites<32>,

    /// Set of callsites for events representing attribute state updates on resources.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    resource_state_update_callsites: Callsites<32>,

    /// Set of callsites for events representing attribute state updates on async resource ops.
    ///
    /// TODO: Take some time to determine more reasonable numbers
    async_op_state_update_callsites: Callsites<32>,

    /// A sink to record all events to a file.
    recorder: Option<Recorder>,

    /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
    /// timestamp that can be sent over the wire or recorded to JSON.
    base_time: stats::TimeAnchor,

    /// Maximum value for the poll time histogram.
    ///
    /// By default, this is one second.
    max_poll_duration_nanos: u64,

    /// Maximum value for the scheduled time histogram.
    ///
    /// By default, this is one second.
    max_scheduled_duration_nanos: u64,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
///
/// Client applications, such as the [`tokio-console` CLI][cli], connect to the gRPC
/// server and stream data about the runtime's history (such as a list of the
/// currently active tasks, or statistics summarizing polling times). A [`Server`] also
/// interprets commands from a client application, such as a request to focus in on
/// a specific task, and translates that into a stream of details specific to
/// that task.
///
/// [wire]: https://docs.rs/console-api
/// [cli]: https://crates.io/crates/tokio-console
pub struct Server {
    subscribe: mpsc::Sender<Command>,
    addr: ServerAddr,
    aggregator: Option<Aggregator>,
    client_buffer: usize,
}

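/// Trait implemented by types that can be converted into their wire-format
/// representation; `base_time` anchors monotonic timestamps for the
/// conversion.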
pub(crate) trait ToProto {
    type Output;
    fn to_proto(&self, base_time: &stats::TimeAnchor) -> Self::Output;
}

/// State shared between the `ConsoleLayer` and the `Aggregator` task.
#[derive(Debug, Default)]
struct Shared {
    /// Used to notify the aggregator task when the event buffer should be
    /// flushed.
    flush: aggregator::Flush,

    /// A counter of how many task events were dropped because the event buffer
    /// was at capacity.
    dropped_tasks: AtomicUsize,

    /// A counter of how many async op events were dropped because the event buffer
    /// was at capacity.
    dropped_async_ops: AtomicUsize,

    /// A counter of how many resource events were dropped because the event buffer
    /// was at capacity.
    dropped_resources: AtomicUsize,
}

struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);

enum Command {
    Instrument(Watch<proto::instrument::Update>),
    WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
    Pause,
    Resume,
}

struct WatchRequest<T> {
    id: Id,
    stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
    buffer: usize,
}

#[derive(Debug)]
enum Event {
    Metadata(&'static Metadata<'static>),
    Spawn {
        id: span::Id,
        metadata: &'static Metadata<'static>,
        stats: Arc<stats::TaskStats>,
        fields: Vec<proto::Field>,
        location: Option<proto::Location>,
    },
    Resource {
        id: span::Id,
        parent_id: Option<span::Id>,
        metadata: &'static Metadata<'static>,
        concrete_type: String,
        kind: resource::Kind,
        location: Option<proto::Location>,
        is_internal: bool,
        stats: Arc<stats::ResourceStats>,
    },
    PollOp {
        metadata: &'static Metadata<'static>,
        resource_id: span::Id,
        op_name: String,
        async_op_id: span::Id,
        task_id: span::Id,
        is_ready: bool,
    },
    AsyncResourceOp {
        id: span::Id,
        parent_id: Option<span::Id>,
        resource_id: span::Id,
        metadata: &'static Metadata<'static>,
        source: String,

        stats: Arc<stats::AsyncOpStats>,
    },
}

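/// An operation on a task's waker, parsed from the runtime's waker events by
/// the `WakerVisitor`.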
#[derive(Clone, Debug, Copy, Serialize)]
enum WakeOp {
    Wake { self_wake: bool },
    WakeByRef { self_wake: bool },
    Clone,
    Drop,
}

impl ConsoleLayer {
    /// Returns a `ConsoleLayer` built with the default settings.
    ///
    /// Note: these defaults do *not* include values provided via the
    /// environment variables specified in [`Builder::with_default_env`].
    ///
    /// See also [`Builder::build`].
    pub fn new() -> (Self, Server) {
        Self::builder().build()
    }

    /// Returns a [`Builder`] for configuring a `ConsoleLayer`.
    ///
    /// Note that the returned builder does *not* include values provided via
    /// the environment variables specified in [`Builder::with_default_env`].
    /// To extract those, you can call that method on the returned builder.
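    ///
    /// # Examples
    ///
    /// A sketch of building a layer that does read the standard
    /// `TOKIO_CONSOLE_*` environment variables:
    ///
    /// ```
    /// let (layer, server) = console_subscriber::ConsoleLayer::builder()
    ///     .with_default_env()
    ///     .build();
    /// # drop((layer, server));
    /// ```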
    pub fn builder() -> Builder {
        Builder::default()
    }

    fn build(config: Builder) -> (Self, Server) {
        // The `cfg` value *appears* to be a constant to clippy, but it changes
        // depending on the build-time configuration...
        #![allow(clippy::assertions_on_constants)]
        assert!(
            cfg!(any(tokio_unstable, console_without_tokio_unstable)),
            "task tracing requires Tokio to be built with RUSTFLAGS=\"--cfg tokio_unstable\"!"
        );

        let base_time = stats::TimeAnchor::new();
        tracing::debug!(
            config.event_buffer_capacity,
            config.client_buffer_capacity,
            ?config.publish_interval,
            ?config.retention,
            ?config.server_addr,
            ?config.recording_path,
            ?config.filter_env_var,
            ?config.poll_duration_max,
            ?config.scheduled_duration_max,
            ?base_time,
            "configured console subscriber"
        );

        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
        let (subscribe, rpcs) = mpsc::channel(256);
        let shared = Arc::new(Shared::default());
        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone(), base_time.clone());
        // Conservatively, start to trigger a flush when half the channel is full.
        // This tries to reduce the chance of losing events to a full channel.
        let flush_under_capacity = config.event_buffer_capacity / 2;
        let recorder = config
            .recording_path
            .as_ref()
            .map(|path| Recorder::new(path).expect("creating recorder"));
        let server = Server {
            aggregator: Some(aggregator),
            addr: config.server_addr,
            subscribe,
            client_buffer: config.client_buffer_capacity,
        };
        let layer = Self {
            current_spans: ThreadLocal::new(),
            tx,
            shared,
            flush_under_capacity,
            spawn_callsites: Callsites::default(),
            waker_callsites: Callsites::default(),
            resource_callsites: Callsites::default(),
            async_op_callsites: Callsites::default(),
            async_op_poll_callsites: Callsites::default(),
            poll_op_callsites: Callsites::default(),
            resource_state_update_callsites: Callsites::default(),
            async_op_state_update_callsites: Callsites::default(),
            recorder,
            base_time,
            max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
            max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
        };
        (layer, server)
    }
}

impl ConsoleLayer {
    /// Default maximum capacity for the channel of events sent from a
    /// [`ConsoleLayer`] to a [`Server`].
    ///
    /// When this capacity is exhausted, additional events will be dropped.
    /// Decreasing this value will reduce memory usage, but may result in
    /// events being dropped more frequently.
    ///
    /// See also [`Builder::event_buffer_capacity`].
    pub const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 1024 * 100;

    /// Default maximum capacity for the channel of events sent from a
    /// [`Server`] to each subscribed client.
    ///
    /// When this capacity is exhausted, the client is assumed to be inactive,
    /// and may be disconnected.
    ///
    /// See also [`Builder::client_buffer_capacity`].
    pub const DEFAULT_CLIENT_BUFFER_CAPACITY: usize = 1024 * 4;

    /// Default frequency for publishing events to clients.
    ///
    /// Note that methods like [`init`][`crate::init`] and [`spawn`][`crate::spawn`] will take the value
    /// from the `TOKIO_CONSOLE_PUBLISH_INTERVAL` [environment variable] before falling
    /// back on this default.
    ///
    /// See also [`Builder::publish_interval`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_PUBLISH_INTERVAL: Duration = Duration::from_secs(1);

    /// By default, completed spans are retained for one hour.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will take the value from the
    /// `TOKIO_CONSOLE_RETENTION` [environment variable] before falling back on
    /// this default.
    ///
    /// See also [`Builder::retention`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_RETENTION: Duration = Duration::from_secs(60 * 60);

    /// The default maximum value for task poll duration histograms.
    ///
    /// Any poll duration exceeding this will be clamped to this value. By
    /// default, the maximum poll duration is one second.
    ///
    /// See also [`Builder::poll_duration_histogram_max`].
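    ///
    /// A sketch of overriding this default through the builder:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// let (layer, server) = console_subscriber::ConsoleLayer::builder()
    ///     .poll_duration_histogram_max(Duration::from_secs(2))
    ///     .build();
    /// # drop((layer, server));
    /// ```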
    pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

    /// The default maximum value for the task scheduled duration histogram.
    ///
    /// Any scheduled duration (the time from a task being woken until it is next
    /// polled) exceeding this will be clamped to this value. By default, the
    /// maximum scheduled duration is one second.
    ///
    /// See also [`Builder::scheduled_duration_histogram_max`].
    pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

    fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
        self.spawn_callsites.contains(meta)
    }

    fn is_resource(&self, meta: &'static Metadata<'static>) -> bool {
        self.resource_callsites.contains(meta)
    }

    fn is_async_op(&self, meta: &'static Metadata<'static>) -> bool {
        self.async_op_callsites.contains(meta)
    }

    fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_spawn(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_resource<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_resource(span.metadata()))
            .unwrap_or(false)
    }

    fn is_id_async_op<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        cx.span(id)
            .map(|span| self.is_async_op(span.metadata()))
            .unwrap_or(false)
    }

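    /// Returns the ID of the most recently entered span on `stack` that
    /// matches the predicate `p`, if any.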
    fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
    where
        P: Fn(&span::Id) -> bool,
    {
        stack
            .stack()
            .iter()
            .rev()
            .find(|id| p(id.id()))
            .map(|id| id.id())
            .cloned()
    }

    fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool {
        self.send_stats(dropped, move || (event, ())).is_some()
    }

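    /// Attempts to send the event produced by `mk_event` to the aggregator
    /// without blocking, incrementing `dropped` if the channel is full, and
    /// triggering a flush whenever the channel's remaining capacity falls to
    /// the flush threshold.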
    fn send_stats<S>(
        &self,
        dropped: &AtomicUsize,
        mk_event: impl FnOnce() -> (Event, S),
    ) -> Option<S> {
        use mpsc::error::TrySendError;

        // Return whether or not we actually sent the event.
        let sent = match self.tx.try_reserve() {
            Ok(permit) => {
                let (event, stats) = mk_event();
                permit.send(event);
                Some(stats)
            }
            Err(TrySendError::Closed(_)) => {
                // we should warn here eventually, but nop for now because we
                // can't trigger tracing events...
                None
            }
            Err(TrySendError::Full(_)) => {
                // this shouldn't happen, since we trigger a flush when
                // approaching the high water line...but if the executor wait
                // time is very high, maybe the aggregator task hasn't been
                // polled yet. so... eek?!
                dropped.fetch_add(1, Ordering::Release);
                None
            }
        };

        let capacity = self.tx.capacity();
        if capacity <= self.flush_under_capacity {
            self.shared.flush.trigger();
        }

        sent
    }

    fn record(&self, event: impl FnOnce() -> record::Event) {
        if let Some(ref recorder) = self.recorder {
            recorder.record(event());
        }
    }

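    /// Records a state-attribute update for the resource or async op span
    /// `id`, also propagating the update to the parent span's stats when the
    /// parent inherits child attributes.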
    fn state_update<S>(
        &self,
        id: &Id,
        event: &tracing::Event<'_>,
        ctx: &Context<'_, S>,
        get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
    ) where
        S: Subscriber + for<'a> LookupSpan<'a>,
    {
        let meta_id = event.metadata().into();
        let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
        event.record(&mut state_update_visitor);

        let update = match state_update_visitor.result() {
            Some(update) => update,
            None => return,
        };

        let span = match ctx.span(id) {
            Some(span) => span,
            // XXX(eliza): no span exists for a resource ID, we should maybe
            // record an error here...
            None => return,
        };

        let exts = span.extensions();
        let stats = match get_stats(&exts) {
            Some(stats) => stats,
            // XXX(eliza): a resource span was not a resource??? this is a bug
            None => return,
        };

        stats.update_attribute(id, &update);

        if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
            let exts = parent.extensions();
            if let Some(stats) = get_stats(&exts) {
                if stats.inherit_child_attributes {
                    stats.update_attribute(id, &update);
                }
            }
        }
    }
}

impl<S> Layer<S> for ConsoleLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
        if !meta.is_span() && !meta.is_event() {
            return subscriber::Interest::never();
        }

        let dropped = match (meta.name(), meta.target()) {
            ("runtime.spawn", _) | ("task", "tokio::task") => {
                self.spawn_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (_, "runtime::waker") | (_, "tokio::task::waker") => {
                self.waker_callsites.insert(meta);
                &self.shared.dropped_tasks
            }
            (ResourceVisitor::RES_SPAN_NAME, _) => {
                self.resource_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
                self.async_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            ("runtime.resource.async_op.poll", _) => {
                self.async_op_poll_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
                self.poll_op_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
                self.resource_state_update_callsites.insert(meta);
                &self.shared.dropped_resources
            }
            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
                self.async_op_state_update_callsites.insert(meta);
                &self.shared.dropped_async_ops
            }
            (_, _) => &self.shared.dropped_tasks,
        };

        self.send_metadata(dropped, Event::Metadata(meta));
        subscriber::Interest::always()
    }

    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        let metadata = attrs.metadata();
        if self.is_spawn(metadata) {
            let at = Instant::now();
            let mut task_visitor = TaskVisitor::new(metadata.into());
            attrs.record(&mut task_visitor);
            let (fields, location) = task_visitor.result();
            self.record(|| record::Event::Spawn {
                id: id.into_u64(),
                at: self.base_time.to_system_time(at),
                fields: record::SerializeFields(fields.clone()),
            });
            if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
                let stats = Arc::new(stats::TaskStats::new(
                    self.max_poll_duration_nanos,
                    self.max_scheduled_duration_nanos,
                    at,
                ));
                let event = Event::Spawn {
                    id: id.clone(),
                    stats: stats.clone(),
                    metadata,
                    fields,
                    location,
                };
                (event, stats)
            }) {
                ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
            }
            return;
        }

        if self.is_resource(metadata) {
            let at = Instant::now();
            let mut resource_visitor = ResourceVisitor::default();
            attrs.record(&mut resource_visitor);
            if let Some(result) = resource_visitor.result() {
                let ResourceVisitorResult {
                    concrete_type,
                    kind,
                    location,
                    is_internal,
                    inherit_child_attrs,
                } = result;
                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });
                if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
                    let stats = Arc::new(stats::ResourceStats::new(
                        at,
                        inherit_child_attrs,
                        parent_id.clone(),
                    ));
                    let event = Event::Resource {
                        id: id.clone(),
                        parent_id,
                        metadata,
                        concrete_type,
                        kind,
                        location,
                        is_internal,
                        stats: stats.clone(),
                    };
                    (event, stats)
                }) {
                    ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                }
            }
            return;
        }

        if self.is_async_op(metadata) {
            let at = Instant::now();
            let mut async_op_visitor = AsyncOpVisitor::default();
            attrs.record(&mut async_op_visitor);
            if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
                let resource_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });

                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
                });

                if let Some(resource_id) = resource_id {
                    if let Some(stats) =
                        self.send_stats(&self.shared.dropped_async_ops, move || {
                            let stats = Arc::new(stats::AsyncOpStats::new(
                                at,
                                inherit_child_attrs,
                                parent_id.clone(),
                            ));
                            let event = Event::AsyncResourceOp {
                                id: id.clone(),
                                parent_id,
                                resource_id,
                                metadata,
                                source,
                                stats: stats.clone(),
                            };
                            (event, stats)
                        })
                    {
                        ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
                    }
                }
            }
        }
    }

    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        let metadata = event.metadata();
        if self.waker_callsites.contains(metadata) {
            let at = Instant::now();
            let mut visitor = WakerVisitor::default();
            event.record(&mut visitor);
            // XXX (eliza): ew...
            if let Some((id, mut op)) = visitor.result() {
                if let Some(span) = ctx.span(&id) {
                    let exts = span.extensions();
                    if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                        if op.is_wake() {
                            // Are we currently inside the task's span? If so, the task
                            // has woken itself.

                            let self_wake = self
                                .current_spans
                                .get()
                                .map(|spans| spans.borrow().iter().any(|span| span == &id))
                                .unwrap_or(false);
                            op = op.self_wake(self_wake);
                        }

                        stats.record_wake_op(op, at);
                        self.record(|| record::Event::Waker {
                            id: id.into_u64(),
                            at: self.base_time.to_system_time(at),
                            op,
                        });
                    }
                }
            }
            return;
        }

        if self.poll_op_callsites.contains(metadata) {
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            // poll op event should have a resource span parent
            if let Some(resource_id) = resource_id {
                let mut poll_op_visitor = PollOpVisitor::default();
                event.record(&mut poll_op_visitor);
                if let Some((op_name, is_ready)) = poll_op_visitor.result() {
                    let task_and_async_op_ids = self.current_spans.get().and_then(|stack| {
                        let stack = stack.borrow();
                        let task_id =
                            self.first_entered(&stack, |id| self.is_id_spawned(id, &ctx))?;
                        let async_op_id =
                            self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?;
                        Some((task_id, async_op_id))
                    });
                    // a poll op event should be emitted in the context of both
                    // a task span and an async op span
                    if let Some((task_id, async_op_id)) = task_and_async_op_ids {
                        if let Some(span) = ctx.span(&async_op_id) {
                            let exts = span.extensions();
                            if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                                stats.set_task_id(&task_id);
                            }
                        }

                        self.send_stats(&self.shared.dropped_async_ops, || {
                            let event = Event::PollOp {
                                metadata,
                                op_name,
                                resource_id,
                                async_op_id,
                                task_id,
                                is_ready,
                            };
                            (event, ())
                        });

                        // TODO: JSON recorder doesn't care about poll ops.
                    }
                }
            }
            return;
        }

        if self.resource_state_update_callsites.contains(metadata) {
            // state update event should have a resource span parent
            let resource_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
            });
            if let Some(id) = resource_id {
                self.state_update(&id, event, &ctx, |exts| {
                    exts.get::<Arc<stats::ResourceStats>>()
                        .map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
                });
            }

            return;
        }

        if self.async_op_state_update_callsites.contains(metadata) {
            let async_op_id = self.current_spans.get().and_then(|stack| {
                self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx))
            });
            if let Some(id) = async_op_id {
                self.state_update(&id, event, &ctx, |exts| {
                    let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
                    Some(&async_op.stats)
                });
            }
        }
    }

    fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let now = Instant::now();
            let exts = span.extensions();
            // if the span we are entering is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.start_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.start_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to enter it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans
                .get_or_default()
                .borrow_mut()
                .push(id.clone());

            self.record(|| record::Event::Enter {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(id) {
            let exts = span.extensions();
            let now = Instant::now();
            // if the span we are exiting is a task or async op, record the
            // poll stats.
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.end_poll(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.end_poll(now);
            } else if exts.get::<Arc<stats::ResourceStats>>().is_some() {
                // otherwise, is the span a resource? in that case, we also want
                // to exit it, although we don't care about recording poll
                // stats.
            } else {
                return;
            };

            self.current_spans.get_or_default().borrow_mut().pop(id);

            self.record(|| record::Event::Exit {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }

    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
        if let Some(span) = cx.span(&id) {
            let now = Instant::now();
            let exts = span.extensions();
            if let Some(stats) = exts.get::<Arc<stats::TaskStats>>() {
                stats.drop_task(now);
            } else if let Some(stats) = exts.get::<Arc<stats::AsyncOpStats>>() {
                stats.drop_async_op(now);
            } else if let Some(stats) = exts.get::<Arc<stats::ResourceStats>>() {
                stats.drop_resource(now);
            }
            self.record(|| record::Event::Close {
                id: id.into_u64(),
                at: self.base_time.to_system_time(now),
            });
        }
    }
}

impl fmt::Debug for ConsoleLayer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConsoleLayer")
            // mpsc::Sender debug impl is not very useful
            .field("tx", &format_args!("<...>"))
            .field("tx.capacity", &self.tx.capacity())
            .field("shared", &self.shared)
            .field("spawn_callsites", &self.spawn_callsites)
            .field("waker_callsites", &self.waker_callsites)
            .finish()
    }
}

impl Server {
    // XXX(eliza): why is `SocketAddr::new` not `const`???
    /// By default, a [`Server`] binds the IP address 127.0.0.1 (localhost) to
    /// service remote procedure calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
    pub const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    /// By default, a [`Server`] binds port 6669 to service remote procedure
    /// calls.
    ///
    /// Note that methods like [`init`][`crate::init`] and
    /// [`spawn`][`crate::spawn`] will parse the socket address from the
    /// `TOKIO_CONSOLE_BIND` [environment variable] before falling back on
    /// constructing a socket address from this default.
    ///
    /// See also [`Builder::server_addr`].
    ///
    /// [environment variable]: `Builder::with_default_env`
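    ///
    /// # Examples
    ///
    /// A sketch of assembling the default socket address from these two
    /// constants:
    ///
    /// ```
    /// use std::net::SocketAddr;
    ///
    /// let addr = SocketAddr::new(
    ///     console_subscriber::Server::DEFAULT_IP,
    ///     console_subscriber::Server::DEFAULT_PORT,
    /// );
    /// assert_eq!(addr.to_string(), "127.0.0.1:6669");
    /// ```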
    pub const DEFAULT_PORT: u16 = 6669;

    /// Starts the gRPC service with the default gRPC settings.
    ///
    /// To configure gRPC server settings before starting the server, use
    /// [`serve_with`] instead. This method is equivalent to calling [`serve_with`]
    /// and providing the default gRPC server settings:
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// server.serve_with(tonic::transport::Server::default()).await
    /// # }
    /// ```
    ///
    /// [`serve_with`]: Server::serve_with
    pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        self.serve_with(tonic::transport::Server::default()).await
    }

    /// Starts the gRPC service with the given [`tonic`] gRPC transport server
    /// `builder`.
    ///
    /// The `builder` parameter may be used to configure gRPC-specific settings
    /// prior to starting the server.
    ///
    /// This spawns both the server task and the event aggregation worker
    /// task on the current async runtime.
    ///
    /// [`tonic`]: https://docs.rs/tonic/
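    ///
    /// # Examples
    ///
    /// A sketch of adjusting a transport-level setting before serving (the
    /// concurrency limit chosen here is arbitrary):
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// let builder = tonic::transport::Server::builder()
    ///     .concurrency_limit_per_connection(32);
    /// server.serve_with(builder).await
    /// # }
    /// ```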
    pub async fn serve_with(
        self,
        mut builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let router = builder.add_service(instrument_server);
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
        };
        aggregate.abort();
        res?.map_err(Into::into)
    }

    /// Starts the gRPC service with the default gRPC settings and gRPC-Web
    /// support.
    ///
    /// # Examples
    ///
    /// To serve the instrument server with gRPC-Web support with the default
    /// settings:
    ///
    /// ```rust
    /// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    /// # let (_, server) = console_subscriber::ConsoleLayer::new();
    /// server.serve_with_grpc_web(tonic::transport::Server::default()).await
    /// # }
    /// ```
    ///
    /// To serve the instrument server with gRPC-Web support and a custom CORS
    /// configuration, use the following code:
    ///
    /// ```rust
    /// # use std::{thread, time::Duration};
    /// #
    /// use console_subscriber::{ConsoleLayer, ServerParts};
    /// use tonic_web::GrpcWebLayer;
    /// use tower_http::cors::{CorsLayer, AllowOrigin};
    /// use http::header::HeaderName;
    /// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
    /// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
    /// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
    /// #     ["grpc-status", "grpc-message", "grpc-status-details-bin"];
    /// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
    /// #     "x-grpc-web",
    /// #     "content-type",
    /// #     "x-user-agent",
    /// #     "grpc-timeout",
    /// #     "user-agent",
    /// # ];
    ///
    /// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
    /// # thread::Builder::new()
    /// #     .name("subscriber".into())
    /// #     .spawn(move || {
    /// // Customize the CORS configuration.
    /// let cors = CorsLayer::new()
    ///     .allow_origin(AllowOrigin::mirror_request())
    ///     .allow_credentials(true)
    ///     .max_age(DEFAULT_MAX_AGE)
    ///     .expose_headers(
    ///         DEFAULT_EXPOSED_HEADERS
    ///             .iter()
    ///             .cloned()
    ///             .map(HeaderName::from_static)
    ///             .collect::<Vec<HeaderName>>(),
    ///     )
    ///     .allow_headers(
    ///         DEFAULT_ALLOW_HEADERS
    ///             .iter()
    ///             .cloned()
    ///             .map(HeaderName::from_static)
    ///             .collect::<Vec<HeaderName>>(),
    ///     );
    /// # let runtime = tokio::runtime::Builder::new_current_thread()
    /// #     .enable_all()
    /// #     .build()
    /// #     .expect("console subscriber runtime initialization failed");
    /// # runtime.block_on(async move {
    ///
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    /// tokio::spawn(aggregator.run());
    ///
    /// // Serve the instrument server with gRPC-Web support and the CORS configuration.
    /// let router = tonic::transport::Server::builder()
    ///     .accept_http1(true)
    ///     .layer(cors)
    ///     .layer(GrpcWebLayer::new())
    ///     .add_service(instrument_server);
    /// let serve = router.serve(std::net::SocketAddr::new(
    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    ///     // 6669 is a restricted port on Chrome, so we cannot use it. We use
    ///     // a different port instead.
    ///     9999,
    /// ));
    ///
    /// // Finally, spawn the server.
    /// serve.await.expect("console subscriber server failed");
    /// # });
    /// # })
    /// # .expect("console subscriber could not spawn thread");
    /// # tracing_subscriber::registry().with(console_layer).init();
    /// ```
    ///
    /// For a comprehensive understanding and complete code example,
    /// please refer to the `grpc-web` example in the examples directory.
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    #[cfg(feature = "grpc-web")]
    pub async fn serve_with_grpc_web(
        self,
        builder: tonic::transport::Server,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        let addr = self.addr.clone();
        let ServerParts {
            instrument_server,
            aggregator,
        } = self.into_parts();
        let router = builder
            .accept_http1(true)
            .add_service(tonic_web::enable(instrument_server));
        let aggregate = spawn_named(aggregator.run(), "console::aggregate");
        let res = match addr {
            ServerAddr::Tcp(addr) => {
                let serve = router.serve(addr);
                spawn_named(serve, "console::serve").await
            }
            #[cfg(unix)]
            ServerAddr::Unix(path) => {
                let incoming = UnixListener::bind(path)?;
                let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
                spawn_named(serve, "console::serve").await
            }
        };
        aggregate.abort();
        res?.map_err(Into::into)
    }

    /// Returns the parts needed to spawn a gRPC server and the aggregator that
    /// supplies it.
    ///
    /// Note that a server spawned in this way will disregard any value set by
    /// [`Builder::server_addr`], as the user becomes responsible for defining
    /// the address when calling [`Router::serve`].
    ///
    /// Additionally, the user of this API must ensure that the [`Aggregator`]
    /// is running for as long as the gRPC server is. If the server stops
    /// running, the aggregator task can be aborted.
    ///
    /// # Examples
    ///
    /// The parts can be used to serve the instrument server together with
    /// other endpoints from the same gRPC server.
    ///
    /// ```
    /// use console_subscriber::{ConsoleLayer, ServerParts};
    ///
    /// # let runtime = tokio::runtime::Builder::new_current_thread()
    /// #     .enable_all()
    /// #     .build()
    /// #     .unwrap();
    /// # runtime.block_on(async {
    /// let (console_layer, server) = ConsoleLayer::builder().build();
    /// let ServerParts {
    ///     instrument_server,
    ///     aggregator,
    ///     ..
    /// } = server.into_parts();
    ///
    /// let aggregator_handle = tokio::spawn(aggregator.run());
    /// let router = tonic::transport::Server::builder()
    ///     //.add_service(some_other_service)
    ///     .add_service(instrument_server);
    /// let serve = router.serve(std::net::SocketAddr::new(
    ///     std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    ///     6669,
    /// ));
    ///
    /// // Finally, spawn the server.
    /// tokio::spawn(serve);
    /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
    /// # drop(console_layer);
    /// # let mut aggregator_handle = aggregator_handle;
    /// # aggregator_handle.abort();
    /// # });
    /// ```
    ///
    /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
    pub fn into_parts(mut self) -> ServerParts {
        let aggregator = self
            .aggregator
            .take()
            .expect("cannot start server multiple times");

        let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self);

        ServerParts {
            instrument_server,
            aggregator,
        }
    }
}

/// Server Parts
///
/// This struct contains the parts returned by [`Server::into_parts`]. It may
/// contain further parts in the future and, as such, is marked as
/// `non_exhaustive`.
///
/// The `InstrumentServer<Server>` can be used to construct a router which
/// can be added to a [`tonic`] gRPC server.
///
/// The `aggregator` is a future which should be running as long as the server is.
/// Generally, this future should be spawned onto an appropriate runtime and then
/// aborted if the server gets shut down.
///
/// See the [`Server::into_parts`] documentation for usage.
#[non_exhaustive]
pub struct ServerParts {
    /// The instrument server.
    ///
    /// See the documentation for [`InstrumentServer`] for details.
    pub instrument_server: InstrumentServer<Server>,

    /// The aggregator.
    ///
    /// Responsible for collecting and preparing traces for the instrument server
    /// to send its clients.
    ///
    /// The aggregator should be [`run`] when the instrument server is started.
    /// If the server stops running for any reason, the aggregator task can be
    /// aborted.
    ///
    /// [`run`]: fn@crate::Aggregator::run
    pub aggregator: Aggregator,
}

#[tonic::async_trait]
impl proto::instrument::instrument_server::Instrument for Server {
    type WatchUpdatesStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
    type WatchTaskDetailsStream =
        tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;

    async fn watch_updates(
        &self,
        req: tonic::Request<proto::instrument::InstrumentRequest>,
    ) -> Result<tonic::Response<Self::WatchUpdatesStream>, tonic::Status> {
        match req.remote_addr() {
            Some(addr) => tracing::debug!(client.addr = %addr, "starting a new watch"),
            None => tracing::debug!(client.addr = %"<unknown>", "starting a new watch"),
        }
        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;
        let (tx, rx) = mpsc::channel(self.client_buffer);
        permit.send(Command::Instrument(Watch(tx)));
        tracing::debug!("watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn watch_task_details(
        &self,
        req: tonic::Request<proto::instrument::TaskDetailsRequest>,
    ) -> Result<tonic::Response<Self::WatchTaskDetailsStream>, tonic::Status> {
        let task_id = req
            .into_inner()
            .id
            .ok_or_else(|| tonic::Status::invalid_argument("missing task_id"))?
            .id;

        // `tracing` reserves span ID 0 for niche optimization for `Option<Id>`.
        let id = std::num::NonZeroU64::new(task_id)
            .map(Id::from_non_zero_u64)
            .ok_or_else(|| tonic::Status::invalid_argument("task_id cannot be 0"))?;

        let permit = self.subscribe.reserve().await.map_err(|_| {
            tonic::Status::internal("cannot start new watch, aggregation task is not running")
        })?;

        // Check with the aggregator task to request a stream if the task exists.
        let (stream_sender, stream_recv) = oneshot::channel();
        permit.send(Command::WatchTaskDetail(WatchRequest {
            id,
            stream_sender,
            buffer: self.client_buffer,
        }));
        // If the aggregator drops the sender, the task doesn't exist.
        let rx = stream_recv.await.map_err(|_| {
            tracing::warn!(id = ?task_id, "requested task not found");
            tonic::Status::not_found("task not found")
        })?;

        tracing::debug!(id = ?task_id, "task details watch started");
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        Ok(tonic::Response::new(stream))
    }

    async fn pause(
        &self,
        _req: tonic::Request<proto::instrument::PauseRequest>,
    ) -> Result<tonic::Response<proto::instrument::PauseResponse>, tonic::Status> {
        self.subscribe.send(Command::Pause).await.map_err(|_| {
            tonic::Status::internal("cannot pause, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::PauseResponse {}))
    }

    async fn resume(
        &self,
        _req: tonic::Request<proto::instrument::ResumeRequest>,
    ) -> Result<tonic::Response<proto::instrument::ResumeResponse>, tonic::Status> {
        self.subscribe.send(Command::Resume).await.map_err(|_| {
            tonic::Status::internal("cannot resume, aggregation task is not running")
        })?;
        Ok(tonic::Response::new(proto::instrument::ResumeResponse {}))
    }
}

impl WakeOp {
    /// Returns `true` if `self` is a `Wake` or `WakeByRef` event.
    fn is_wake(self) -> bool {
        matches!(self, Self::Wake { .. } | Self::WakeByRef { .. })
    }

    fn self_wake(self, self_wake: bool) -> Self {
        match self {
            Self::Wake { .. } => Self::Wake { self_wake },
            Self::WakeByRef { .. } => Self::WakeByRef { self_wake },
            x => x,
        }
    }
}

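/// Spawns `task` on the current Tokio runtime, attaching `_name` to the task
/// when the unstable task builder API is available (i.e., when built with
/// `--cfg tokio_unstable`).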
#[track_caller]
pub(crate) fn spawn_named<T>(
    task: impl std::future::Future<Output = T> + Send + 'static,
    _name: &str,
) -> tokio::task::JoinHandle<T>
where
    T: Send + 'static,
{
    #[cfg(tokio_unstable)]
    return tokio::task::Builder::new().name(_name).spawn(task).unwrap();

    #[cfg(not(tokio_unstable))]
    tokio::spawn(task)
}