1use {
4 crate::{counter::CounterPoint, datapoint::DataPoint},
5 crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
6 gethostname::gethostname,
7 lazy_static::lazy_static,
8 log::*,
9 solana_sdk::hash::hash,
10 std::{
11 cmp,
12 collections::HashMap,
13 convert::Into,
14 env,
15 fmt::Write,
16 sync::{Arc, Barrier, Mutex, Once, RwLock},
17 thread,
18 time::{Duration, Instant, UNIX_EPOCH},
19 },
20};
21
22type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
23
24impl From<&CounterPoint> for DataPoint {
25 fn from(counter_point: &CounterPoint) -> Self {
26 let mut point = Self::new(counter_point.name);
27 point.timestamp = counter_point.timestamp;
28 point.add_field_i64("count", counter_point.count);
29 point
30 }
31}
32
33#[derive(Debug)]
34enum MetricsCommand {
35 Flush(Arc<Barrier>),
36 Submit(DataPoint, log::Level),
37 SubmitCounter(CounterPoint, log::Level, u64),
38}
39
40pub struct MetricsAgent {
41 sender: Sender<MetricsCommand>,
42}
43
44pub trait MetricsWriter {
45 fn write(&self, points: Vec<DataPoint>);
48}
49
/// `MetricsWriter` that POSTs line-protocol batches to an InfluxDB `/write` endpoint.
struct InfluxDbMetricsWriter {
    /// Complete write URL, or `None` when metrics are not configured, in which
    /// case writes are skipped.
    write_url: Option<String>,
}
53
54impl InfluxDbMetricsWriter {
55 fn new() -> Self {
56 Self {
57 write_url: Self::build_write_url().ok(),
58 }
59 }
60
61 fn build_write_url() -> Result<String, String> {
62 let config = get_metrics_config().map_err(|err| {
63 info!("metrics disabled: {}", err);
64 err
65 })?;
66
67 info!(
68 "metrics configuration: host={} db={} username={}",
69 config.host, config.db, config.username
70 );
71
72 let write_url = format!(
73 "{}/write?db={}&u={}&p={}&precision=n",
74 &config.host, &config.db, &config.username, &config.password
75 );
76
77 Ok(write_url)
78 }
79}
80
81pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
82 const TIMESTAMP_LEN: usize = 20;
83 const HOST_ID_LEN: usize = 8; const EXTRA_LEN: usize = 2; let mut len = 0;
86 for point in points {
87 for (name, value) in &point.fields {
88 len += name.len() + value.len() + EXTRA_LEN;
89 }
90 for (name, value) in &point.tags {
91 len += name.len() + value.len() + EXTRA_LEN;
92 }
93 len += point.name.len();
94 len += TIMESTAMP_LEN;
95 len += host_id.len() + HOST_ID_LEN;
96 }
97 let mut line = String::with_capacity(len);
98 for point in points {
99 let _ = write!(line, "{},host_id={}", &point.name, host_id);
100 for (name, value) in point.tags.iter() {
101 let _ = write!(line, ",{}={}", name, value);
102 }
103
104 let mut first = true;
105 for (name, value) in point.fields.iter() {
106 let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
107 first = false;
108 }
109 let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
110 let nanos = timestamp.unwrap().as_nanos();
111 let _ = writeln!(line, " {}", nanos);
112 }
113 line
114}
115
116impl MetricsWriter for InfluxDbMetricsWriter {
117 fn write(&self, points: Vec<DataPoint>) {
118 if let Some(ref write_url) = self.write_url {
119 debug!("submitting {} points", points.len());
120
121 let host_id = HOST_ID.read().unwrap();
122
123 let line = serialize_points(&points, &host_id);
124
125 let client = reqwest::blocking::Client::builder()
126 .timeout(Duration::from_secs(5))
127 .build();
128 let client = match client {
129 Ok(client) => client,
130 Err(err) => {
131 warn!("client instantiation failed: {}", err);
132 return;
133 }
134 };
135
136 let response = client.post(write_url.as_str()).body(line).send();
137 if let Ok(resp) = response {
138 let status = resp.status();
139 if !status.is_success() {
140 let text = resp
141 .text()
142 .unwrap_or_else(|_| "[text body empty]".to_string());
143 warn!("submit response unsuccessful: {} {}", status, text,);
144 }
145 } else {
146 warn!("submit error: {}", response.unwrap_err());
147 }
148 }
149 }
150}
151
152impl Default for MetricsAgent {
153 fn default() -> Self {
154 let max_points_per_sec = env::var("SAFECOIN_METRICS_MAX_POINTS_PER_SECOND")
155 .map(|x| {
156 x.parse()
157 .expect("Failed to parse SAFECOIN_METRICS_MAX_POINTS_PER_SECOND")
158 })
159 .unwrap_or(4000);
160
161 Self::new(
162 Arc::new(InfluxDbMetricsWriter::new()),
163 Duration::from_secs(10),
164 max_points_per_sec,
165 )
166 }
167}
168
169impl MetricsAgent {
170 pub fn new(
171 writer: Arc<dyn MetricsWriter + Send + Sync>,
172 write_frequency: Duration,
173 max_points_per_sec: usize,
174 ) -> Self {
175 let (sender, receiver) = unbounded::<MetricsCommand>();
176
177 thread::Builder::new()
178 .name("solMetricsAgent".into())
179 .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
180 .unwrap();
181
182 Self { sender }
183 }
184
185 fn collect_points(points: &mut Vec<DataPoint>, counters: &mut CounterMap) -> Vec<DataPoint> {
186 let mut ret = std::mem::take(points);
187 ret.extend(counters.values().map(|v| v.into()));
188 counters.clear();
189 ret
190 }
191
192 fn write(
193 writer: &Arc<dyn MetricsWriter + Send + Sync>,
194 mut points: Vec<DataPoint>,
195 max_points: usize,
196 max_points_per_sec: usize,
197 last_write_time: Instant,
198 points_buffered: usize,
199 ) {
200 if points.is_empty() {
201 return;
202 }
203
204 let now = Instant::now();
205 let num_points = points.len();
206 debug!("run: attempting to write {} points", num_points);
207 if num_points > max_points {
208 warn!(
209 "max submission rate of {} datapoints per second exceeded. only the
210 first {} of {} points will be submitted",
211 max_points_per_sec, max_points, num_points
212 );
213 }
214 let points_written = cmp::min(num_points, max_points - 1);
215 points.truncate(points_written);
216 points.push(
217 DataPoint::new("metrics")
218 .add_field_i64("points_written", points_written as i64)
219 .add_field_i64("num_points", num_points as i64)
220 .add_field_i64("points_lost", (num_points - points_written) as i64)
221 .add_field_i64("points_buffered", points_buffered as i64)
222 .add_field_i64(
223 "secs_since_last_write",
224 now.duration_since(last_write_time).as_secs() as i64,
225 )
226 .to_owned(),
227 );
228
229 writer.write(points);
230 }
231
232 fn run(
233 receiver: &Receiver<MetricsCommand>,
234 writer: &Arc<dyn MetricsWriter + Send + Sync>,
235 write_frequency: Duration,
236 max_points_per_sec: usize,
237 ) {
238 trace!("run: enter");
239 let mut last_write_time = Instant::now();
240 let mut points = Vec::<DataPoint>::new();
241 let mut counters = CounterMap::new();
242
243 let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
244
245 loop {
246 match receiver.recv_timeout(write_frequency / 2) {
247 Ok(cmd) => match cmd {
248 MetricsCommand::Flush(barrier) => {
249 debug!("metrics_thread: flush");
250 Self::write(
251 writer,
252 Self::collect_points(&mut points, &mut counters),
253 max_points,
254 max_points_per_sec,
255 last_write_time,
256 receiver.len(),
257 );
258 last_write_time = Instant::now();
259 barrier.wait();
260 }
261 MetricsCommand::Submit(point, level) => {
262 log!(level, "{}", point);
263 points.push(point);
264 }
265 MetricsCommand::SubmitCounter(counter, _level, bucket) => {
266 debug!("{:?}", counter);
267 let key = (counter.name, bucket);
268 if let Some(value) = counters.get_mut(&key) {
269 value.count += counter.count;
270 } else {
271 counters.insert(key, counter);
272 }
273 }
274 },
275 Err(RecvTimeoutError::Timeout) => {
276 trace!("run: receive timeout");
277 }
278 Err(RecvTimeoutError::Disconnected) => {
279 debug!("run: sender disconnected");
280 break;
281 }
282 }
283
284 let now = Instant::now();
285 if now.duration_since(last_write_time) >= write_frequency {
286 Self::write(
287 writer,
288 Self::collect_points(&mut points, &mut counters),
289 max_points,
290 max_points_per_sec,
291 last_write_time,
292 receiver.len(),
293 );
294 last_write_time = now;
295 }
296 }
297 trace!("run: exit");
298 }
299
300 pub fn submit(&self, point: DataPoint, level: log::Level) {
301 self.sender
302 .send(MetricsCommand::Submit(point, level))
303 .unwrap();
304 }
305
306 pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
307 self.sender
308 .send(MetricsCommand::SubmitCounter(counter, level, bucket))
309 .unwrap();
310 }
311
312 pub fn flush(&self) {
313 debug!("Flush");
314 let barrier = Arc::new(Barrier::new(2));
315 self.sender
316 .send(MetricsCommand::Flush(Arc::clone(&barrier)))
317 .unwrap();
318
319 barrier.wait();
320 }
321}
322
323impl Drop for MetricsAgent {
324 fn drop(&mut self) {
325 self.flush();
326 }
327}
328
329fn get_singleton_agent() -> Arc<Mutex<MetricsAgent>> {
330 static INIT: Once = Once::new();
331 static mut AGENT: Option<Arc<Mutex<MetricsAgent>>> = None;
332 unsafe {
333 INIT.call_once(|| AGENT = Some(Arc::new(Mutex::new(MetricsAgent::default()))));
334 match AGENT {
335 Some(ref agent) => agent.clone(),
336 None => panic!("Failed to initialize metrics agent"),
337 }
338 }
339}
340
341lazy_static! {
342 static ref HOST_ID: Arc<RwLock<String>> = {
343 Arc::new(RwLock::new({
344 let hostname: String = gethostname()
345 .into_string()
346 .unwrap_or_else(|_| "".to_string());
347 format!("{}", hash(hostname.as_bytes()))
348 }))
349 };
350}
351
352pub fn set_host_id(host_id: String) {
353 info!("host id: {}", host_id);
354 *HOST_ID.write().unwrap() = host_id;
355}
356
357pub fn submit(point: DataPoint, level: log::Level) {
360 let agent_mutex = get_singleton_agent();
361 let agent = agent_mutex.lock().unwrap();
362 agent.submit(point, level);
363}
364
365pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
368 let agent_mutex = get_singleton_agent();
369 let agent = agent_mutex.lock().unwrap();
370 agent.submit_counter(point, level, bucket);
371}
372
/// InfluxDB connection settings parsed from `SAFECOIN_METRICS_CONFIG`.
#[derive(Debug, Default)]
struct MetricsConfig {
    /// Base URL that `/write` and `/query` are appended to.
    pub host: String,
    /// Target database name.
    pub db: String,
    /// Sent as the `u` query parameter.
    pub username: String,
    /// Sent as the `p` query parameter.
    pub password: String,
}

