solana_metrics/
metrics.rs

1//! The `metrics` module enables sending measurements to an `InfluxDB` instance
2
3use {
4    crate::{counter::CounterPoint, datapoint::DataPoint},
5    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
6    gethostname::gethostname,
7    lazy_static::lazy_static,
8    log::*,
9    solana_cluster_type::ClusterType,
10    solana_sha256_hasher::hash,
11    std::{
12        cmp,
13        collections::HashMap,
14        convert::Into,
15        env,
16        fmt::Write,
17        sync::{Arc, Barrier, Mutex, Once, RwLock},
18        thread,
19        time::{Duration, Instant, UNIX_EPOCH},
20    },
21    thiserror::Error,
22};
23
/// Pending counters keyed by `(counter name, bucket)`.  The worker thread sums the
/// counts of samples that share a key before the counter is written out.
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
25
/// Errors produced while reading the metrics configuration or talking to `InfluxDB`.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// `SOLANA_METRICS_CONFIG` could not be read (unset or not valid Unicode).
    /// `get_metrics_config` also reports an empty value as `VarError::NotPresent`.
    #[error(transparent)]
    VarError(#[from] env::VarError),
    /// An HTTP request made through `reqwest` failed.
    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),
    /// `SOLANA_METRICS_CONFIG` contains an invalid entry, for example a pair that is
    /// not `key=value` or an unknown key.  The payload describes the offending input.
    #[error("SOLANA_METRICS_CONFIG is invalid: '{0}'")]
    ConfigInvalid(String),
    /// One of `host`, `db`, `u` or `p` is missing or empty in `SOLANA_METRICS_CONFIG`.
    #[error("SOLANA_METRICS_CONFIG is incomplete")]
    ConfigIncomplete,
    /// The configured database does not match the cluster type the caller runs on.
    /// The payload lists the cluster type, host and database.
    #[error("SOLANA_METRICS_CONFIG database mismatch: {0}")]
    DbMismatch(String),
}
39
40impl From<MetricsError> for String {
41    fn from(error: MetricsError) -> Self {
42        error.to_string()
43    }
44}
45
46impl From<&CounterPoint> for DataPoint {
47    fn from(counter_point: &CounterPoint) -> Self {
48        let mut point = Self::new(counter_point.name);
49        point.timestamp = counter_point.timestamp;
50        point.add_field_i64("count", counter_point.count);
51        point
52    }
53}
54
/// Messages sent from `MetricsAgent` handles to the agent's worker thread.
#[derive(Debug)]
enum MetricsCommand {
    /// Write out everything buffered so far, then wait on the barrier.  The requesting
    /// thread waits on the same barrier, so it blocks until the write has been done.
    Flush(Arc<Barrier>),
    /// Buffer a data point.  The worker logs the point at the given level on receipt.
    Submit(DataPoint, log::Level),
    /// Add a counter sample to the counter identified by its name and the trailing
    /// bucket value.  The worker currently ignores the level.
    SubmitCounter(CounterPoint, log::Level, u64),
}
61
/// Handle to a background worker thread that batches submitted data points and
/// counters and periodically hands them to a `MetricsWriter`.
///
/// Dropping the agent flushes any buffered points first.
pub struct MetricsAgent {
    /// Sending half of the command channel read by the worker thread.
    sender: Sender<MetricsCommand>,
}
65
/// Destination for batches of data points produced by a `MetricsAgent`.
pub trait MetricsWriter {
    /// Writes one batch of points.  The batch is passed by value, so the writer
    /// owns it.  Called on the internal `MetricsAgent` worker thread.
    fn write(&self, points: Vec<DataPoint>);
}
71
/// `MetricsWriter` that POSTs points, serialized as line protocol, to an `InfluxDB` server.
struct InfluxDbMetricsWriter {
    /// Complete `/write` endpoint URL including credentials.  `None` when metrics are
    /// disabled because no usable `SOLANA_METRICS_CONFIG` was found.
    write_url: Option<String>,
}
75
76impl InfluxDbMetricsWriter {
77    fn new() -> Self {
78        Self {
79            write_url: Self::build_write_url().ok(),
80        }
81    }
82
83    fn build_write_url() -> Result<String, MetricsError> {
84        let config = get_metrics_config().map_err(|err| {
85            info!("metrics disabled: {}", err);
86            err
87        })?;
88
89        info!(
90            "metrics configuration: host={} db={} username={}",
91            config.host, config.db, config.username
92        );
93
94        let write_url = format!(
95            "{}/write?db={}&u={}&p={}&precision=n",
96            &config.host, &config.db, &config.username, &config.password
97        );
98
99        Ok(write_url)
100    }
101}
102
103pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
104    const TIMESTAMP_LEN: usize = 20;
105    const HOST_ID_LEN: usize = 8; // "host_id=".len()
106    const EXTRA_LEN: usize = 2; // "=,".len()
107    let mut len = 0;
108    for point in points {
109        for (name, value) in &point.fields {
110            len += name.len() + value.len() + EXTRA_LEN;
111        }
112        for (name, value) in &point.tags {
113            len += name.len() + value.len() + EXTRA_LEN;
114        }
115        len += point.name.len();
116        len += TIMESTAMP_LEN;
117        len += host_id.len() + HOST_ID_LEN;
118    }
119    let mut line = String::with_capacity(len);
120    for point in points {
121        let _ = write!(line, "{},host_id={}", &point.name, host_id);
122        for (name, value) in point.tags.iter() {
123            let _ = write!(line, ",{name}={value}");
124        }
125
126        let mut first = true;
127        for (name, value) in point.fields.iter() {
128            let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
129            first = false;
130        }
131        let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
132        let nanos = timestamp.unwrap().as_nanos();
133        let _ = writeln!(line, " {nanos}");
134    }
135    line
136}
137
138impl MetricsWriter for InfluxDbMetricsWriter {
139    fn write(&self, points: Vec<DataPoint>) {
140        if let Some(ref write_url) = self.write_url {
141            debug!("submitting {} points", points.len());
142
143            let host_id = HOST_ID.read().unwrap();
144
145            let line = serialize_points(&points, &host_id);
146
147            let client = reqwest::blocking::Client::builder()
148                .timeout(Duration::from_secs(5))
149                .build();
150            let client = match client {
151                Ok(client) => client,
152                Err(err) => {
153                    warn!("client instantiation failed: {}", err);
154                    return;
155                }
156            };
157
158            let response = client.post(write_url.as_str()).body(line).send();
159            if let Ok(resp) = response {
160                let status = resp.status();
161                if !status.is_success() {
162                    let text = resp
163                        .text()
164                        .unwrap_or_else(|_| "[text body empty]".to_string());
165                    warn!("submit response unsuccessful: {} {}", status, text,);
166                }
167            } else {
168                warn!("submit error: {}", response.unwrap_err());
169            }
170        }
171    }
172}
173
174impl Default for MetricsAgent {
175    fn default() -> Self {
176        let max_points_per_sec = env::var("SOLANA_METRICS_MAX_POINTS_PER_SECOND")
177            .map(|x| {
178                x.parse()
179                    .expect("Failed to parse SOLANA_METRICS_MAX_POINTS_PER_SECOND")
180            })
181            .unwrap_or(4000);
182
183        Self::new(
184            Arc::new(InfluxDbMetricsWriter::new()),
185            Duration::from_secs(10),
186            max_points_per_sec,
187        )
188    }
189}
190
191impl MetricsAgent {
192    pub fn new(
193        writer: Arc<dyn MetricsWriter + Send + Sync>,
194        write_frequency: Duration,
195        max_points_per_sec: usize,
196    ) -> Self {
197        let (sender, receiver) = unbounded::<MetricsCommand>();
198
199        thread::Builder::new()
200            .name("solMetricsAgent".into())
201            .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
202            .unwrap();
203
204        Self { sender }
205    }
206
207    // Combines `points` and `counters` into a single array of `DataPoint`s, appending a data point
208    // with the metrics stats at the end.
209    //
210    // Limits the number of produced points to the `max_points` value.  Takes `points` followed by
211    // `counters`, dropping `counters` first.
212    //
213    // `max_points_per_sec` is only used in a warning message.
214    // `points_buffered` is used in the stats.
215    fn combine_points(
216        max_points: usize,
217        max_points_per_sec: usize,
218        secs_since_last_write: u64,
219        points_buffered: usize,
220        points: &mut Vec<DataPoint>,
221        counters: &mut CounterMap,
222    ) -> Vec<DataPoint> {
223        // Reserve one slot for the stats point we will add at the end.
224        let max_points = max_points.saturating_sub(1);
225
226        let num_points = points.len().saturating_add(counters.len());
227        let fit_counters = max_points.saturating_sub(points.len());
228        let points_written = cmp::min(num_points, max_points);
229
230        debug!("run: attempting to write {} points", num_points);
231
232        if num_points > max_points {
233            warn!(
234                "Max submission rate of {} datapoints per second exceeded.  Only the \
235                 first {} of {} points will be submitted.",
236                max_points_per_sec, max_points, num_points
237            );
238        }
239
240        let mut combined = std::mem::take(points);
241        combined.truncate(points_written);
242
243        combined.extend(counters.values().take(fit_counters).map(|v| v.into()));
244        counters.clear();
245
246        combined.push(
247            DataPoint::new("metrics")
248                .add_field_i64("points_written", points_written as i64)
249                .add_field_i64("num_points", num_points as i64)
250                .add_field_i64("points_lost", (num_points - points_written) as i64)
251                .add_field_i64("points_buffered", points_buffered as i64)
252                .add_field_i64("secs_since_last_write", secs_since_last_write as i64)
253                .to_owned(),
254        );
255
256        combined
257    }
258
259    // Consumes provided `points`, sending up to `max_points` of them into the `writer`.
260    //
261    // Returns an updated value for `last_write_time`.  Which is equal to `Instant::now()`, just
262    // before `write` in updated.
263    fn write(
264        writer: &Arc<dyn MetricsWriter + Send + Sync>,
265        max_points: usize,
266        max_points_per_sec: usize,
267        last_write_time: Instant,
268        points_buffered: usize,
269        points: &mut Vec<DataPoint>,
270        counters: &mut CounterMap,
271    ) -> Instant {
272        let now = Instant::now();
273        let secs_since_last_write = now.duration_since(last_write_time).as_secs();
274
275        writer.write(Self::combine_points(
276            max_points,
277            max_points_per_sec,
278            secs_since_last_write,
279            points_buffered,
280            points,
281            counters,
282        ));
283
284        now
285    }
286
287    fn run(
288        receiver: &Receiver<MetricsCommand>,
289        writer: &Arc<dyn MetricsWriter + Send + Sync>,
290        write_frequency: Duration,
291        max_points_per_sec: usize,
292    ) {
293        trace!("run: enter");
294        let mut last_write_time = Instant::now();
295        let mut points = Vec::<DataPoint>::new();
296        let mut counters = CounterMap::new();
297
298        let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
299
300        // Bind common arguments in the `Self::write()` call.
301        let write = |last_write_time: Instant,
302                     points: &mut Vec<DataPoint>,
303                     counters: &mut CounterMap|
304         -> Instant {
305            Self::write(
306                writer,
307                max_points,
308                max_points_per_sec,
309                last_write_time,
310                receiver.len(),
311                points,
312                counters,
313            )
314        };
315
316        loop {
317            match receiver.recv_timeout(write_frequency / 2) {
318                Ok(cmd) => match cmd {
319                    MetricsCommand::Flush(barrier) => {
320                        debug!("metrics_thread: flush");
321                        last_write_time = write(last_write_time, &mut points, &mut counters);
322                        barrier.wait();
323                    }
324                    MetricsCommand::Submit(point, level) => {
325                        log!(level, "{}", point);
326                        points.push(point);
327                    }
328                    MetricsCommand::SubmitCounter(counter, _level, bucket) => {
329                        debug!("{:?}", counter);
330                        let key = (counter.name, bucket);
331                        if let Some(value) = counters.get_mut(&key) {
332                            value.count += counter.count;
333                        } else {
334                            counters.insert(key, counter);
335                        }
336                    }
337                },
338                Err(RecvTimeoutError::Timeout) => (),
339                Err(RecvTimeoutError::Disconnected) => {
340                    debug!("run: sender disconnected");
341                    break;
342                }
343            }
344
345            let now = Instant::now();
346            if now.duration_since(last_write_time) >= write_frequency {
347                last_write_time = write(last_write_time, &mut points, &mut counters);
348            }
349        }
350
351        debug_assert!(
352            points.is_empty() && counters.is_empty(),
353            "Controlling `MetricsAgent` is expected to call `flush()` from the `Drop` \n\
354             implementation, before exiting.  So both `points` and `counters` must be empty at \n\
355             this point.\n\
356             `points`: {points:?}\n\
357             `counters`: {counters:?}",
358        );
359
360        trace!("run: exit");
361    }
362
363    pub fn submit(&self, point: DataPoint, level: log::Level) {
364        self.sender
365            .send(MetricsCommand::Submit(point, level))
366            .unwrap();
367    }
368
369    pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
370        self.sender
371            .send(MetricsCommand::SubmitCounter(counter, level, bucket))
372            .unwrap();
373    }
374
375    pub fn flush(&self) {
376        debug!("Flush");
377        let barrier = Arc::new(Barrier::new(2));
378        self.sender
379            .send(MetricsCommand::Flush(Arc::clone(&barrier)))
380            .unwrap();
381
382        barrier.wait();
383    }
384}
385
386impl Drop for MetricsAgent {
387    fn drop(&mut self) {
388        self.flush();
389    }
390}
391
392fn get_singleton_agent() -> &'static MetricsAgent {
393    lazy_static! {
394        static ref AGENT: MetricsAgent = MetricsAgent::default();
395    };
396
397    &AGENT
398}
399
400lazy_static! {
401    static ref HOST_ID: Arc<RwLock<String>> = {
402        Arc::new(RwLock::new({
403            let hostname: String = gethostname()
404                .into_string()
405                .unwrap_or_else(|_| "".to_string());
406            format!("{}", hash(hostname.as_bytes()))
407        }))
408    };
409}
410
411pub fn set_host_id(host_id: String) {
412    info!("host id: {}", host_id);
413    *HOST_ID.write().unwrap() = host_id;
414}
415
416/// Submits a new point from any thread.  Note that points are internally queued
417/// and transmitted periodically in batches.
418pub fn submit(point: DataPoint, level: log::Level) {
419    let agent = get_singleton_agent();
420    agent.submit(point, level);
421}
422
423/// Submits a new counter or updates an existing counter from any thread.  Note that points are
424/// internally queued and transmitted periodically in batches.
425pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
426    let agent = get_singleton_agent();
427    agent.submit_counter(point, level, bucket);
428}
429
/// Connection settings parsed from `SOLANA_METRICS_CONFIG`.
#[derive(Debug, Default)]
struct MetricsConfig {
    /// URL prefix of the `InfluxDB` server (the `host` key).  `/write` and `/query`
    /// are appended to it.
    pub host: String,
    /// Database name (the `db` key).
    pub db: String,
    /// User name (the `u` key).
    pub username: String,
    /// Password (the `p` key).
    pub password: String,
}
437
438impl MetricsConfig {
439    fn complete(&self) -> bool {
440        !(self.host.is_empty()
441            || self.db.is_empty()
442            || self.username.is_empty()
443            || self.password.is_empty())
444    }
445}
446
447fn get_metrics_config() -> Result<MetricsConfig, MetricsError> {
448    let mut config = MetricsConfig::default();
449    let config_var = env::var("SOLANA_METRICS_CONFIG")?;
450    if config_var.is_empty() {
451        Err(env::VarError::NotPresent)?;
452    }
453
454    for pair in config_var.split(',') {
455        let nv: Vec<_> = pair.split('=').collect();
456        if nv.len() != 2 {
457            return Err(MetricsError::ConfigInvalid(pair.to_string()));
458        }
459        let v = nv[1].to_string();
460        match nv[0] {
461            "host" => config.host = v,
462            "db" => config.db = v,
463            "u" => config.username = v,
464            "p" => config.password = v,
465            _ => return Err(MetricsError::ConfigInvalid(pair.to_string())),
466        }
467    }
468
469    if !config.complete() {
470        return Err(MetricsError::ConfigIncomplete);
471    }
472
473    Ok(config)
474}
475
476pub fn metrics_config_sanity_check(cluster_type: ClusterType) -> Result<(), MetricsError> {
477    let config = match get_metrics_config() {
478        Ok(config) => config,
479        Err(MetricsError::VarError(env::VarError::NotPresent)) => return Ok(()),
480        Err(e) => return Err(e),
481    };
482    match &config.db[..] {
483        "mainnet-beta" if cluster_type != ClusterType::MainnetBeta => (),
484        "tds" if cluster_type != ClusterType::Testnet => (),
485        "devnet" if cluster_type != ClusterType::Devnet => (),
486        _ => return Ok(()),
487    };
488    let (host, db) = (&config.host, &config.db);
489    let msg = format!("cluster_type={cluster_type:?} host={host} database={db}");
490    Err(MetricsError::DbMismatch(msg))
491}
492
493pub fn query(q: &str) -> Result<String, MetricsError> {
494    let config = get_metrics_config()?;
495    let query_url = format!(
496        "{}/query?u={}&p={}&q={}",
497        &config.host, &config.username, &config.password, &q
498    );
499
500    let response = reqwest::blocking::get(query_url.as_str())?.text()?;
501
502    Ok(response)
503}
504
505/// Blocks until all pending points from previous calls to `submit` have been
506/// transmitted.
507pub fn flush() {
508    let agent = get_singleton_agent();
509    agent.flush();
510}
511
512/// Hook the panic handler to generate a data point on each panic
513pub fn set_panic_hook(program: &'static str, version: Option<String>) {
514    static SET_HOOK: Once = Once::new();
515    SET_HOOK.call_once(|| {
516        let default_hook = std::panic::take_hook();
517        std::panic::set_hook(Box::new(move |ono| {
518            default_hook(ono);
519            let location = match ono.location() {
520                Some(location) => location.to_string(),
521                None => "?".to_string(),
522            };
523            submit(
524                DataPoint::new("panic")
525                    .add_field_str("program", program)
526                    .add_field_str("thread", thread::current().name().unwrap_or("?"))
527                    // The 'one' field exists to give Kapacitor Alerts a numerical value
528                    // to filter on
529                    .add_field_i64("one", 1)
530                    .add_field_str("message", &ono.to_string())
531                    .add_field_str("location", &location)
532                    .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
533                    .to_owned(),
534                Level::Error,
535            );
536            // Flush metrics immediately
537            flush();
538
539            // Exit cleanly so the process don't limp along in a half-dead state
540            std::process::exit(1);
541        }));
542    });
543}
544
545pub mod test_mocks {
546    use super::*;
547
548    pub struct MockMetricsWriter {
549        pub points_written: Arc<Mutex<Vec<DataPoint>>>,
550    }
551    impl MockMetricsWriter {
552        pub fn new() -> Self {
553            MockMetricsWriter {
554                points_written: Arc::new(Mutex::new(Vec::new())),
555            }
556        }
557
558        pub fn points_written(&self) -> usize {
559            self.points_written.lock().unwrap().len()
560        }
561    }
562
563    impl Default for MockMetricsWriter {
564        fn default() -> Self {
565            Self::new()
566        }
567    }
568
569    impl MetricsWriter for MockMetricsWriter {
570        fn write(&self, points: Vec<DataPoint>) {
571            assert!(!points.is_empty());
572
573            let new_points = points.len();
574            self.points_written.lock().unwrap().extend(points);
575
576            info!(
577                "Writing {} points ({} total)",
578                new_points,
579                self.points_written(),
580            );
581        }
582    }
583}
584
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    // Every write made by the agent appends one `metrics` stats point after the
    // submitted points.  The expected counts below include one extra point per write.

    /// 42 submitted points plus the stats point from the single `flush()` write.
    #[test]
    fn test_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..42 {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i)
                    .to_owned(),
                Level::Info,
            );
        }

        agent.flush();
        assert_eq!(writer.points_written(), 43);
    }

    /// 10 buckets x 2 counter names = 20 distinct counters, plus the stats point.
    #[test]
    fn test_submit_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..10 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 21);
    }

    /// Ten samples of the same counter in the same bucket collapse into one point whose
    /// count is the sum (10 x 10 = 100), plus the stats point.
    #[test]
    fn test_submit_counter_increment() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for _ in 0..10 {
            agent.submit_counter(
                CounterPoint {
                    name: "counter",
                    count: 10,
                    timestamp: UNIX_EPOCH,
                },
                Level::Info,
                0, // use the same bucket
            );
        }

        agent.flush();
        assert_eq!(writer.points_written(), 2);

        let submitted_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
    }

    /// `i / 10` over `0..50` yields 5 buckets.  5 buckets x 2 names = 10 counters, plus
    /// the stats point.
    #[test]
    fn test_submit_bucketed_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..50 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i / 10);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i / 10);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 11);
    }

    /// With a 1s write frequency, the periodic write fires during the 2s sleep without an
    /// explicit flush.  That yields the point plus the stats point.
    #[test]
    fn test_submit_with_delay() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);

        agent.submit(DataPoint::new("point 1"), Level::Info);
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_submit_exceed_max_rate() {
        let writer = Arc::new(MockMetricsWriter::new());

        let max_points_per_sec = 100;

        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec);

        for i in 0..(max_points_per_sec + 20) {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i.try_into().unwrap())
                    .to_owned(),
                Level::Info,
            );
        }

        thread::sleep(Duration::from_secs(2));

        agent.flush();

        // We are expecting `max_points_per_sec - 1` data points from `submit()` and two more metric
        // stats data points.  One from the timeout when all the `submit()`ed values are sent when 1
        // second is elapsed, and then one more from the explicit `flush()`.
        assert_eq!(writer.points_written(), max_points_per_sec + 1);
    }

    /// 42 points submitted concurrently from separate threads, plus the stats point from
    /// `flush()`.
    #[test]
    fn test_multithread_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = Arc::new(Mutex::new(MetricsAgent::new(
            writer.clone(),
            Duration::from_secs(10),
            1000,
        )));

        //
        // Submit measurements from different threads
        //
        let mut threads = Vec::new();
        for i in 0..42 {
            let mut point = DataPoint::new("measurement");
            point.add_field_i64("i", i);
            let agent = Arc::clone(&agent);
            threads.push(thread::spawn(move || {
                agent.lock().unwrap().submit(point, Level::Info);
            }));
        }

        for thread in threads {
            thread.join().unwrap();
        }

        agent.lock().unwrap().flush();
        assert_eq!(writer.points_written(), 43);
    }

    /// Dropping the agent must flush buffered points.  The huge write frequency rules out
    /// a periodic write.
    #[test]
    fn test_flush_before_drop() {
        let writer = Arc::new(MockMetricsWriter::new());
        {
            let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9_999_999), 1000);
            agent.submit(DataPoint::new("point 1"), Level::Info);
        }

        // The datapoints we expect to see are:
        // 1. `point 1` from the above.
        // 2. `metrics` stats submitted as a result of the `Flush` sent by `agent` being destroyed.
        assert_eq!(writer.points_written(), 2);
    }

    /// Smoke test for `MetricsAgent::default()`.  Points only leave the process when
    /// `SOLANA_METRICS_CONFIG` is set.  Otherwise the `InfluxDB` writer has no URL and
    /// discards them.
    #[test]
    fn test_live_submit() {
        let agent = MetricsAgent::default();

        let point = DataPoint::new("live_submit_test")
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", rand::random::<u8>() as i64)
            .to_owned();
        agent.submit(point, Level::Info);
    }
}