1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::config;
use crate::shutdown;
use lazy_static::lazy_static;
use prometheus::{gather, Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::sync::mpsc;
use tracing::{error, info};
use warp::{Filter, Rejection, Reply};
// DEFAULT_PORT is the default port of the metrics server.
const DEFAULT_PORT: u16 = 8000;
lazy_static! {
// REGISTRY is used to register all metrics.
pub static ref REGISTRY: Registry = Registry::new();
// VERSION_GAUGE is used to record the version info of the service.
pub static ref VERSION_GAUGE: IntGaugeVec =
IntGaugeVec::new(
Opts::new("version", "Version info of the service.").namespace(config::SERVICE_NAME).subsystem(config::NAME),
&["major", "minor", "git_version", "git_commit", "platform", "build_time"]
).expect("metric can be created");
// DOWNLOAD_PEER_COUNT is used to count the number of download peers.
pub static ref DOWNLOAD_PEER_COUNT: IntCounterVec =
IntCounterVec::new(
Opts::new("download_peer_total", "Counter of the number of the download peer.").namespace(config::SERVICE_NAME).subsystem(config::NAME),
&["task_type"]
).expect("metric can be created");
}
// Metrics is the metrics server.
#[derive(Debug)]
pub struct Metrics {
// addr is the address of the metrics server.
addr: SocketAddr,
// shutdown is used to shutdown the metrics server.
shutdown: shutdown::Shutdown,
// _shutdown_complete is used to notify the metrics server is shutdown.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
// Metrics implements the metrics server.
impl Metrics {
// new creates a new Metrics.
pub fn new(
enable_ipv6: bool,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
// Initialize the address of the server.
let addr = if enable_ipv6 {
SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), DEFAULT_PORT)
} else {
SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), DEFAULT_PORT)
};
Self {
addr,
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
// run starts the metrics server.
pub async fn run(&mut self) {
self.register_custom_metrics();
let metrics_route = warp::path!("metrics")
.and(warp::get())
.and(warp::path::end())
.and_then(Self::metrics_handler);
// Start the metrics server and wait for it to finish.
tokio::select! {
_ = warp::serve(metrics_route).run(self.addr) => {
// Metrics server ended.
info!("metrics server ended");
}
_ = self.shutdown.recv() => {
// Metrics server shutting down with signals.
info!("metrics server shutting down");
}
}
}
// register_custom_metrics registers all custom metrics.
fn register_custom_metrics(&self) {
REGISTRY
.register(Box::new(VERSION_GAUGE.clone()))
.expect("metric can be registered");
REGISTRY
.register(Box::new(DOWNLOAD_PEER_COUNT.clone()))
.expect("metric can be registered");
}
// metrics_handler handles the metrics request.
async fn metrics_handler() -> Result<impl Reply, Rejection> {
let encoder = TextEncoder::new();
// Encode custom metrics.
let mut buf = Vec::new();
if let Err(err) = encoder.encode(®ISTRY.gather(), &mut buf) {
error!("could not encode custom metrics: {}", err);
};
let mut res = match String::from_utf8(buf.clone()) {
Ok(v) => v,
Err(err) => {
error!("custom metrics could not be from_utf8'd: {}", err);
String::default()
}
};
buf.clear();
// Encode prometheus metrics.
let mut buf = Vec::new();
if let Err(err) = encoder.encode(&gather(), &mut buf) {
error!("could not encode prometheus metrics: {}", err);
};
let res_custom = match String::from_utf8(buf.clone()) {
Ok(v) => v,
Err(err) => {
error!("prometheus metrics could not be from_utf8'd: {}", err);
String::default()
}
};
buf.clear();
res.push_str(&res_custom);
Ok(res)
}
}