console_subscriber

Struct Server

pub struct Server { /* private fields */ }

A gRPC Server that implements the tokio-console wire format.

Client applications, such as the tokio-console CLI, connect to the gRPC server and stream data about the runtime’s history (such as a list of the currently active tasks, or statistics summarizing polling times). A Server also interprets commands from a client application, such as a request to focus on a specific task, and translates that into a stream of details specific to that task.

Implementations§

impl Server

pub const DEFAULT_IP: IpAddr = _

By default, a Server binds to the IP address 127.0.0.1 to service remote procedure calls.

Note that methods like init and spawn will parse the socket address from the TOKIO_CONSOLE_BIND environment variable before falling back on constructing a socket address from this default.

See also Builder::server_addr.

pub const DEFAULT_PORT: u16 = 6_669u16

A Server by default binds port 6669 to service remote procedure calls.

Note that methods like init and spawn will parse the socket address from the TOKIO_CONSOLE_BIND environment variable before falling back on constructing a socket address from this default.

See also Builder::server_addr.
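
The fallback described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `resolve_bind_addr` is a hypothetical helper, the two constants are local stand-ins for `Server::DEFAULT_IP` and `Server::DEFAULT_PORT`, and it assumes the environment variable holds a literal `ip:port` pair.

```rust
use std::net::{AddrParseError, IpAddr, Ipv4Addr, SocketAddr};

// Local stand-ins mirroring Server::DEFAULT_IP and Server::DEFAULT_PORT.
const DEFAULT_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const DEFAULT_PORT: u16 = 6669;

/// Hypothetical helper: prefer an explicitly provided address and fall back
/// to the default IP and port when none is given.
/// Assumption: the provided value is a literal `ip:port` string.
fn resolve_bind_addr(env_value: Option<&str>) -> Result<SocketAddr, AddrParseError> {
    match env_value {
        Some(raw) => raw.parse::<SocketAddr>(),
        None => Ok(SocketAddr::new(DEFAULT_IP, DEFAULT_PORT)),
    }
}

fn main() {
    // Read TOKIO_CONSOLE_BIND if it is set; an unset variable yields None.
    let raw = std::env::var("TOKIO_CONSOLE_BIND").ok();
    match resolve_bind_addr(raw.as_deref()) {
        Ok(addr) => println!("console server would bind {addr}"),
        Err(err) => eprintln!("invalid TOKIO_CONSOLE_BIND: {err}"),
    }
}
```

Returning a `Result` here is a design choice of the sketch: it surfaces a malformed value to the caller instead of silently binding the default address.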

pub async fn serve(self) -> Result<(), Box<dyn Error + Send + Sync + 'static>>

Starts the gRPC service with the default gRPC settings.

To configure gRPC server settings before starting the server, use serve_with instead. This method is equivalent to calling serve_with and providing the default gRPC server settings:

server.serve_with(tonic::transport::Server::default()).await

pub async fn serve_with( self, builder: Server, ) -> Result<(), Box<dyn Error + Send + Sync + 'static>>

Starts the gRPC service with the given tonic gRPC transport server builder.

The builder parameter may be used to configure gRPC-specific settings prior to starting the server.

This spawns both the server task and the event aggregation worker task on the current async runtime.

pub async fn serve_with_grpc_web( self, builder: Server, ) -> Result<(), Box<dyn Error + Send + Sync + 'static>>

Starts the gRPC service with the given tonic gRPC transport server builder and gRPC-Web support.

§Examples

To serve the instrument server with gRPC-Web support with the default settings:

server.serve_with_grpc_web(tonic::transport::Server::default()).await

To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the following code:

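use std::time::Duration;

use console_subscriber::{ConsoleLayer, ServerParts};
use tonic_web::GrpcWebLayer;
use tower_http::cors::{CorsLayer, AllowOrigin};
use http::header::HeaderName;

// Assumed values for illustration: the constants used below are not defined in this example.
const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
const DEFAULT_EXPOSED_HEADERS: [&str; 3] = ["grpc-status", "grpc-message", "grpc-status-details-bin"];
const DEFAULT_ALLOW_HEADERS: [&str; 5] = ["x-grpc-web", "content-type", "x-user-agent", "grpc-timeout", "user-agent"];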

let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
// Customize the CORS configuration.
let cors = CorsLayer::new()
    .allow_origin(AllowOrigin::mirror_request())
    .allow_credentials(true)
    .max_age(DEFAULT_MAX_AGE)
    .expose_headers(
        DEFAULT_EXPOSED_HEADERS
            .iter()
            .cloned()
            .map(HeaderName::from_static)
            .collect::<Vec<HeaderName>>(),
    )
    .allow_headers(
        DEFAULT_ALLOW_HEADERS
            .iter()
            .cloned()
            .map(HeaderName::from_static)
            .collect::<Vec<HeaderName>>(),
    );

let ServerParts {
    instrument_server,
    aggregator,
    ..
} = server.into_parts();
tokio::spawn(aggregator.run());

// Serve the instrument server with gRPC-Web support and the CORS configuration.
let router = tonic::transport::Server::builder()
    .accept_http1(true)
    .layer(cors)
    .layer(GrpcWebLayer::new())
    .add_service(instrument_server);
let serve = router.serve(std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
    9999,
));

// Finally, run the server on the current task until it exits.
serve.await.expect("console subscriber server failed");

For a complete code example, see the grpc-web example in the examples directory.

pub fn into_parts(self) -> ServerParts

Returns the parts needed to spawn a gRPC server and the aggregator that supplies it.

Note that a server spawned in this way will disregard any value set by Builder::server_addr, as the user becomes responsible for defining the address when calling Router::serve.

Additionally, the user of this API must ensure that the Aggregator is running for as long as the gRPC server is. If the server stops running, the aggregator task can be aborted.

§Examples

The parts can be used to serve the instrument server together with other endpoints from the same gRPC server.

use console_subscriber::{ConsoleLayer, ServerParts};

let (console_layer, server) = ConsoleLayer::builder().build();
let ServerParts {
    instrument_server,
    aggregator,
    ..
} = server.into_parts();

let aggregator_handle = tokio::spawn(aggregator.run());
let router = tonic::transport::Server::builder()
    //.add_service(some_other_service)
    .add_service(instrument_server);
let serve = router.serve(std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
    6669,
));

// Finally, spawn the server.
tokio::spawn(serve);

Trait Implementations§

impl Instrument for Server

type WatchUpdatesStream = ReceiverStream<Result<Update, Status>>

Server streaming response type for the WatchUpdates method.

type WatchTaskDetailsStream = ReceiverStream<Result<TaskDetails, Status>>

Server streaming response type for the WatchTaskDetails method.

fn watch_updates<'life0, 'async_trait>( &'life0 self, req: Request<InstrumentRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<Self::WatchUpdatesStream>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Produces a stream of updates representing the behavior of the instrumented async runtime.

fn watch_task_details<'life0, 'async_trait>( &'life0 self, req: Request<TaskDetailsRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<Self::WatchTaskDetailsStream>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Produces a stream of updates describing the activity of a specific task.

fn pause<'life0, 'async_trait>( &'life0 self, _req: Request<PauseRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<PauseResponse>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Registers that the console observer wants to pause the stream.

fn resume<'life0, 'async_trait>( &'life0 self, _req: Request<ResumeRequest>, ) -> Pin<Box<dyn Future<Output = Result<Response<ResumeResponse>, Status>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Registers that the console observer wants to resume the stream.

Auto Trait Implementations§

§

impl Freeze for Server

§

impl !RefUnwindSafe for Server

§

impl Send for Server

§

impl Sync for Server

§

impl Unpin for Server

§

impl !UnwindSafe for Server

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.