1use {
5 crate::{
6 packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
7 sendmmsg::{batch_send, SendPktsError},
8 socket::SocketAddrSpace,
9 },
10 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
11 histogram::Histogram,
12 itertools::Itertools,
13 solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp},
14 std::{
15 cmp::Reverse,
16 collections::HashMap,
17 net::{IpAddr, UdpSocket},
18 sync::{
19 atomic::{AtomicBool, AtomicUsize, Ordering},
20 Arc,
21 },
22 thread::{sleep, Builder, JoinHandle},
23 time::{Duration, Instant},
24 },
25 thiserror::Error,
26};
27
28#[derive(Default)]
30pub struct StakedNodes {
31 stakes: Arc<HashMap<Pubkey, u64>>,
32 overrides: HashMap<Pubkey, u64>,
33 total_stake: u64,
34 max_stake: u64,
35 min_stake: u64,
36}
37
38pub type PacketBatchReceiver = Receiver<PacketBatch>;
39pub type PacketBatchSender = Sender<PacketBatch>;
40
41#[derive(Error, Debug)]
42pub enum StreamerError {
43 #[error("I/O error")]
44 Io(#[from] std::io::Error),
45
46 #[error("receive timeout error")]
47 RecvTimeout(#[from] RecvTimeoutError),
48
49 #[error("send packets error")]
50 Send(#[from] SendError<PacketBatch>),
51
52 #[error(transparent)]
53 SendPktsError(#[from] SendPktsError),
54}
55
/// Counters updated by the receive loop and drained by
/// [`StreamerReceiveStats::report`].
///
/// Every counter is an atomic, so one instance can be shared with the
/// receiver thread (e.g. behind an `Arc`) and updated without locking.
pub struct StreamerReceiveStats {
    /// Metric name used when reporting; also passed to
    /// `PacketBatch::new_with_recycler` by the receive loop.
    pub name: &'static str,
    /// Packets received since the last report.
    pub packets_count: AtomicUsize,
    /// Non-empty batches received since the last report.
    pub packet_batches_count: AtomicUsize,
    /// Received batches that were filled to `PACKETS_PER_BATCH`.
    pub full_packet_batches_count: AtomicUsize,
    /// Highest outgoing-channel length observed since the last report.
    pub max_channel_len: AtomicUsize,
}
63
64impl StreamerReceiveStats {
65 pub fn new(name: &'static str) -> Self {
66 Self {
67 name,
68 packets_count: AtomicUsize::default(),
69 packet_batches_count: AtomicUsize::default(),
70 full_packet_batches_count: AtomicUsize::default(),
71 max_channel_len: AtomicUsize::default(),
72 }
73 }
74
75 pub fn report(&self) {
76 datapoint_info!(
77 self.name,
78 (
79 "packets_count",
80 self.packets_count.swap(0, Ordering::Relaxed) as i64,
81 i64
82 ),
83 (
84 "packet_batches_count",
85 self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
86 i64
87 ),
88 (
89 "full_packet_batches_count",
90 self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
91 i64
92 ),
93 (
94 "channel_len",
95 self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
96 i64
97 ),
98 );
99 }
100}
101
102pub type Result<T> = std::result::Result<T, StreamerError>;
103
104fn recv_loop(
105 socket: &UdpSocket,
106 exit: &AtomicBool,
107 packet_batch_sender: &PacketBatchSender,
108 recycler: &PacketBatchRecycler,
109 stats: &StreamerReceiveStats,
110 coalesce: Duration,
111 use_pinned_memory: bool,
112 in_vote_only_mode: Option<Arc<AtomicBool>>,
113 is_staked_service: bool,
114) -> Result<()> {
115 loop {
116 let mut packet_batch = if use_pinned_memory {
117 PacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
118 } else {
119 PacketBatch::with_capacity(PACKETS_PER_BATCH)
120 };
121 loop {
122 if exit.load(Ordering::Relaxed) {
125 return Ok(());
126 }
127
128 if let Some(ref in_vote_only_mode) = in_vote_only_mode {
129 if in_vote_only_mode.load(Ordering::Relaxed) {
130 sleep(Duration::from_millis(1));
131 continue;
132 }
133 }
134
135 if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
136 if len > 0 {
137 let StreamerReceiveStats {
138 packets_count,
139 packet_batches_count,
140 full_packet_batches_count,
141 max_channel_len,
142 ..
143 } = stats;
144
145 packets_count.fetch_add(len, Ordering::Relaxed);
146 packet_batches_count.fetch_add(1, Ordering::Relaxed);
147 max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
148 if len == PACKETS_PER_BATCH {
149 full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
150 }
151 packet_batch
152 .iter_mut()
153 .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
154 packet_batch_sender.send(packet_batch)?;
155 }
156 break;
157 }
158 }
159 }
160}
161
162#[allow(clippy::too_many_arguments)]
163pub fn receiver(
164 thread_name: String,
165 socket: Arc<UdpSocket>,
166 exit: Arc<AtomicBool>,
167 packet_batch_sender: PacketBatchSender,
168 recycler: PacketBatchRecycler,
169 stats: Arc<StreamerReceiveStats>,
170 coalesce: Duration,
171 use_pinned_memory: bool,
172 in_vote_only_mode: Option<Arc<AtomicBool>>,
173 is_staked_service: bool,
174) -> JoinHandle<()> {
175 let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
176 assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
177 Builder::new()
178 .name(thread_name)
179 .spawn(move || {
180 let _ = recv_loop(
181 &socket,
182 &exit,
183 &packet_batch_sender,
184 &recycler,
185 &stats,
186 coalesce,
187 use_pinned_memory,
188 in_vote_only_mode,
189 is_staked_service,
190 );
191 })
192 .unwrap()
193}
194
/// Counters for one destination IP within a send-statistics window.
#[derive(Debug, Default)]
struct SendStats {
    /// Payload bytes of the packets recorded for this destination.
    bytes: u64,
    /// Number of packets recorded for this destination.
    count: u64,
}
200
201#[derive(Default)]
202struct StreamerSendStats {
203 host_map: HashMap<IpAddr, SendStats>,
204 since: Option<Instant>,
205}
206
207impl StreamerSendStats {
208 fn report_stats(
209 name: &'static str,
210 host_map: HashMap<IpAddr, SendStats>,
211 sample_duration: Option<Duration>,
212 ) {
213 const MAX_REPORT_ENTRIES: usize = 5;
214 let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
215 let mut hist = Histogram::default();
216 let mut byte_sum = 0;
217 let mut pkt_count = 0;
218 host_map.iter().for_each(|(_addr, host_stats)| {
219 hist.increment(host_stats.bytes).unwrap();
220 byte_sum += host_stats.bytes;
221 pkt_count += host_stats.count;
222 });
223
224 datapoint_info!(
225 name,
226 ("streamer-send-sample_duration_ms", sample_ms, i64),
227 ("streamer-send-host_count", host_map.len(), i64),
228 ("streamer-send-bytes_total", byte_sum, i64),
229 ("streamer-send-pkt_count_total", pkt_count, i64),
230 (
231 "streamer-send-host_bytes_min",
232 hist.minimum().unwrap_or_default(),
233 i64
234 ),
235 (
236 "streamer-send-host_bytes_max",
237 hist.maximum().unwrap_or_default(),
238 i64
239 ),
240 (
241 "streamer-send-host_bytes_mean",
242 hist.mean().unwrap_or_default(),
243 i64
244 ),
245 (
246 "streamer-send-host_bytes_90pct",
247 hist.percentile(90.0).unwrap_or_default(),
248 i64
249 ),
250 (
251 "streamer-send-host_bytes_50pct",
252 hist.percentile(50.0).unwrap_or_default(),
253 i64
254 ),
255 (
256 "streamer-send-host_bytes_10pct",
257 hist.percentile(10.0).unwrap_or_default(),
258 i64
259 ),
260 );
261
262 let num_entries = host_map.len();
263 let mut entries: Vec<_> = host_map.into_iter().collect();
264 if entries.len() > MAX_REPORT_ENTRIES {
265 entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
266 Reverse(stats.bytes)
267 });
268 entries.truncate(MAX_REPORT_ENTRIES);
269 }
270 info!(
271 "streamer send {} hosts: count:{} {:?}",
272 name, num_entries, entries,
273 );
274 }
275
276 fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
277 const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
278 const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
279 let elapsed = self.since.as_ref().map(Instant::elapsed);
280 if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
281 && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
282 {
283 return;
284 }
285
286 let host_map = std::mem::take(&mut self.host_map);
287 let _ = sender.send(Box::new(move || {
288 Self::report_stats(name, host_map, elapsed);
289 }));
290
291 *self = Self {
292 since: Some(Instant::now()),
293 ..Self::default()
294 };
295 }
296
297 fn record(&mut self, pkt: &Packet) {
298 let ent = self.host_map.entry(pkt.meta().addr).or_default();
299 ent.count += 1;
300 ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
301 }
302}
303
304impl StakedNodes {
305 pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
306 let values = stakes
307 .iter()
308 .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
309 .map(|(_, &stake)| stake)
310 .chain(overrides.values().copied())
311 .filter(|&stake| stake > 0);
312 let total_stake = values.clone().sum();
313 let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
314 Self {
315 stakes,
316 overrides,
317 total_stake,
318 max_stake,
319 min_stake,
320 }
321 }
322
323 pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
324 self.overrides
325 .get(pubkey)
326 .or_else(|| self.stakes.get(pubkey))
327 .filter(|&&stake| stake > 0)
328 .copied()
329 }
330
331 #[inline]
332 pub fn total_stake(&self) -> u64 {
333 self.total_stake
334 }
335
336 #[inline]
337 pub(super) fn min_stake(&self) -> u64 {
338 self.min_stake
339 }
340
341 #[inline]
342 pub(super) fn max_stake(&self) -> u64 {
343 self.max_stake
344 }
345}
346
347fn recv_send(
348 sock: &UdpSocket,
349 r: &PacketBatchReceiver,
350 socket_addr_space: &SocketAddrSpace,
351 stats: &mut Option<StreamerSendStats>,
352) -> Result<()> {
353 let timer = Duration::new(1, 0);
354 let packet_batch = r.recv_timeout(timer)?;
355 if let Some(stats) = stats {
356 packet_batch.iter().for_each(|p| stats.record(p));
357 }
358 let packets = packet_batch.iter().filter_map(|pkt| {
359 let addr = pkt.meta().socket_addr();
360 let data = pkt.data(..)?;
361 socket_addr_space.check(&addr).then_some((data, addr))
362 });
363 batch_send(sock, &packets.collect::<Vec<_>>())?;
364 Ok(())
365}
366
367pub fn recv_packet_batches(
368 recvr: &PacketBatchReceiver,
369) -> Result<(Vec<PacketBatch>, usize, Duration)> {
370 let recv_start = Instant::now();
371 let timer = Duration::new(1, 0);
372 let packet_batch = recvr.recv_timeout(timer)?;
373 trace!("got packets");
374 let mut num_packets = packet_batch.len();
375 let mut packet_batches = vec![packet_batch];
376 while let Ok(packet_batch) = recvr.try_recv() {
377 trace!("got more packets");
378 num_packets += packet_batch.len();
379 packet_batches.push(packet_batch);
380 }
381 let recv_duration = recv_start.elapsed();
382 trace!(
383 "packet batches len: {}, num packets: {}",
384 packet_batches.len(),
385 num_packets
386 );
387 Ok((packet_batches, num_packets, recv_duration))
388}
389
390pub fn responder(
391 name: &'static str,
392 sock: Arc<UdpSocket>,
393 r: PacketBatchReceiver,
394 socket_addr_space: SocketAddrSpace,
395 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
396) -> JoinHandle<()> {
397 Builder::new()
398 .name(format!("solRspndr{name}"))
399 .spawn(move || {
400 let mut errors = 0;
401 let mut last_error = None;
402 let mut last_print = 0;
403 let mut stats = None;
404
405 if stats_reporter_sender.is_some() {
406 stats = Some(StreamerSendStats::default());
407 }
408
409 loop {
410 if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
411 match e {
412 StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
413 StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
414 _ => {
415 errors += 1;
416 last_error = Some(e);
417 }
418 }
419 }
420 let now = timestamp();
421 if now - last_print > 1000 && errors != 0 {
422 datapoint_info!(name, ("errors", errors, i64),);
423 info!("{} last-error: {:?} count: {}", name, last_error, errors);
424 last_print = now;
425 errors = 0;
426 }
427 if let Some(ref stats_reporter_sender) = stats_reporter_sender {
428 if let Some(ref mut stats) = stats {
429 stats.maybe_submit(name, stats_reporter_sender);
430 }
431 }
432 }
433 })
434 .unwrap()
435}
436
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            net::UdpSocket,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Receives batches from `r`, subtracting each batch's length from
    /// `num_packets`, until it reaches zero or ten 1-second receive attempts
    /// have been made (a timed-out attempt still counts).
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _ in 0..10 {
            let packet_batch_res = r.recv_timeout(Duration::new(1, 0));
            if packet_batch_res.is_err() {
                continue;
            }

            // NOTE(review): this underflows (and panics in test builds) if more
            // packets arrive than `num_packets` expected.
            *num_packets -= packet_batch_res.unwrap().len();

            if *num_packets == 0 {
                break;
            }
        }
    }

    // Smoke test: `Debug` formatting of the packet types must not panic.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }

    /// End-to-end check: packets pushed into a `responder` arrive through a
    /// `receiver` bound on loopback, and the receive stats reflect them.
    #[test]
    fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Duration::from_millis(1),
            // use_pinned_memory
            true,
            // in_vote_only_mode
            None,
            // is_staked_service
            false,
        );
        const NUM_PACKETS: usize = 5;
        let t_responder = {
            let (s_responder, r_responder) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );
            let mut packet_batch = PacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut p = Packet::default();
                {
                    p.buffer_mut()[0] = i as u8;
                    p.meta_mut().size = PACKET_DATA_SIZE;
                    p.meta_mut().set_socket_addr(&addr);
                }
                packet_batch.push(p);
            }
            s_responder.send(packet_batch).expect("send");
            // `s_responder` is dropped at the end of this block, disconnecting
            // the channel so the responder thread's loop breaks and it can be
            // joined below.
            t_responder
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        // Signal the receiver thread to stop; it is joined after the stats
        // assertions.
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        // Five packets never fill a PACKETS_PER_BATCH-sized batch.
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}