1use {
4 crate::{counter::CounterPoint, datapoint::DataPoint},
5 crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
6 gethostname::gethostname,
7 lazy_static::lazy_static,
8 log::*,
9 solana_cluster_type::ClusterType,
10 solana_sha256_hasher::hash,
11 std::{
12 cmp,
13 collections::HashMap,
14 convert::Into,
15 env,
16 fmt::Write,
17 sync::{Arc, Barrier, Mutex, Once, RwLock},
18 thread,
19 time::{Duration, Instant, UNIX_EPOCH},
20 },
21 thiserror::Error,
22};
23
/// Counters accumulated between writes, keyed by `(counter name, bucket)`.
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
25
/// Errors produced while reading the metrics configuration or talking to the metrics server.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Reading an environment variable failed. An empty `SOLANA_METRICS_CONFIG` is also
    /// reported as `VarError::NotPresent`.
    #[error(transparent)]
    VarError(#[from] env::VarError),
    /// An error reported by the `reqwest` HTTP client.
    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),
    /// A pair in `SOLANA_METRICS_CONFIG` is malformed or uses an unknown name; carries the
    /// offending pair verbatim.
    #[error("SOLANA_METRICS_CONFIG is invalid: '{0}'")]
    ConfigInvalid(String),
    /// At least one of `host`, `db`, `u`, `p` is missing or empty.
    #[error("SOLANA_METRICS_CONFIG is incomplete")]
    ConfigIncomplete,
    /// The configured database does not match the expected cluster type; carries a
    /// description of the cluster type, host and database.
    #[error("SOLANA_METRICS_CONFIG database mismatch: {0}")]
    DbMismatch(String),
}
39
40impl From<MetricsError> for String {
41 fn from(error: MetricsError) -> Self {
42 error.to_string()
43 }
44}
45
46impl From<&CounterPoint> for DataPoint {
47 fn from(counter_point: &CounterPoint) -> Self {
48 let mut point = Self::new(counter_point.name);
49 point.timestamp = counter_point.timestamp;
50 point.add_field_i64("count", counter_point.count);
51 point
52 }
53}
54
/// Messages sent from a [`MetricsAgent`] handle to its background thread.
#[derive(Debug)]
enum MetricsCommand {
    /// Write everything buffered so far, then wait on the barrier so the sender knows the
    /// write has completed.
    Flush(Arc<Barrier>),
    /// Buffer a data point for the next write; the background thread also logs it at the
    /// given level.
    Submit(DataPoint, log::Level),
    /// Add a counter into the `(name, bucket)` slot given by the `u64` bucket. The level is
    /// currently ignored by the background thread.
    SubmitCounter(CounterPoint, log::Level, u64),
}
61
/// Handle to a background thread that batches data points and counters and periodically hands
/// them to a [`MetricsWriter`].
///
/// Dropping the agent flushes any buffered data (see its `Drop` impl).
pub struct MetricsAgent {
    // Channel to the background thread; every public method is a message on it.
    sender: Sender<MetricsCommand>,
}
65
/// Destination for batches of data points produced by a [`MetricsAgent`].
pub trait MetricsWriter {
    /// Writes one batch of points.
    ///
    /// `MetricsAgent` calls this from its background thread, and every batch it produces ends
    /// with a `"metrics"` summary point.
    fn write(&self, points: Vec<DataPoint>);
}
71
/// [`MetricsWriter`] that POSTs line-protocol batches to an InfluxDB `/write` endpoint.
struct InfluxDbMetricsWriter {
    // Fully built write URL, or `None` when `SOLANA_METRICS_CONFIG` is missing or invalid
    // (metrics disabled).
    write_url: Option<String>,
}
75
76impl InfluxDbMetricsWriter {
77 fn new() -> Self {
78 Self {
79 write_url: Self::build_write_url().ok(),
80 }
81 }
82
83 fn build_write_url() -> Result<String, MetricsError> {
84 let config = get_metrics_config().map_err(|err| {
85 info!("metrics disabled: {}", err);
86 err
87 })?;
88
89 info!(
90 "metrics configuration: host={} db={} username={}",
91 config.host, config.db, config.username
92 );
93
94 let write_url = format!(
95 "{}/write?db={}&u={}&p={}&precision=n",
96 &config.host, &config.db, &config.username, &config.password
97 );
98
99 Ok(write_url)
100 }
101}
102
103pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
104 const TIMESTAMP_LEN: usize = 20;
105 const HOST_ID_LEN: usize = 8; const EXTRA_LEN: usize = 2; let mut len = 0;
108 for point in points {
109 for (name, value) in &point.fields {
110 len += name.len() + value.len() + EXTRA_LEN;
111 }
112 for (name, value) in &point.tags {
113 len += name.len() + value.len() + EXTRA_LEN;
114 }
115 len += point.name.len();
116 len += TIMESTAMP_LEN;
117 len += host_id.len() + HOST_ID_LEN;
118 }
119 let mut line = String::with_capacity(len);
120 for point in points {
121 let _ = write!(line, "{},host_id={}", &point.name, host_id);
122 for (name, value) in point.tags.iter() {
123 let _ = write!(line, ",{name}={value}");
124 }
125
126 let mut first = true;
127 for (name, value) in point.fields.iter() {
128 let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
129 first = false;
130 }
131 let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
132 let nanos = timestamp.unwrap().as_nanos();
133 let _ = writeln!(line, " {nanos}");
134 }
135 line
136}
137
138impl MetricsWriter for InfluxDbMetricsWriter {
139 fn write(&self, points: Vec<DataPoint>) {
140 if let Some(ref write_url) = self.write_url {
141 debug!("submitting {} points", points.len());
142
143 let host_id = HOST_ID.read().unwrap();
144
145 let line = serialize_points(&points, &host_id);
146
147 let client = reqwest::blocking::Client::builder()
148 .timeout(Duration::from_secs(5))
149 .build();
150 let client = match client {
151 Ok(client) => client,
152 Err(err) => {
153 warn!("client instantiation failed: {}", err);
154 return;
155 }
156 };
157
158 let response = client.post(write_url.as_str()).body(line).send();
159 if let Ok(resp) = response {
160 let status = resp.status();
161 if !status.is_success() {
162 let text = resp
163 .text()
164 .unwrap_or_else(|_| "[text body empty]".to_string());
165 warn!("submit response unsuccessful: {} {}", status, text,);
166 }
167 } else {
168 warn!("submit error: {}", response.unwrap_err());
169 }
170 }
171 }
172}
173
174impl Default for MetricsAgent {
175 fn default() -> Self {
176 let max_points_per_sec = env::var("SOLANA_METRICS_MAX_POINTS_PER_SECOND")
177 .map(|x| {
178 x.parse()
179 .expect("Failed to parse SOLANA_METRICS_MAX_POINTS_PER_SECOND")
180 })
181 .unwrap_or(4000);
182
183 Self::new(
184 Arc::new(InfluxDbMetricsWriter::new()),
185 Duration::from_secs(10),
186 max_points_per_sec,
187 )
188 }
189}
190
191impl MetricsAgent {
192 pub fn new(
193 writer: Arc<dyn MetricsWriter + Send + Sync>,
194 write_frequency: Duration,
195 max_points_per_sec: usize,
196 ) -> Self {
197 let (sender, receiver) = unbounded::<MetricsCommand>();
198
199 thread::Builder::new()
200 .name("solMetricsAgent".into())
201 .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
202 .unwrap();
203
204 Self { sender }
205 }
206
207 fn combine_points(
216 max_points: usize,
217 max_points_per_sec: usize,
218 secs_since_last_write: u64,
219 points_buffered: usize,
220 points: &mut Vec<DataPoint>,
221 counters: &mut CounterMap,
222 ) -> Vec<DataPoint> {
223 let max_points = max_points.saturating_sub(1);
225
226 let num_points = points.len().saturating_add(counters.len());
227 let fit_counters = max_points.saturating_sub(points.len());
228 let points_written = cmp::min(num_points, max_points);
229
230 debug!("run: attempting to write {} points", num_points);
231
232 if num_points > max_points {
233 warn!(
234 "Max submission rate of {} datapoints per second exceeded. Only the \
235 first {} of {} points will be submitted.",
236 max_points_per_sec, max_points, num_points
237 );
238 }
239
240 let mut combined = std::mem::take(points);
241 combined.truncate(points_written);
242
243 combined.extend(counters.values().take(fit_counters).map(|v| v.into()));
244 counters.clear();
245
246 combined.push(
247 DataPoint::new("metrics")
248 .add_field_i64("points_written", points_written as i64)
249 .add_field_i64("num_points", num_points as i64)
250 .add_field_i64("points_lost", (num_points - points_written) as i64)
251 .add_field_i64("points_buffered", points_buffered as i64)
252 .add_field_i64("secs_since_last_write", secs_since_last_write as i64)
253 .to_owned(),
254 );
255
256 combined
257 }
258
259 fn write(
264 writer: &Arc<dyn MetricsWriter + Send + Sync>,
265 max_points: usize,
266 max_points_per_sec: usize,
267 last_write_time: Instant,
268 points_buffered: usize,
269 points: &mut Vec<DataPoint>,
270 counters: &mut CounterMap,
271 ) -> Instant {
272 let now = Instant::now();
273 let secs_since_last_write = now.duration_since(last_write_time).as_secs();
274
275 writer.write(Self::combine_points(
276 max_points,
277 max_points_per_sec,
278 secs_since_last_write,
279 points_buffered,
280 points,
281 counters,
282 ));
283
284 now
285 }
286
287 fn run(
288 receiver: &Receiver<MetricsCommand>,
289 writer: &Arc<dyn MetricsWriter + Send + Sync>,
290 write_frequency: Duration,
291 max_points_per_sec: usize,
292 ) {
293 trace!("run: enter");
294 let mut last_write_time = Instant::now();
295 let mut points = Vec::<DataPoint>::new();
296 let mut counters = CounterMap::new();
297
298 let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
299
300 let write = |last_write_time: Instant,
302 points: &mut Vec<DataPoint>,
303 counters: &mut CounterMap|
304 -> Instant {
305 Self::write(
306 writer,
307 max_points,
308 max_points_per_sec,
309 last_write_time,
310 receiver.len(),
311 points,
312 counters,
313 )
314 };
315
316 loop {
317 match receiver.recv_timeout(write_frequency / 2) {
318 Ok(cmd) => match cmd {
319 MetricsCommand::Flush(barrier) => {
320 debug!("metrics_thread: flush");
321 last_write_time = write(last_write_time, &mut points, &mut counters);
322 barrier.wait();
323 }
324 MetricsCommand::Submit(point, level) => {
325 log!(level, "{}", point);
326 points.push(point);
327 }
328 MetricsCommand::SubmitCounter(counter, _level, bucket) => {
329 debug!("{:?}", counter);
330 let key = (counter.name, bucket);
331 if let Some(value) = counters.get_mut(&key) {
332 value.count += counter.count;
333 } else {
334 counters.insert(key, counter);
335 }
336 }
337 },
338 Err(RecvTimeoutError::Timeout) => (),
339 Err(RecvTimeoutError::Disconnected) => {
340 debug!("run: sender disconnected");
341 break;
342 }
343 }
344
345 let now = Instant::now();
346 if now.duration_since(last_write_time) >= write_frequency {
347 last_write_time = write(last_write_time, &mut points, &mut counters);
348 }
349 }
350
351 debug_assert!(
352 points.is_empty() && counters.is_empty(),
353 "Controlling `MetricsAgent` is expected to call `flush()` from the `Drop` \n\
354 implementation, before exiting. So both `points` and `counters` must be empty at \n\
355 this point.\n\
356 `points`: {points:?}\n\
357 `counters`: {counters:?}",
358 );
359
360 trace!("run: exit");
361 }
362
363 pub fn submit(&self, point: DataPoint, level: log::Level) {
364 self.sender
365 .send(MetricsCommand::Submit(point, level))
366 .unwrap();
367 }
368
369 pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
370 self.sender
371 .send(MetricsCommand::SubmitCounter(counter, level, bucket))
372 .unwrap();
373 }
374
375 pub fn flush(&self) {
376 debug!("Flush");
377 let barrier = Arc::new(Barrier::new(2));
378 self.sender
379 .send(MetricsCommand::Flush(Arc::clone(&barrier)))
380 .unwrap();
381
382 barrier.wait();
383 }
384}
385
386impl Drop for MetricsAgent {
387 fn drop(&mut self) {
388 self.flush();
389 }
390}
391
392fn get_singleton_agent() -> &'static MetricsAgent {
393 lazy_static! {
394 static ref AGENT: MetricsAgent = MetricsAgent::default();
395 };
396
397 &AGENT
398}
399
400lazy_static! {
401 static ref HOST_ID: Arc<RwLock<String>> = {
402 Arc::new(RwLock::new({
403 let hostname: String = gethostname()
404 .into_string()
405 .unwrap_or_else(|_| "".to_string());
406 format!("{}", hash(hostname.as_bytes()))
407 }))
408 };
409}
410
411pub fn set_host_id(host_id: String) {
412 info!("host id: {}", host_id);
413 *HOST_ID.write().unwrap() = host_id;
414}
415
416pub fn submit(point: DataPoint, level: log::Level) {
419 let agent = get_singleton_agent();
420 agent.submit(point, level);
421}
422
423pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
426 let agent = get_singleton_agent();
427 agent.submit_counter(point, level, bucket);
428}
429
/// Settings parsed from `SOLANA_METRICS_CONFIG`.
#[derive(Debug, Default)]
struct MetricsConfig {
    pub host: String,
    pub db: String,
    pub username: String,
    pub password: String,
}

