solana_metrics/
metrics.rs

1//! The `metrics` module enables sending measurements to an `InfluxDB` instance
2
3use {
4    crate::{counter::CounterPoint, datapoint::DataPoint},
5    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
6    gethostname::gethostname,
7    lazy_static::lazy_static,
8    log::*,
9    solana_sdk::hash::hash,
10    std::{
11        cmp,
12        collections::HashMap,
13        convert::Into,
14        env,
15        fmt::Write,
16        sync::{Arc, Barrier, Mutex, Once, RwLock},
17        thread,
18        time::{Duration, Instant, UNIX_EPOCH},
19    },
20};
21
22type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
23
24impl From<&CounterPoint> for DataPoint {
25    fn from(counter_point: &CounterPoint) -> Self {
26        let mut point = Self::new(counter_point.name);
27        point.timestamp = counter_point.timestamp;
28        point.add_field_i64("count", counter_point.count);
29        point
30    }
31}
32
33#[derive(Debug)]
34enum MetricsCommand {
35    Flush(Arc<Barrier>),
36    Submit(DataPoint, log::Level),
37    SubmitCounter(CounterPoint, log::Level, u64),
38}
39
40pub struct MetricsAgent {
41    sender: Sender<MetricsCommand>,
42}
43
44pub trait MetricsWriter {
45    // Write the points and empty the vector.  Called on the internal
46    // MetricsAgent worker thread.
47    fn write(&self, points: Vec<DataPoint>);
48}
49
50struct InfluxDbMetricsWriter {
51    write_url: Option<String>,
52}
53
54impl InfluxDbMetricsWriter {
55    fn new() -> Self {
56        Self {
57            write_url: Self::build_write_url().ok(),
58        }
59    }
60
61    fn build_write_url() -> Result<String, String> {
62        let config = get_metrics_config().map_err(|err| {
63            info!("metrics disabled: {}", err);
64            err
65        })?;
66
67        info!(
68            "metrics configuration: host={} db={} username={}",
69            config.host, config.db, config.username
70        );
71
72        let write_url = format!(
73            "{}/write?db={}&u={}&p={}&precision=n",
74            &config.host, &config.db, &config.username, &config.password
75        );
76
77        Ok(write_url)
78    }
79}
80
81pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
82    const TIMESTAMP_LEN: usize = 20;
83    const HOST_ID_LEN: usize = 8; // "host_id=".len()
84    const EXTRA_LEN: usize = 2; // "=,".len()
85    let mut len = 0;
86    for point in points {
87        for (name, value) in &point.fields {
88            len += name.len() + value.len() + EXTRA_LEN;
89        }
90        for (name, value) in &point.tags {
91            len += name.len() + value.len() + EXTRA_LEN;
92        }
93        len += point.name.len();
94        len += TIMESTAMP_LEN;
95        len += host_id.len() + HOST_ID_LEN;
96    }
97    let mut line = String::with_capacity(len);
98    for point in points {
99        let _ = write!(line, "{},host_id={}", &point.name, host_id);
100        for (name, value) in point.tags.iter() {
101            let _ = write!(line, ",{}={}", name, value);
102        }
103
104        let mut first = true;
105        for (name, value) in point.fields.iter() {
106            let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
107            first = false;
108        }
109        let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
110        let nanos = timestamp.unwrap().as_nanos();
111        let _ = writeln!(line, " {}", nanos);
112    }
113    line
114}
115
116impl MetricsWriter for InfluxDbMetricsWriter {
117    fn write(&self, points: Vec<DataPoint>) {
118        if let Some(ref write_url) = self.write_url {
119            debug!("submitting {} points", points.len());
120
121            let host_id = HOST_ID.read().unwrap();
122
123            let line = serialize_points(&points, &host_id);
124
125            let client = reqwest::blocking::Client::builder()
126                .timeout(Duration::from_secs(5))
127                .build();
128            let client = match client {
129                Ok(client) => client,
130                Err(err) => {
131                    warn!("client instantiation failed: {}", err);
132                    return;
133                }
134            };
135
136            let response = client.post(write_url.as_str()).body(line).send();
137            if let Ok(resp) = response {
138                let status = resp.status();
139                if !status.is_success() {
140                    let text = resp
141                        .text()
142                        .unwrap_or_else(|_| "[text body empty]".to_string());
143                    warn!("submit response unsuccessful: {} {}", status, text,);
144                }
145            } else {
146                warn!("submit error: {}", response.unwrap_err());
147            }
148        }
149    }
150}
151
152impl Default for MetricsAgent {
153    fn default() -> Self {
154        let max_points_per_sec = env::var("SAFECOIN_METRICS_MAX_POINTS_PER_SECOND")
155            .map(|x| {
156                x.parse()
157                    .expect("Failed to parse SAFECOIN_METRICS_MAX_POINTS_PER_SECOND")
158            })
159            .unwrap_or(4000);
160
161        Self::new(
162            Arc::new(InfluxDbMetricsWriter::new()),
163            Duration::from_secs(10),
164            max_points_per_sec,
165        )
166    }
167}
168
169impl MetricsAgent {
170    pub fn new(
171        writer: Arc<dyn MetricsWriter + Send + Sync>,
172        write_frequency: Duration,
173        max_points_per_sec: usize,
174    ) -> Self {
175        let (sender, receiver) = unbounded::<MetricsCommand>();
176
177        thread::Builder::new()
178            .name("solMetricsAgent".into())
179            .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
180            .unwrap();
181
182        Self { sender }
183    }
184
185    fn collect_points(points: &mut Vec<DataPoint>, counters: &mut CounterMap) -> Vec<DataPoint> {
186        let mut ret = std::mem::take(points);
187        ret.extend(counters.values().map(|v| v.into()));
188        counters.clear();
189        ret
190    }
191
192    fn write(
193        writer: &Arc<dyn MetricsWriter + Send + Sync>,
194        mut points: Vec<DataPoint>,
195        max_points: usize,
196        max_points_per_sec: usize,
197        last_write_time: Instant,
198        points_buffered: usize,
199    ) {
200        if points.is_empty() {
201            return;
202        }
203
204        let now = Instant::now();
205        let num_points = points.len();
206        debug!("run: attempting to write {} points", num_points);
207        if num_points > max_points {
208            warn!(
209                "max submission rate of {} datapoints per second exceeded.  only the
210                    first {} of {} points will be submitted",
211                max_points_per_sec, max_points, num_points
212            );
213        }
214        let points_written = cmp::min(num_points, max_points - 1);
215        points.truncate(points_written);
216        points.push(
217            DataPoint::new("metrics")
218                .add_field_i64("points_written", points_written as i64)
219                .add_field_i64("num_points", num_points as i64)
220                .add_field_i64("points_lost", (num_points - points_written) as i64)
221                .add_field_i64("points_buffered", points_buffered as i64)
222                .add_field_i64(
223                    "secs_since_last_write",
224                    now.duration_since(last_write_time).as_secs() as i64,
225                )
226                .to_owned(),
227        );
228
229        writer.write(points);
230    }
231
232    fn run(
233        receiver: &Receiver<MetricsCommand>,
234        writer: &Arc<dyn MetricsWriter + Send + Sync>,
235        write_frequency: Duration,
236        max_points_per_sec: usize,
237    ) {
238        trace!("run: enter");
239        let mut last_write_time = Instant::now();
240        let mut points = Vec::<DataPoint>::new();
241        let mut counters = CounterMap::new();
242
243        let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
244
245        loop {
246            match receiver.recv_timeout(write_frequency / 2) {
247                Ok(cmd) => match cmd {
248                    MetricsCommand::Flush(barrier) => {
249                        debug!("metrics_thread: flush");
250                        Self::write(
251                            writer,
252                            Self::collect_points(&mut points, &mut counters),
253                            max_points,
254                            max_points_per_sec,
255                            last_write_time,
256                            receiver.len(),
257                        );
258                        last_write_time = Instant::now();
259                        barrier.wait();
260                    }
261                    MetricsCommand::Submit(point, level) => {
262                        log!(level, "{}", point);
263                        points.push(point);
264                    }
265                    MetricsCommand::SubmitCounter(counter, _level, bucket) => {
266                        debug!("{:?}", counter);
267                        let key = (counter.name, bucket);
268                        if let Some(value) = counters.get_mut(&key) {
269                            value.count += counter.count;
270                        } else {
271                            counters.insert(key, counter);
272                        }
273                    }
274                },
275                Err(RecvTimeoutError::Timeout) => {
276                    trace!("run: receive timeout");
277                }
278                Err(RecvTimeoutError::Disconnected) => {
279                    debug!("run: sender disconnected");
280                    break;
281                }
282            }
283
284            let now = Instant::now();
285            if now.duration_since(last_write_time) >= write_frequency {
286                Self::write(
287                    writer,
288                    Self::collect_points(&mut points, &mut counters),
289                    max_points,
290                    max_points_per_sec,
291                    last_write_time,
292                    receiver.len(),
293                );
294                last_write_time = now;
295            }
296        }
297        trace!("run: exit");
298    }
299
300    pub fn submit(&self, point: DataPoint, level: log::Level) {
301        self.sender
302            .send(MetricsCommand::Submit(point, level))
303            .unwrap();
304    }
305
306    pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
307        self.sender
308            .send(MetricsCommand::SubmitCounter(counter, level, bucket))
309            .unwrap();
310    }
311
312    pub fn flush(&self) {
313        debug!("Flush");
314        let barrier = Arc::new(Barrier::new(2));
315        self.sender
316            .send(MetricsCommand::Flush(Arc::clone(&barrier)))
317            .unwrap();
318
319        barrier.wait();
320    }
321}
322
323impl Drop for MetricsAgent {
324    fn drop(&mut self) {
325        self.flush();
326    }
327}
328
329fn get_singleton_agent() -> Arc<Mutex<MetricsAgent>> {
330    static INIT: Once = Once::new();
331    static mut AGENT: Option<Arc<Mutex<MetricsAgent>>> = None;
332    unsafe {
333        INIT.call_once(|| AGENT = Some(Arc::new(Mutex::new(MetricsAgent::default()))));
334        match AGENT {
335            Some(ref agent) => agent.clone(),
336            None => panic!("Failed to initialize metrics agent"),
337        }
338    }
339}
340
341lazy_static! {
342    static ref HOST_ID: Arc<RwLock<String>> = {
343        Arc::new(RwLock::new({
344            let hostname: String = gethostname()
345                .into_string()
346                .unwrap_or_else(|_| "".to_string());
347            format!("{}", hash(hostname.as_bytes()))
348        }))
349    };
350}
351
352pub fn set_host_id(host_id: String) {
353    info!("host id: {}", host_id);
354    *HOST_ID.write().unwrap() = host_id;
355}
356
357/// Submits a new point from any thread.  Note that points are internally queued
358/// and transmitted periodically in batches.
359pub fn submit(point: DataPoint, level: log::Level) {
360    let agent_mutex = get_singleton_agent();
361    let agent = agent_mutex.lock().unwrap();
362    agent.submit(point, level);
363}
364
365/// Submits a new counter or updates an existing counter from any thread.  Note that points are
366/// internally queued and transmitted periodically in batches.
367pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
368    let agent_mutex = get_singleton_agent();
369    let agent = agent_mutex.lock().unwrap();
370    agent.submit_counter(point, level, bucket);
371}
372
/// InfluxDB connection settings read from `SAFECOIN_METRICS_CONFIG`.
#[derive(Debug, Default)]
struct MetricsConfig {
    pub host: String,
    pub db: String,
    pub username: String,
    pub password: String,
}

