1use std::collections::HashMap;
25
26use prometheus_client::{
27 encoding::{EncodeLabelSet, EncodeLabelValue},
28 metrics::{
29 counter::Counter,
30 family::{Family, MetricConstructor},
31 gauge::Gauge,
32 histogram::{linear_buckets, Histogram},
33 },
34 registry::Registry,
35};
36
37use crate::{
38 topic::TopicHash,
39 types::{MessageAcceptance, PeerKind},
40};
41
/// Default value for [`Config::max_topics`].
const DEFAULT_MAX_TOPICS: usize = 300;

/// Default value for [`Config::max_never_subscribed_topics`].
const DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS: usize = 50;
48
/// Configuration for the gossipsub metrics.
///
/// Per-topic metrics are only recorded for a bounded set of topics, so that a peer cannot
/// create an unbounded number of metric series by mentioning many topics.
#[derive(Debug, Clone)]
pub struct Config {
    /// Maximum number of distinct topics for which per-topic metrics are recorded.
    pub max_topics: usize,
    /// Maximum number of tracked topics that we have never subscribed to ourselves.
    ///
    /// Topics we join are not counted against this limit once they are marked as subscribed.
    pub max_never_subscribed_topics: usize,
    /// Bucket boundaries of the per-topic histogram of mesh peer scores (`score_per_mesh`).
    pub score_buckets: Vec<f64>,
}
61
62impl Config {
63 pub fn buckets_using_scoring_thresholds(&mut self, params: &crate::PeerScoreThresholds) {
65 self.score_buckets = vec![
66 params.graylist_threshold,
67 params.publish_threshold,
68 params.gossip_threshold,
69 params.gossip_threshold / 2.0,
70 params.gossip_threshold / 4.0,
71 0.0,
72 1.0,
73 10.0,
74 100.0,
75 ];
76 }
77}
78
79impl Default for Config {
80 fn default() -> Self {
81 let gossip_threshold = -4000.0;
83 let publish_threshold = -8000.0;
84 let graylist_threshold = -16000.0;
85 let score_buckets: Vec<f64> = vec![
86 graylist_threshold,
87 publish_threshold,
88 gossip_threshold,
89 gossip_threshold / 2.0,
90 gossip_threshold / 4.0,
91 0.0,
92 1.0,
93 10.0,
94 100.0,
95 ];
96 Config {
97 max_topics: DEFAULT_MAX_TOPICS,
98 max_never_subscribed_topics: DEFAULT_MAX_NEVER_SUBSCRIBED_TOPICS,
99 score_buckets,
100 }
101 }
102}
103
/// Whether we have ever subscribed to a tracked topic.
///
/// Topics admitted through `Metrics::register_topic` start as `false`; [`Metrics::joined`]
/// sets the flag to `true`, and nothing in this module resets it to `false`.
type EverSubscribed = bool;
106
/// Gossipsub metrics, registered with a prometheus-client [`Registry`] by [`Metrics::new`].
///
/// Per-topic series are only recorded for topics present in `topic_info`; `max_topics` and
/// `max_never_subscribed_topics` bound how many topics can enter it.
pub(crate) struct Metrics {
    /// Maximum number of entries in `topic_info` (from [`Config::max_topics`]).
    max_topics: usize,
    /// Maximum number of entries in `topic_info` that were never joined
    /// (from [`Config::max_never_subscribed_topics`]).
    max_never_subscribed_topics: usize,

    /// Every topic that has per-topic metrics, mapped to whether we have ever joined it.
    topic_info: HashMap<TopicHash, EverSubscribed>,

    /// Subscription status per tracked topic: `1` while joined, `0` otherwise.
    topic_subscription_status: Family<TopicHash, Gauge>,
    /// Number of peers subscribed to each topic.
    topic_peers_count: Family<TopicHash, Gauge>,
    /// Number of invalid messages received per topic.
    invalid_messages: Family<TopicHash, Counter>,
    /// Number of messages per topic whose validation result was `Accept`.
    accepted_messages: Family<TopicHash, Counter>,
    /// Number of messages per topic whose validation result was `Ignore`.
    ignored_messages: Family<TopicHash, Counter>,
    /// Number of messages per topic whose validation result was `Reject`.
    rejected_messages: Family<TopicHash, Counter>,
    /// Number of publish messages dropped per topic.
    publish_messages_dropped: Family<TopicHash, Counter>,
    /// Number of forward messages dropped per topic.
    forward_messages_dropped: Family<TopicHash, Counter>,
    /// Number of timed-out messages dropped per topic.
    timedout_messages_dropped: Family<TopicHash, Counter>,

