1use crate::{service::traits::BandwidthSink, ProtocolName};
20
21use prometheus_endpoint::{
22 self as prometheus, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, MetricSource, Opts,
23 PrometheusError, Registry, SourcedCounter, SourcedGauge, U64,
24};
25
26use std::{
27 str,
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32};
33
34pub use prometheus_endpoint::{Histogram, HistogramVec};
35
36pub fn register(registry: &Registry, sources: MetricSources) -> Result<Metrics, PrometheusError> {
38 BandwidthCounters::register(registry, sources.bandwidth)?;
39 NumConnectedGauge::register(registry, sources.connected_peers)?;
40 Metrics::register(registry)
41}
42
43pub fn register_without_sources(registry: &Registry) -> Result<Metrics, PrometheusError> {
45 Metrics::register(registry)
46}
47
48pub struct MetricSources {
50 pub bandwidth: Arc<dyn BandwidthSink>,
51 pub connected_peers: Arc<AtomicUsize>,
52}
53
54impl MetricSources {
55 pub fn register(
56 registry: &Registry,
57 bandwidth: Arc<dyn BandwidthSink>,
58 connected_peers: Arc<AtomicUsize>,
59 ) -> Result<(), PrometheusError> {
60 BandwidthCounters::register(registry, bandwidth)?;
61 NumConnectedGauge::register(registry, connected_peers)
62 }
63}
64
65#[derive(Clone)]
67pub struct Metrics {
68 pub connections_closed_total: CounterVec<U64>,
70 pub connections_opened_total: CounterVec<U64>,
71 pub distinct_peers_connections_closed_total: Counter<U64>,
72 pub distinct_peers_connections_opened_total: Counter<U64>,
73 pub incoming_connections_errors_total: CounterVec<U64>,
74 pub incoming_connections_total: Counter<U64>,
75 pub kademlia_query_duration: HistogramVec,
76 pub kademlia_random_queries_total: Counter<U64>,
77 pub kademlia_records_count: Gauge<U64>,
78 pub kademlia_records_sizes_total: Gauge<U64>,
79 pub kbuckets_num_nodes: GaugeVec<U64>,
80 pub listeners_local_addresses: Gauge<U64>,
81 pub listeners_errors_total: Counter<U64>,
82 pub pending_connections: Gauge<U64>,
83 pub pending_connections_errors_total: CounterVec<U64>,
84 pub requests_in_failure_total: CounterVec<U64>,
85 pub requests_in_success_total: HistogramVec,
86 pub requests_out_failure_total: CounterVec<U64>,
87 pub requests_out_success_total: HistogramVec,
88}
89
90impl Metrics {
91 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
92 Ok(Self {
93 connections_closed_total: prometheus::register(CounterVec::new(
95 Opts::new(
96 "substrate_sub_libp2p_connections_closed_total",
97 "Total number of connections closed, by direction and reason"
98 ),
99 &["direction", "reason"]
100 )?, registry)?,
101 connections_opened_total: prometheus::register(CounterVec::new(
102 Opts::new(
103 "substrate_sub_libp2p_connections_opened_total",
104 "Total number of connections opened by direction"
105 ),
106 &["direction"]
107 )?, registry)?,
108 distinct_peers_connections_closed_total: prometheus::register(Counter::new(
109 "substrate_sub_libp2p_distinct_peers_connections_closed_total",
110 "Total number of connections closed with distinct peers"
111 )?, registry)?,
112 distinct_peers_connections_opened_total: prometheus::register(Counter::new(
113 "substrate_sub_libp2p_distinct_peers_connections_opened_total",
114 "Total number of connections opened with distinct peers"
115 )?, registry)?,
116 incoming_connections_errors_total: prometheus::register(CounterVec::new(
117 Opts::new(
118 "substrate_sub_libp2p_incoming_connections_handshake_errors_total",
119 "Total number of incoming connections that have failed during the \
120 initial handshake"
121 ),
122 &["reason"]
123 )?, registry)?,
124 incoming_connections_total: prometheus::register(Counter::new(
125 "substrate_sub_libp2p_incoming_connections_total",
126 "Total number of incoming connections on the listening sockets"
127 )?, registry)?,
128 kademlia_query_duration: prometheus::register(HistogramVec::new(
129 HistogramOpts {
130 common_opts: Opts::new(
131 "substrate_sub_libp2p_kademlia_query_duration",
132 "Duration of Kademlia queries per query type"
133 ),
134 buckets: prometheus::exponential_buckets(0.5, 2.0, 10)
135 .expect("parameters are always valid values; qed"),
136 },
137 &["type"]
138 )?, registry)?,
139 kademlia_random_queries_total: prometheus::register(Counter::new(
140 "substrate_sub_libp2p_kademlia_random_queries_total",
141 "Number of random Kademlia queries started",
142 )?, registry)?,
143 kademlia_records_count: prometheus::register(Gauge::new(
144 "substrate_sub_libp2p_kademlia_records_count",
145 "Number of records in the Kademlia records store",
146 )?, registry)?,
147 kademlia_records_sizes_total: prometheus::register(Gauge::new(
148 "substrate_sub_libp2p_kademlia_records_sizes_total",
149 "Total size of all the records in the Kademlia records store",
150 )?, registry)?,
151 kbuckets_num_nodes: prometheus::register(GaugeVec::new(
152 Opts::new(
153 "substrate_sub_libp2p_kbuckets_num_nodes",
154 "Number of nodes per kbucket per Kademlia instance"
155 ),
156 &["lower_ilog2_bucket_bound"]
157 )?, registry)?,
158 listeners_local_addresses: prometheus::register(Gauge::new(
159 "substrate_sub_libp2p_listeners_local_addresses",
160 "Number of local addresses we're listening on"
161 )?, registry)?,
162 listeners_errors_total: prometheus::register(Counter::new(
163 "substrate_sub_libp2p_listeners_errors_total",
164 "Total number of non-fatal errors reported by a listener"
165 )?, registry)?,
166 pending_connections: prometheus::register(Gauge::new(
167 "substrate_sub_libp2p_pending_connections",
168 "Number of connections in the process of being established",
169 )?, registry)?,
170 pending_connections_errors_total: prometheus::register(CounterVec::new(
171 Opts::new(
172 "substrate_sub_libp2p_pending_connections_errors_total",
173 "Total number of pending connection errors"
174 ),
175 &["reason"]
176 )?, registry)?,
177 requests_in_failure_total: prometheus::register(CounterVec::new(
178 Opts::new(
179 "substrate_sub_libp2p_requests_in_failure_total",
180 "Total number of incoming requests that the node has failed to answer"
181 ),
182 &["protocol", "reason"]
183 )?, registry)?,
184 requests_in_success_total: prometheus::register(HistogramVec::new(
185 HistogramOpts {
186 common_opts: Opts::new(
187 "substrate_sub_libp2p_requests_in_success_total",
188 "For successful incoming requests, time between receiving the request and \
189 starting to send the response"
190 ),
191 buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
192 .expect("parameters are always valid values; qed"),
193 },
194 &["protocol"]
195 )?, registry)?,
196 requests_out_failure_total: prometheus::register(CounterVec::new(
197 Opts::new(
198 "substrate_sub_libp2p_requests_out_failure_total",
199 "Total number of requests that have failed"
200 ),
201 &["protocol", "reason"]
202 )?, registry)?,
203 requests_out_success_total: prometheus::register(HistogramVec::new(
204 HistogramOpts {
205 common_opts: Opts::new(
206 "substrate_sub_libp2p_requests_out_success_total",
207 "For successful outgoing requests, time between a request's start and finish"
208 ),
209 buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
210 .expect("parameters are always valid values; qed"),
211 },
212 &["protocol"]
213 )?, registry)?,
214 })
215 }
216}
217
218#[derive(Clone, Debug)]
220pub struct PeerStoreMetrics {
221 pub num_banned_peers: Gauge<U64>,
222 pub num_discovered: Gauge<U64>,
223}
224
225impl PeerStoreMetrics {
226 pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
227 Ok(Self {
228 num_banned_peers: prometheus::register(
229 Gauge::new(
230 "substrate_sub_libp2p_peerset_num_banned_peers",
231 "Number of banned peers stored in the peerset manager",
232 )?,
233 registry,
234 )?,
235 num_discovered: prometheus::register(
236 Gauge::new(
237 "substrate_sub_libp2p_peerset_num_discovered",
238 "Number of nodes stored in the peerset manager",
239 )?,
240 registry,
241 )?,
242 })
243 }
244}
245
246#[derive(Clone)]
248pub struct BandwidthCounters(Arc<dyn BandwidthSink>);
249
250impl BandwidthCounters {
251 fn register(registry: &Registry, sinks: Arc<dyn BandwidthSink>) -> Result<(), PrometheusError> {
254 prometheus::register(
255 SourcedCounter::new(
256 &Opts::new("substrate_sub_libp2p_network_bytes_total", "Total bandwidth usage")
257 .variable_label("direction"),
258 BandwidthCounters(sinks),
259 )?,
260 registry,
261 )?;
262
263 Ok(())
264 }
265}
266
267impl MetricSource for BandwidthCounters {
268 type N = u64;
269
270 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
271 set(&["in"], self.0.total_inbound());
272 set(&["out"], self.0.total_outbound());
273 }
274}
275
/// Metric source that reports the current value of the wrapped shared counter.
#[derive(Clone)]
pub struct NumConnectedGauge(Arc<AtomicUsize>);
279
280impl NumConnectedGauge {
281 fn register(registry: &Registry, value: Arc<AtomicUsize>) -> Result<(), PrometheusError> {
284 prometheus::register(
285 SourcedGauge::new(
286 &Opts::new("substrate_sub_libp2p_peers_count", "Number of connected peers"),
287 NumConnectedGauge(value),
288 )?,
289 registry,
290 )?;
291
292 Ok(())
293 }
294}
295
296impl MetricSource for NumConnectedGauge {
297 type N = u64;
298
299 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
300 set(&[], self.0.load(Ordering::Relaxed) as u64);
301 }
302}
303
304#[derive(Debug, Clone)]
308pub struct NotificationMetrics {
309 metrics: Option<InnerNotificationMetrics>,
311}
312
313impl NotificationMetrics {
314 pub fn new(registry: Option<&Registry>) -> NotificationMetrics {
316 let metrics = match registry {
317 Some(registry) => InnerNotificationMetrics::register(registry).ok(),
318 None => None,
319 };
320
321 Self { metrics }
322 }
323
324 pub fn register_substream_opened(&self, protocol: &ProtocolName) {
326 if let Some(metrics) = &self.metrics {
327 metrics.notifications_streams_opened_total.with_label_values(&[&protocol]).inc();
328 }
329 }
330
331 pub fn register_substream_closed(&self, protocol: &ProtocolName) {
333 if let Some(metrics) = &self.metrics {
334 metrics
335 .notifications_streams_closed_total
336 .with_label_values(&[&protocol[..]])
337 .inc();
338 }
339 }
340
341 pub fn register_notification_sent(&self, protocol: &ProtocolName, size: usize) {
343 if let Some(metrics) = &self.metrics {
344 metrics
345 .notifications_sizes
346 .with_label_values(&["out", protocol])
347 .observe(size as f64);
348 }
349 }
350
351 pub fn register_notification_received(&self, protocol: &ProtocolName, size: usize) {
353 if let Some(metrics) = &self.metrics {
354 metrics
355 .notifications_sizes
356 .with_label_values(&["in", protocol])
357 .observe(size as f64);
358 }
359 }
360}
361
362#[derive(Debug, Clone)]
364struct InnerNotificationMetrics {
365 pub notifications_streams_opened_total: CounterVec<U64>,
367
368 pub notifications_streams_closed_total: CounterVec<U64>,
370
371 pub notifications_sizes: HistogramVec,
373}
374
375impl InnerNotificationMetrics {
376 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
377 Ok(Self {
378 notifications_sizes: prometheus::register(
379 HistogramVec::new(
380 HistogramOpts {
381 common_opts: Opts::new(
382 "substrate_sub_libp2p_notifications_sizes",
383 "Sizes of the notifications send to and received from all nodes",
384 ),
385 buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
386 .expect("parameters are always valid values; qed"),
387 },
388 &["direction", "protocol"],
389 )?,
390 registry,
391 )?,
392 notifications_streams_closed_total: prometheus::register(
393 CounterVec::new(
394 Opts::new(
395 "substrate_sub_libp2p_notifications_streams_closed_total",
396 "Total number of notification substreams that have been closed",
397 ),
398 &["protocol"],
399 )?,
400 registry,
401 )?,
402 notifications_streams_opened_total: prometheus::register(
403 CounterVec::new(
404 Opts::new(
405 "substrate_sub_libp2p_notifications_streams_opened_total",
406 "Total number of notification substreams that have been opened",
407 ),
408 &["protocol"],
409 )?,
410 registry,
411 )?,
412 })
413 }
414}