impl MetricsConfig {
    /// Returns `true` when every setting has a non-empty value.
    fn complete(&self) -> bool {
        !(self.host.is_empty()
            || self.db.is_empty()
            || self.username.is_empty()
            || self.password.is_empty())
    }
}

/// Parses `SAFECOIN_METRICS_CONFIG`.
///
/// The variable is a comma-separated list of `key=value` pairs. The keys are
/// `host`, `db`, `u` (username) and `p` (password), for example
/// `host=http://localhost:8086,db=metrics,u=user,p=secret`.
///
/// Only the first `=` in a pair separates key from value, so values such as
/// base64 passwords may themselves contain `=`. A repeated key keeps its last
/// value.
///
/// # Errors
/// Returns a description in any of these cases:
/// * the variable is unset or not valid Unicode;
/// * a pair has no `=` or an unknown key;
/// * any of the four settings is missing or empty.
fn get_metrics_config() -> Result<MetricsConfig, String> {
    let config_var = env::var("SAFECOIN_METRICS_CONFIG")
        .map_err(|err| format!("SAFECOIN_METRICS_CONFIG: {}", err))?;

    let mut config = MetricsConfig::default();
    for pair in config_var.split(',') {
        let (name, value) = match pair.split_once('=') {
            Some(name_value) => name_value,
            None => return Err(format!("SAFECOIN_METRICS_CONFIG is invalid: '{}'", pair)),
        };
        let value = value.to_string();
        match name {
            "host" => config.host = value,
            "db" => config.db = value,
            "u" => config.username = value,
            "p" => config.password = value,
            _ => return Err(format!("SAFECOIN_METRICS_CONFIG is invalid: '{}'", pair)),
        }
    }

    if !config.complete() {
        return Err("SAFECOIN_METRICS_CONFIG is incomplete".to_string());
    }
    Ok(config)
}
416
417pub fn query(q: &str) -> Result<String, String> {
418    let config = get_metrics_config()?;
419    let query_url = format!(
420        "{}/query?u={}&p={}&q={}",
421        &config.host, &config.username, &config.password, &q
422    );
423
424    let response = reqwest::blocking::get(query_url.as_str())
425        .map_err(|err| err.to_string())?
426        .text()
427        .map_err(|err| err.to_string())?;
428
429    Ok(response)
430}
431
432/// Blocks until all pending points from previous calls to `submit` have been
433/// transmitted.
434pub fn flush() {
435    let agent_mutex = get_singleton_agent();
436    let agent = agent_mutex.lock().unwrap();
437    agent.flush();
438}
439
440/// Hook the panic handler to generate a data point on each panic
441pub fn set_panic_hook(program: &'static str, version: Option<String>) {
442    static SET_HOOK: Once = Once::new();
443    SET_HOOK.call_once(|| {
444        let default_hook = std::panic::take_hook();
445        std::panic::set_hook(Box::new(move |ono| {
446            default_hook(ono);
447            let location = match ono.location() {
448                Some(location) => location.to_string(),
449                None => "?".to_string(),
450            };
451            submit(
452                DataPoint::new("panic")
453                    .add_field_str("program", program)
454                    .add_field_str("thread", thread::current().name().unwrap_or("?"))
455                    // The 'one' field exists to give Kapacitor Alerts a numerical value
456                    // to filter on
457                    .add_field_i64("one", 1)
458                    .add_field_str("message", &ono.to_string())
459                    .add_field_str("location", &location)
460                    .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
461                    .to_owned(),
462                Level::Error,
463            );
464            // Flush metrics immediately
465            flush();
466
467            // Exit cleanly so the process don't limp along in a half-dead state
468            std::process::exit(1);
469        }));
470    });
471}
472
473pub mod test_mocks {
474    use super::*;
475
476    pub struct MockMetricsWriter {
477        pub points_written: Arc<Mutex<Vec<DataPoint>>>,
478    }
479    impl MockMetricsWriter {
480        #[allow(dead_code)]
481        pub fn new() -> Self {
482            MockMetricsWriter {
483                points_written: Arc::new(Mutex::new(Vec::new())),
484            }
485        }
486
487        pub fn points_written(&self) -> usize {
488            self.points_written.lock().unwrap().len()
489        }
490    }
491
492    impl Default for MockMetricsWriter {
493        fn default() -> Self {
494            Self::new()
495        }
496    }
497
498    impl MetricsWriter for MockMetricsWriter {
499        fn write(&self, points: Vec<DataPoint>) {
500            assert!(!points.is_empty());
501
502            let new_points = points.len();
503            self.points_written
504                .lock()
505                .unwrap()
506                .extend(points.into_iter());
507
508            info!(
509                "Writing {} points ({} total)",
510                new_points,
511                self.points_written(),
512            );
513        }
514    }
515}
516
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    /// Creates a recording writer and an agent that writes into it.
    fn mock_agent(
        write_frequency: Duration,
        max_points_per_sec: usize,
    ) -> (Arc<MockMetricsWriter>, MetricsAgent) {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), write_frequency, max_points_per_sec);
        (writer, agent)
    }

    /// A `measurement` point carrying the integer field `i`.
    fn measurement(i: i64) -> DataPoint {
        DataPoint::new("measurement").add_field_i64("i", i).to_owned()
    }

    #[test]
    fn test_submit() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        for i in 0..42 {
            agent.submit(measurement(i), Level::Info);
        }

        agent.flush();
        // 42 submitted points plus the agent's own "metrics" point.
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_submit_counter() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        for bucket in 0..10 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, bucket);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, bucket);
        }

        agent.flush();
        // 2 names x 10 buckets, plus the "metrics" point.
        assert_eq!(writer.points_written(), 21);
    }

    #[test]
    fn test_submit_counter_increment() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        for _ in 0..10 {
            let counter = CounterPoint {
                name: "counter",
                count: 10,
                timestamp: UNIX_EPOCH,
            };
            // Every sample uses the same bucket, so they aggregate into one point.
            agent.submit_counter(counter, Level::Info, 0);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 2);

        let submitted_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
    }

    #[test]
    fn test_submit_bucketed_counter() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        for i in 0..50 {
            let bucket = i / 10;
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, bucket);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, bucket);
        }

        agent.flush();
        // 2 names x 5 buckets, plus the "metrics" point.
        assert_eq!(writer.points_written(), 11);
    }

    #[test]
    fn test_submit_with_delay() {
        let (writer, agent) = mock_agent(Duration::from_secs(1), 1000);

        agent.submit(DataPoint::new("point 1"), Level::Info);
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_submit_exceed_max_rate() {
        let (writer, agent) = mock_agent(Duration::from_secs(1), 100);
        for i in 0..102 {
            agent.submit(measurement(i), Level::Info);
        }

        thread::sleep(Duration::from_secs(2));

        agent.flush();
        assert_eq!(writer.points_written(), 100);
    }

    #[test]
    fn test_multithread_submit() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        let agent = Arc::new(Mutex::new(agent));

        // Submit measurements from different threads.
        let handles: Vec<_> = (0..42)
            .map(|i| {
                let point = measurement(i);
                let agent = Arc::clone(&agent);
                thread::spawn(move || {
                    agent.lock().unwrap().submit(point, Level::Info);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        agent.lock().unwrap().flush();
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_flush_before_drop() {
        let writer = {
            let (writer, agent) = mock_agent(Duration::from_secs(9_999_999), 1000);
            agent.submit(DataPoint::new("point 1"), Level::Info);
            // Dropping `agent` at the end of this block must flush the point.
            writer
        };

        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_live_submit() {
        let agent = MetricsAgent::default();

        let point = DataPoint::new("live_submit_test")
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", rand::random::<u8>() as i64)
            .to_owned();
        agent.submit(point, Level::Info);
    }
}