impl MetricsConfig {
    /// Returns `true` only when none of the four settings is empty.
    fn complete(&self) -> bool {
        [&self.host, &self.db, &self.username, &self.password]
            .iter()
            .all(|setting| !setting.is_empty())
    }
}
389
390fn get_metrics_config() -> Result<MetricsConfig, String> {
391 let mut config = MetricsConfig::default();
392
393 let config_var = env::var("SAFECOIN_METRICS_CONFIG")
394 .map_err(|err| format!("SAFECOIN_METRICS_CONFIG: {}", err))?;
395
396 for pair in config_var.split(',') {
397 let nv: Vec<_> = pair.split('=').collect();
398 if nv.len() != 2 {
399 return Err(format!("SAFECOIN_METRICS_CONFIG is invalid: '{}'", pair));
400 }
401 let v = nv[1].to_string();
402 match nv[0] {
403 "host" => config.host = v,
404 "db" => config.db = v,
405 "u" => config.username = v,
406 "p" => config.password = v,
407 _ => return Err(format!("SAFECOIN_METRICS_CONFIG is invalid: '{}'", pair)),
408 }
409 }
410
411 if !config.complete() {
412 return Err("SAFECOIN_METRICS_CONFIG is incomplete".to_string());
413 }
414 Ok(config)
415}
416
417pub fn query(q: &str) -> Result<String, String> {
418 let config = get_metrics_config()?;
419 let query_url = format!(
420 "{}/query?u={}&p={}&q={}",
421 &config.host, &config.username, &config.password, &q
422 );
423
424 let response = reqwest::blocking::get(query_url.as_str())
425 .map_err(|err| err.to_string())?
426 .text()
427 .map_err(|err| err.to_string())?;
428
429 Ok(response)
430}
431
432pub fn flush() {
435 let agent_mutex = get_singleton_agent();
436 let agent = agent_mutex.lock().unwrap();
437 agent.flush();
438}
439
440pub fn set_panic_hook(program: &'static str, version: Option<String>) {
442 static SET_HOOK: Once = Once::new();
443 SET_HOOK.call_once(|| {
444 let default_hook = std::panic::take_hook();
445 std::panic::set_hook(Box::new(move |ono| {
446 default_hook(ono);
447 let location = match ono.location() {
448 Some(location) => location.to_string(),
449 None => "?".to_string(),
450 };
451 submit(
452 DataPoint::new("panic")
453 .add_field_str("program", program)
454 .add_field_str("thread", thread::current().name().unwrap_or("?"))
455 .add_field_i64("one", 1)
458 .add_field_str("message", &ono.to_string())
459 .add_field_str("location", &location)
460 .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
461 .to_owned(),
462 Level::Error,
463 );
464 flush();
466
467 std::process::exit(1);
469 }));
470 });
471}
472
473pub mod test_mocks {
474 use super::*;
475
476 pub struct MockMetricsWriter {
477 pub points_written: Arc<Mutex<Vec<DataPoint>>>,
478 }
479 impl MockMetricsWriter {
480 #[allow(dead_code)]
481 pub fn new() -> Self {
482 MockMetricsWriter {
483 points_written: Arc::new(Mutex::new(Vec::new())),
484 }
485 }
486
487 pub fn points_written(&self) -> usize {
488 self.points_written.lock().unwrap().len()
489 }
490 }
491
492 impl Default for MockMetricsWriter {
493 fn default() -> Self {
494 Self::new()
495 }
496 }
497
498 impl MetricsWriter for MockMetricsWriter {
499 fn write(&self, points: Vec<DataPoint>) {
500 assert!(!points.is_empty());
501
502 let new_points = points.len();
503 self.points_written
504 .lock()
505 .unwrap()
506 .extend(points.into_iter());
507
508 info!(
509 "Writing {} points ({} total)",
510 new_points,
511 self.points_written(),
512 );
513 }
514 }
515}
516
// Unit tests for `MetricsAgent`.
//
// Each write by the agent appends one extra `metrics` bookkeeping point, so
// expected counts are "points submitted + 1" per write that actually happens.
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    // 42 distinct points + 1 `metrics` point from the single flush-triggered write.
    #[test]
    fn test_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..42 {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i)
                    .to_owned(),
                Level::Info,
            );
        }

        agent.flush();
        assert_eq!(writer.points_written(), 43);
    }

    // Two counter names times ten distinct buckets = 20 aggregated counters,
    // plus 1 `metrics` point.
    #[test]
    fn test_submit_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..10 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 21);
    }

    // Ten submissions with the same name and bucket merge into one counter whose
    // count is 10 * 10 = 100. The counter is written first, then the `metrics` point.
    #[test]
    fn test_submit_counter_increment() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for _ in 0..10 {
            agent.submit_counter(
                CounterPoint {
                    name: "counter",
                    count: 10,
                    timestamp: UNIX_EPOCH,
                },
                Level::Info,
                0, // same bucket every time, so the counts are summed
            );
        }

        agent.flush();
        assert_eq!(writer.points_written(), 2);

        let submitted_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
    }

    // Buckets `i / 10` for i in 0..50 give 5 buckets per name: 2 * 5 = 10
    // counters, plus 1 `metrics` point.
    #[test]
    fn test_submit_bucketed_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000);

        for i in 0..50 {
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i / 10);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, i / 10);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 11);
    }

    // With a 1s write frequency, sleeping 2s lets the periodic write fire
    // without an explicit flush: 1 point + 1 `metrics` point.
    #[test]
    fn test_submit_with_delay() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000);

        agent.submit(DataPoint::new("point 1"), Level::Info);
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    // Budget is 1s * 100 = 100 points per write: 99 of the 102 submitted points
    // plus the `metrics` point. The rest are dropped, so the final flush has
    // nothing left to write.
    #[test]
    fn test_submit_exceed_max_rate() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100);

        for i in 0..102 {
            agent.submit(
                DataPoint::new("measurement")
                    .add_field_i64("i", i)
                    .to_owned(),
                Level::Info,
            );
        }

        thread::sleep(Duration::from_secs(2));

        agent.flush();
        assert_eq!(writer.points_written(), 100);
    }

    // 42 threads submit through a shared, mutex-guarded agent; one flush then
    // writes all 42 points + 1 `metrics` point.
    #[test]
    fn test_multithread_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = Arc::new(Mutex::new(MetricsAgent::new(
            writer.clone(),
            Duration::from_secs(10),
            1000,
        )));

        let mut threads = Vec::new();
        for i in 0..42 {
            let mut point = DataPoint::new("measurement");
            point.add_field_i64("i", i);
            let agent = Arc::clone(&agent);
            threads.push(thread::spawn(move || {
                agent.lock().unwrap().submit(point, Level::Info);
            }));
        }

        for thread in threads {
            thread.join().unwrap();
        }

        agent.lock().unwrap().flush();
        assert_eq!(writer.points_written(), 43);
    }

    // The write frequency is effectively infinite, so the only write comes from
    // `Drop` flushing: 1 point + 1 `metrics` point.
    #[test]
    fn test_flush_before_drop() {
        let writer = Arc::new(MockMetricsWriter::new());
        {
            let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9_999_999), 1000);
            agent.submit(DataPoint::new("point 1"), Level::Info);
        }

        assert_eq!(writer.points_written(), 2);
    }

    // Smoke test against the real, environment-configured writer. With
    // SAFECOIN_METRICS_CONFIG unset, the InfluxDB writer has no URL and skips the write.
    #[test]
    fn test_live_submit() {
        let agent = MetricsAgent::default();

        let point = DataPoint::new("live_submit_test")
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", rand::random::<u8>() as i64)
            .to_owned();
        agent.submit(point, Level::Info);
    }
}