impl MetricsConfig {
    /// Returns `true` only when every setting (host, db, username, password) is non-empty.
    fn complete(&self) -> bool {
        [&self.host, &self.db, &self.username, &self.password]
            .iter()
            .all(|setting| !setting.is_empty())
    }
}
446
447fn get_metrics_config() -> Result<MetricsConfig, MetricsError> {
448 let mut config = MetricsConfig::default();
449 let config_var = env::var("SOLANA_METRICS_CONFIG")?;
450 if config_var.is_empty() {
451 Err(env::VarError::NotPresent)?;
452 }
453
454 for pair in config_var.split(',') {
455 let nv: Vec<_> = pair.split('=').collect();
456 if nv.len() != 2 {
457 return Err(MetricsError::ConfigInvalid(pair.to_string()));
458 }
459 let v = nv[1].to_string();
460 match nv[0] {
461 "host" => config.host = v,
462 "db" => config.db = v,
463 "u" => config.username = v,
464 "p" => config.password = v,
465 _ => return Err(MetricsError::ConfigInvalid(pair.to_string())),
466 }
467 }
468
469 if !config.complete() {
470 return Err(MetricsError::ConfigIncomplete);
471 }
472
473 Ok(config)
474}
475
476pub fn metrics_config_sanity_check(cluster_type: ClusterType) -> Result<(), MetricsError> {
477 let config = match get_metrics_config() {
478 Ok(config) => config,
479 Err(MetricsError::VarError(env::VarError::NotPresent)) => return Ok(()),
480 Err(e) => return Err(e),
481 };
482 match &config.db[..] {
483 "mainnet-beta" if cluster_type != ClusterType::MainnetBeta => (),
484 "tds" if cluster_type != ClusterType::Testnet => (),
485 "devnet" if cluster_type != ClusterType::Devnet => (),
486 _ => return Ok(()),
487 };
488 let (host, db) = (&config.host, &config.db);
489 let msg = format!("cluster_type={cluster_type:?} host={host} database={db}");
490 Err(MetricsError::DbMismatch(msg))
491}
492
493pub fn query(q: &str) -> Result<String, MetricsError> {
494 let config = get_metrics_config()?;
495 let query_url = format!(
496 "{}/query?u={}&p={}&q={}",
497 &config.host, &config.username, &config.password, &q
498 );
499
500 let response = reqwest::blocking::get(query_url.as_str())?.text()?;
501
502 Ok(response)
503}
504
505pub fn flush() {
508 let agent = get_singleton_agent();
509 agent.flush();
510}
511
512pub fn set_panic_hook(program: &'static str, version: Option<String>) {
514 static SET_HOOK: Once = Once::new();
515 SET_HOOK.call_once(|| {
516 let default_hook = std::panic::take_hook();
517 std::panic::set_hook(Box::new(move |ono| {
518 default_hook(ono);
519 let location = match ono.location() {
520 Some(location) => location.to_string(),
521 None => "?".to_string(),
522 };
523 submit(
524 DataPoint::new("panic")
525 .add_field_str("program", program)
526 .add_field_str("thread", thread::current().name().unwrap_or("?"))
527 .add_field_i64("one", 1)
530 .add_field_str("message", &ono.to_string())
531 .add_field_str("location", &location)
532 .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
533 .to_owned(),
534 Level::Error,
535 );
536 flush();
538
539 std::process::exit(1);
541 }));
542 });
543}
544
545pub mod test_mocks {
546 use super::*;
547
548 pub struct MockMetricsWriter {
549 pub points_written: Arc<Mutex<Vec<DataPoint>>>,
550 }
551 impl MockMetricsWriter {
552 pub fn new() -> Self {
553 MockMetricsWriter {
554 points_written: Arc::new(Mutex::new(Vec::new())),
555 }
556 }
557
558 pub fn points_written(&self) -> usize {
559 self.points_written.lock().unwrap().len()
560 }
561 }
562
563 impl Default for MockMetricsWriter {
564 fn default() -> Self {
565 Self::new()
566 }
567 }
568
569 impl MetricsWriter for MockMetricsWriter {
570 fn write(&self, points: Vec<DataPoint>) {
571 assert!(!points.is_empty());
572
573 let new_points = points.len();
574 self.points_written.lock().unwrap().extend(points);
575
576 info!(
577 "Writing {} points ({} total)",
578 new_points,
579 self.points_written(),
580 );
581 }
582 }
583}
584
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    /// Creates a mock writer and an agent that writes into it.
    fn mock_agent(
        write_frequency: Duration,
        max_points_per_sec: usize,
    ) -> (Arc<MockMetricsWriter>, MetricsAgent) {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), write_frequency, max_points_per_sec);
        (writer, agent)
    }

    /// Builds a `"measurement"` point carrying a single integer field `i`.
    fn measurement(i: i64) -> DataPoint {
        let mut point = DataPoint::new("measurement");
        point.add_field_i64("i", i);
        point
    }

    #[test]
    fn test_submit() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);

        (0..42).for_each(|i| agent.submit(measurement(i), Level::Info));

        agent.flush();
        // 42 measurements plus the agent's "metrics" summary point.
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_submit_counter() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);

        for bucket in 0..10 {
            for name in ["counter 1", "counter 2"] {
                agent.submit_counter(CounterPoint::new(name), Level::Info, bucket);
            }
        }

        agent.flush();
        // 2 names x 10 buckets, plus the summary point.
        assert_eq!(writer.points_written(), 21);
    }

    #[test]
    fn test_submit_counter_increment() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);

        for _ in 0..10 {
            let counter = CounterPoint {
                name: "counter",
                count: 10,
                timestamp: UNIX_EPOCH,
            };
            agent.submit_counter(counter, Level::Info, 0);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 2);

        let submitted_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(submitted_point.fields[0], ("count", "100i".to_string()));
    }

    #[test]
    fn test_submit_bucketed_counter() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);

        for i in 0..50u64 {
            let bucket = i / 10;
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, bucket);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, bucket);
        }

        agent.flush();
        // 2 names x 5 buckets, plus the summary point.
        assert_eq!(writer.points_written(), 11);
    }

    #[test]
    fn test_submit_with_delay() {
        let (writer, agent) = mock_agent(Duration::from_secs(1), 1000);

        agent.submit(DataPoint::new("point 1"), Level::Info);
        // Longer than the write frequency, so the periodic write must have happened.
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_submit_exceed_max_rate() {
        let max_points_per_sec = 100;
        let (writer, agent) = mock_agent(Duration::from_secs(1), max_points_per_sec);

        for i in 0..(max_points_per_sec + 20) {
            agent.submit(measurement(i64::try_from(i).unwrap()), Level::Info);
        }

        thread::sleep(Duration::from_secs(2));

        agent.flush();

        assert_eq!(writer.points_written(), max_points_per_sec + 1);
    }

    #[test]
    fn test_multithread_submit() {
        let (writer, agent) = mock_agent(Duration::from_secs(10), 1000);
        let agent = Arc::new(Mutex::new(agent));

        let handles: Vec<_> = (0..42)
            .map(|i| {
                let point = measurement(i);
                let agent = Arc::clone(&agent);
                thread::spawn(move || agent.lock().unwrap().submit(point, Level::Info))
            })
            .collect();

        handles
            .into_iter()
            .for_each(|handle| handle.join().unwrap());

        agent.lock().unwrap().flush();
        assert_eq!(writer.points_written(), 43);
    }

    #[test]
    fn test_flush_before_drop() {
        let (writer, agent) = mock_agent(Duration::from_secs(9_999_999), 1000);
        agent.submit(DataPoint::new("point 1"), Level::Info);

        // The write frequency is far in the future, so only `Drop`'s flush can write.
        drop(agent);

        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_live_submit() {
        let agent = MetricsAgent::default();

        let mut point = DataPoint::new("live_submit_test");
        point
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", i64::from(rand::random::<u8>()));
        agent.submit(point, Level::Info);
    }
}