    /// Number of peers in our mesh for each topic.
    mesh_peer_counts: Family<TopicHash, Gauge>,
    /// Number of peers added to a topic mesh, labelled by reason.
    mesh_peer_inclusion_events: Family<InclusionLabel, Counter>,
    /// Number of peers removed from a topic mesh, labelled by reason.
    mesh_peer_churn_events: Family<ChurnLabel, Counter>,

    /// Number of gossip messages sent per topic.
    topic_msg_sent_counts: Family<TopicHash, Counter>,
    /// Bytes of gossip messages sent per topic.
    topic_msg_sent_bytes: Family<TopicHash, Counter>,
    /// Number of gossip messages published per topic.
    topic_msg_published: Family<TopicHash, Counter>,

    /// Number of gossip messages received per topic, before duplicate filtering.
    topic_msg_recv_counts_unfiltered: Family<TopicHash, Counter>,
    /// Number of gossip messages received per topic, after duplicate filtering.
    topic_msg_recv_counts: Family<TopicHash, Counter>,
    /// Bytes of gossip messages received per topic (recorded together with the unfiltered count).
    topic_msg_recv_bytes: Family<TopicHash, Counter>,

    /// Histogram of mesh peer scores per topic, using the configured score buckets.
    score_per_mesh: Family<TopicHash, Histogram, HistBuilder>,
    /// Number of scoring penalties applied, labelled by penalty type.
    scoring_penalties: Family<PenaltyLabel, Counter>,

    /// Number of connected peers per protocol kind.
    peers_per_protocol: Family<ProtocolLabel, Gauge>,
    /// Histogram of heartbeat durations in milliseconds.
    heartbeat_duration: Histogram,

    /// Number of messages not found in the duplicate cache when validating.
    memcache_misses: Counter,
    /// Number of times an IWANT was deemed required, per topic.
    topic_iwant_msgs: Family<TopicHash, Counter>,

    /// Number of IDONTWANT control messages received.
    idontwant_msgs: Counter,

    /// Total number of message ids carried by received IDONTWANT control messages.
    idontwant_msgs_ids: Counter,

    /// Histogram of observed priority queue lengths.
    priority_queue_size: Histogram,
    /// Histogram of observed non-priority queue lengths.
    non_priority_queue_size: Histogram,
}
201
202impl Metrics {
203 pub(crate) fn new(registry: &mut Registry, config: Config) -> Self {
204 let Config {
206 max_topics,
207 max_never_subscribed_topics,
208 score_buckets,
209 } = config;
210
211 macro_rules! register_family {
212 ($name:expr, $help:expr) => {{
213 let fam = Family::default();
214 registry.register($name, $help, fam.clone());
215 fam
216 }};
217 }
218
219 let topic_subscription_status = register_family!(
220 "topic_subscription_status",
221 "Subscription status per known topic"
222 );
223 let topic_peers_count = register_family!(
224 "topic_peers_counts",
225 "Number of peers subscribed to each topic"
226 );
227
228 let invalid_messages = register_family!(
229 "invalid_messages_per_topic",
230 "Number of invalid messages received for each topic"
231 );
232
233 let accepted_messages = register_family!(
234 "accepted_messages_per_topic",
235 "Number of accepted messages received for each topic"
236 );
237
238 let ignored_messages = register_family!(
239 "ignored_messages_per_topic",
240 "Number of ignored messages received for each topic"
241 );
242
243 let rejected_messages = register_family!(
244 "rejected_messages_per_topic",
245 "Number of rejected messages received for each topic"
246 );
247
248 let publish_messages_dropped = register_family!(
249 "publish_messages_dropped_per_topic",
250 "Number of publish messages dropped per topic"
251 );
252
253 let forward_messages_dropped = register_family!(
254 "forward_messages_dropped_per_topic",
255 "Number of forward messages dropped per topic"
256 );
257
258 let timedout_messages_dropped = register_family!(
259 "timedout_messages_dropped_per_topic",
260 "Number of timedout messages dropped per topic"
261 );
262
263 let mesh_peer_counts = register_family!(
264 "mesh_peer_counts",
265 "Number of peers in each topic in our mesh"
266 );
267 let mesh_peer_inclusion_events = register_family!(
268 "mesh_peer_inclusion_events",
269 "Number of times a peer gets added to our mesh for different reasons"
270 );
271 let mesh_peer_churn_events = register_family!(
272 "mesh_peer_churn_events",
273 "Number of times a peer gets removed from our mesh for different reasons"
274 );
275 let topic_msg_sent_counts = register_family!(
276 "topic_msg_sent_counts",
277 "Number of gossip messages sent to each topic"
278 );
279 let topic_msg_published = register_family!(
280 "topic_msg_published",
281 "Number of gossip messages published to each topic"
282 );
283 let topic_msg_sent_bytes = register_family!(
284 "topic_msg_sent_bytes",
285 "Bytes from gossip messages sent to each topic"
286 );
287
288 let topic_msg_recv_counts_unfiltered = register_family!(
289 "topic_msg_recv_counts_unfiltered",
290 "Number of gossip messages received on each topic (without duplicates being filtered)"
291 );
292
293 let topic_msg_recv_counts = register_family!(
294 "topic_msg_recv_counts",
295 "Number of gossip messages received on each topic (after duplicates have been filtered)"
296 );
297 let topic_msg_recv_bytes = register_family!(
298 "topic_msg_recv_bytes",
299 "Bytes received from gossip messages for each topic"
300 );
301
302 let hist_builder = HistBuilder {
303 buckets: score_buckets,
304 };
305
306 let score_per_mesh: Family<_, _, HistBuilder> = Family::new_with_constructor(hist_builder);
307 registry.register(
308 "score_per_mesh",
309 "Histogram of scores per mesh topic",
310 score_per_mesh.clone(),
311 );
312
313 let scoring_penalties = register_family!(
314 "scoring_penalties",
315 "Counter of types of scoring penalties given to peers"
316 );
317 let peers_per_protocol = register_family!(
318 "peers_per_protocol",
319 "Number of connected peers by protocol type"
320 );
321
322 let heartbeat_duration = Histogram::new(linear_buckets(0.0, 50.0, 10));
323 registry.register(
324 "heartbeat_duration",
325 "Histogram of observed heartbeat durations",
326 heartbeat_duration.clone(),
327 );
328
329 let topic_iwant_msgs = register_family!(
330 "topic_iwant_msgs",
331 "Number of times we have decided an IWANT is required for this topic"
332 );
333
334 let idontwant_msgs = {
335 let metric = Counter::default();
336 registry.register(
337 "idontwant_msgs",
338 "The number of times we have received an IDONTWANT control message",
339 metric.clone(),
340 );
341 metric
342 };
343
344 let idontwant_msgs_ids = {
345 let metric = Counter::default();
346 registry.register(
347 "idontwant_msgs_ids",
348 "The number of msg_id's we have received in every total of all IDONTWANT control message.",
349 metric.clone(),
350 );
351 metric
352 };
353
354 let memcache_misses = {
355 let metric = Counter::default();
356 registry.register(
357 "memcache_misses",
358 "Number of times a message is not found in the duplicate cache when validating",
359 metric.clone(),
360 );
361 metric
362 };
363
364 let priority_queue_size = Histogram::new(linear_buckets(0.0, 25.0, 100));
365 registry.register(
366 "priority_queue_size",
367 "Histogram of observed priority queue sizes",
368 priority_queue_size.clone(),
369 );
370
371 let non_priority_queue_size = Histogram::new(linear_buckets(0.0, 25.0, 100));
372 registry.register(
373 "non_priority_queue_size",
374 "Histogram of observed non-priority queue sizes",
375 non_priority_queue_size.clone(),
376 );
377
378 Self {
379 max_topics,
380 max_never_subscribed_topics,
381 topic_info: HashMap::default(),
382 topic_subscription_status,
383 topic_peers_count,
384 invalid_messages,
385 accepted_messages,
386 ignored_messages,
387 rejected_messages,
388 publish_messages_dropped,
389 forward_messages_dropped,
390 timedout_messages_dropped,
391 mesh_peer_counts,
392 mesh_peer_inclusion_events,
393 mesh_peer_churn_events,
394 topic_msg_sent_counts,
395 topic_msg_sent_bytes,
396 topic_msg_published,
397 topic_msg_recv_counts_unfiltered,
398 topic_msg_recv_counts,
399 topic_msg_recv_bytes,
400 score_per_mesh,
401 scoring_penalties,
402 peers_per_protocol,
403 heartbeat_duration,
404 memcache_misses,
405 topic_iwant_msgs,
406 idontwant_msgs,
407 idontwant_msgs_ids,
408 priority_queue_size,
409 non_priority_queue_size,
410 }
411 }
412
413 fn non_subscription_topics_count(&self) -> usize {
414 self.topic_info
415 .values()
416 .filter(|&ever_subscribed| !ever_subscribed)
417 .count()
418 }
419
420 fn register_topic(&mut self, topic: &TopicHash) -> Result<(), ()> {
422 if self.topic_info.contains_key(topic) {
423 Ok(())
424 } else if self.topic_info.len() < self.max_topics
425 && self.non_subscription_topics_count() < self.max_never_subscribed_topics
426 {
427 self.topic_info.entry(topic.clone()).or_insert(false);
430 self.topic_subscription_status.get_or_create(topic).set(0);
431 Ok(())
432 } else {
433 Err(())
435 }
436 }
437
438 pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
440 if self.register_topic(topic).is_ok() {
441 self.topic_peers_count.get_or_create(topic).inc();
442 }
443 }
444
445 pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
447 if self.register_topic(topic).is_ok() {
448 self.topic_peers_count.get_or_create(topic).dec();
449 }
450 }
451
452 pub(crate) fn joined(&mut self, topic: &TopicHash) {
457 if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics {
458 self.topic_info.insert(topic.clone(), true);
459 let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1);
460 debug_assert_eq!(was_subscribed, 0);
461 self.mesh_peer_counts.get_or_create(topic).set(0);
462 }
463 }
464
465 pub(crate) fn left(&mut self, topic: &TopicHash) {
468 if self.topic_info.contains_key(topic) {
469 let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0);
472 debug_assert_eq!(was_subscribed, 1);
473 self.mesh_peer_counts.get_or_create(topic).set(0);
474 }
475 }
476
477 pub(crate) fn peers_included(&mut self, topic: &TopicHash, reason: Inclusion, count: usize) {
479 if self.register_topic(topic).is_ok() {
480 self.mesh_peer_inclusion_events
481 .get_or_create(&InclusionLabel {
482 hash: topic.to_string(),
483 reason,
484 })
485 .inc_by(count as u64);
486 }
487 }
488
489 pub(crate) fn peers_removed(&mut self, topic: &TopicHash, reason: Churn, count: usize) {
491 if self.register_topic(topic).is_ok() {
492 self.mesh_peer_churn_events
493 .get_or_create(&ChurnLabel {
494 hash: topic.to_string(),
495 reason,
496 })
497 .inc_by(count as u64);
498 }
499 }
500
501 pub(crate) fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) {
503 if self.register_topic(topic).is_ok() {
504 self.mesh_peer_counts.get_or_create(topic).set(count as i64);
506 }
507 }
508
509 pub(crate) fn register_invalid_message(&mut self, topic: &TopicHash) {
511 if self.register_topic(topic).is_ok() {
512 self.invalid_messages.get_or_create(topic).inc();
513 }
514 }
515
516 pub(crate) fn register_score_penalty(&mut self, penalty: Penalty) {
518 self.scoring_penalties
519 .get_or_create(&PenaltyLabel { penalty })
520 .inc();
521 }
522
523 pub(crate) fn register_published_message(&mut self, topic: &TopicHash) {
525 if self.register_topic(topic).is_ok() {
526 self.topic_msg_published.get_or_create(topic).inc();
527 }
528 }
529
530 pub(crate) fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) {
532 if self.register_topic(topic).is_ok() {
533 self.topic_msg_sent_counts.get_or_create(topic).inc();
534 self.topic_msg_sent_bytes
535 .get_or_create(topic)
536 .inc_by(bytes as u64);
537 }
538 }
539
540 pub(crate) fn publish_msg_dropped(&mut self, topic: &TopicHash) {
542 if self.register_topic(topic).is_ok() {
543 self.publish_messages_dropped.get_or_create(topic).inc();
544 }
545 }
546
547 pub(crate) fn forward_msg_dropped(&mut self, topic: &TopicHash) {
549 if self.register_topic(topic).is_ok() {
550 self.forward_messages_dropped.get_or_create(topic).inc();
551 }
552 }
553
554 pub(crate) fn timeout_msg_dropped(&mut self, topic: &TopicHash) {
556 if self.register_topic(topic).is_ok() {
557 self.timedout_messages_dropped.get_or_create(topic).inc();
558 }
559 }
560
561 pub(crate) fn msg_recvd(&mut self, topic: &TopicHash) {
563 if self.register_topic(topic).is_ok() {
564 self.topic_msg_recv_counts.get_or_create(topic).inc();
565 }
566 }
567
568 pub(crate) fn msg_recvd_unfiltered(&mut self, topic: &TopicHash, bytes: usize) {
570 if self.register_topic(topic).is_ok() {
571 self.topic_msg_recv_counts_unfiltered
572 .get_or_create(topic)
573 .inc();
574 self.topic_msg_recv_bytes
575 .get_or_create(topic)
576 .inc_by(bytes as u64);
577 }
578 }
579
580 pub(crate) fn register_msg_validation(
581 &mut self,
582 topic: &TopicHash,
583 validation: &MessageAcceptance,
584 ) {
585 if self.register_topic(topic).is_ok() {
586 match validation {
587 MessageAcceptance::Accept => self.accepted_messages.get_or_create(topic).inc(),
588 MessageAcceptance::Ignore => self.ignored_messages.get_or_create(topic).inc(),
589 MessageAcceptance::Reject => self.rejected_messages.get_or_create(topic).inc(),
590 };
591 }
592 }
593
594 pub(crate) fn memcache_miss(&mut self) {
596 self.memcache_misses.inc();
597 }
598
599 pub(crate) fn register_iwant(&mut self, topic: &TopicHash) {
601 if self.register_topic(topic).is_ok() {
602 self.topic_iwant_msgs.get_or_create(topic).inc();
603 }
604 }
605
606 pub(crate) fn register_idontwant(&mut self, msgs: usize) {
608 self.idontwant_msgs.inc();
609 self.idontwant_msgs_ids.inc_by(msgs as u64);
610 }
611
612 pub(crate) fn observe_heartbeat_duration(&mut self, millis: u64) {
614 self.heartbeat_duration.observe(millis as f64);
615 }
616
617 pub(crate) fn observe_priority_queue_size(&mut self, len: usize) {
619 self.priority_queue_size.observe(len as f64);
620 }
621
622 pub(crate) fn observe_non_priority_queue_size(&mut self, len: usize) {
624 self.non_priority_queue_size.observe(len as f64);
625 }
626
627 pub(crate) fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) {
629 if self.register_topic(topic).is_ok() {
630 self.score_per_mesh.get_or_create(topic).observe(score);
631 }
632 }
633
634 pub(crate) fn peer_protocol_connected(&mut self, kind: PeerKind) {
636 self.peers_per_protocol
637 .get_or_create(&ProtocolLabel { protocol: kind })
638 .inc();
639 }
640
641 pub(crate) fn peer_protocol_disconnected(&mut self, kind: PeerKind) {
643 let metric = self
644 .peers_per_protocol
645 .get_or_create(&ProtocolLabel { protocol: kind });
646 if metric.get() != 0 {
647 metric.set(metric.get() - 1);
649 }
650 }
651}
652
/// Reason a peer was added to a topic mesh; the `reason` label of `mesh_peer_inclusion_events`
/// (recorded via `Metrics::peers_included`).
///
/// The derived `EncodeLabelValue` exports the variant name as the label value, so renaming a
/// variant changes the exported series.
#[derive(PartialEq, Eq, Hash, EncodeLabelValue, Clone, Debug)]
pub(crate) enum Inclusion {
    /// NOTE(review): presumably the peer came from the topic's fanout set — confirm at call sites.
    Fanout,
    /// NOTE(review): presumably the peer was selected at random — confirm at call sites.
    Random,
    /// NOTE(review): presumably the peer was added because it subscribed — confirm at call sites.
    Subscribed,
    /// NOTE(review): presumably added to satisfy an outbound-peer quota — confirm at call sites.
    Outbound,
}
665
/// Reason a peer was removed from a topic mesh; the `reason` label of `mesh_peer_churn_events`
/// (recorded via `Metrics::peers_removed`).
///
/// The derived `EncodeLabelValue` exports the variant name as the label value, so renaming a
/// variant changes the exported series.
#[derive(PartialEq, Eq, Hash, EncodeLabelValue, Clone, Debug)]
pub(crate) enum Churn {
    /// NOTE(review): presumably the peer disconnected — confirm at call sites.
    Dc,
    /// NOTE(review): presumably the peer's score was too low — confirm at call sites.
    BadScore,
    /// NOTE(review): presumably the peer sent a PRUNE — confirm at call sites.
    Prune,
    /// NOTE(review): presumably the peer unsubscribed from the topic — confirm at call sites.
    Unsub,
    /// NOTE(review): presumably removed because the mesh had too many peers — confirm at call sites.
    Excess,
}
680
/// Kind of scoring penalty given to a peer; the `penalty` label of `scoring_penalties`
/// (recorded via `Metrics::register_score_penalty`).
///
/// The derived `EncodeLabelValue` exports the variant name as the label value, so renaming a
/// variant changes the exported series.
#[derive(PartialEq, Eq, Hash, EncodeLabelValue, Clone, Debug)]
pub(crate) enum Penalty {
    /// NOTE(review): presumably a GRAFT sent before the back-off expired — confirm at call sites.
    GraftBackoff,
    /// NOTE(review): presumably an IWANT that was not answered in time — confirm at call sites.
    BrokenPromise,
    /// NOTE(review): presumably too few mesh messages delivered — confirm at call sites.
    MessageDeficit,
    /// NOTE(review): presumably too many peers sharing one IP address — confirm at call sites.
    IPColocation,
}
693
/// Label set of `mesh_peer_inclusion_events`: the topic (in its string form) and the reason the
/// peer was added. The derived `EncodeLabelSet` uses the field names as label names.
#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)]
struct InclusionLabel {
    /// The topic hash, rendered with `to_string()`.
    hash: String,
    /// Why the peer was added to the mesh.
    reason: Inclusion,
}
700
/// Label set of `mesh_peer_churn_events`: the topic (in its string form) and the reason the peer
/// was removed. The derived `EncodeLabelSet` uses the field names as label names.
#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)]
struct ChurnLabel {
    /// The topic hash, rendered with `to_string()`.
    hash: String,
    /// Why the peer was removed from the mesh.
    reason: Churn,
}
707
/// Label set of `peers_per_protocol`: the protocol kind of connected peers.
#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)]
struct ProtocolLabel {
    /// Protocol kind spoken by the peer.
    protocol: PeerKind,
}
713
/// Label set of `scoring_penalties`: the kind of penalty applied.
#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)]
struct PenaltyLabel {
    /// Kind of scoring penalty.
    penalty: Penalty,
}
719
/// Metric constructor for `score_per_mesh`: builds every per-topic histogram with the same
/// configured bucket boundaries.
#[derive(Clone)]
struct HistBuilder {
    /// Histogram bucket boundaries (taken from `Config::score_buckets`).
    buckets: Vec<f64>,
}
724
725impl MetricConstructor<Histogram> for HistBuilder {
726 fn new_metric(&self) -> Histogram {
727 Histogram::new(self.buckets.clone().into_iter())
728 }
729}