1use {
5 crate::{
6 packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
7 sendmmsg::{batch_send, SendPktsError},
8 socket::SocketAddrSpace,
9 },
10 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
11 histogram::Histogram,
12 itertools::Itertools,
13 solana_packet::Packet,
14 solana_pubkey::Pubkey,
15 solana_time_utils::timestamp,
16 std::{
17 cmp::Reverse,
18 collections::HashMap,
19 net::{IpAddr, UdpSocket},
20 sync::{
21 atomic::{AtomicBool, AtomicUsize, Ordering},
22 Arc,
23 },
24 thread::{sleep, Builder, JoinHandle},
25 time::{Duration, Instant},
26 },
27 thiserror::Error,
28};
29
30#[derive(Default)]
32pub struct StakedNodes {
33 stakes: Arc<HashMap<Pubkey, u64>>,
34 overrides: HashMap<Pubkey, u64>,
35 total_stake: u64,
36 max_stake: u64,
37 min_stake: u64,
38}
39
40pub type PacketBatchReceiver = Receiver<PacketBatch>;
41pub type PacketBatchSender = Sender<PacketBatch>;
42
43#[derive(Error, Debug)]
44pub enum StreamerError {
45 #[error("I/O error")]
46 Io(#[from] std::io::Error),
47
48 #[error("receive timeout error")]
49 RecvTimeout(#[from] RecvTimeoutError),
50
51 #[error("send packets error")]
52 Send(#[from] SendError<PacketBatch>),
53
54 #[error(transparent)]
55 SendPktsError(#[from] SendPktsError),
56}
57
/// Counters maintained by the receive loop and periodically flushed by `report`.
pub struct StreamerReceiveStats {
    /// Datapoint name the counters are reported under.
    pub name: &'static str,
    /// Packets received since the last report.
    pub packets_count: AtomicUsize,
    /// Non-empty batches received since the last report.
    pub packet_batches_count: AtomicUsize,
    /// Batches that were completely filled (`PACKETS_PER_BATCH` packets) since the last report.
    pub full_packet_batches_count: AtomicUsize,
    /// Highest observed length of the outgoing batch channel since the last report.
    pub max_channel_len: AtomicUsize,
}
65
66impl StreamerReceiveStats {
67 pub fn new(name: &'static str) -> Self {
68 Self {
69 name,
70 packets_count: AtomicUsize::default(),
71 packet_batches_count: AtomicUsize::default(),
72 full_packet_batches_count: AtomicUsize::default(),
73 max_channel_len: AtomicUsize::default(),
74 }
75 }
76
77 pub fn report(&self) {
78 datapoint_info!(
79 self.name,
80 (
81 "packets_count",
82 self.packets_count.swap(0, Ordering::Relaxed) as i64,
83 i64
84 ),
85 (
86 "packet_batches_count",
87 self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
88 i64
89 ),
90 (
91 "full_packet_batches_count",
92 self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
93 i64
94 ),
95 (
96 "channel_len",
97 self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
98 i64
99 ),
100 );
101 }
102}
103
104pub type Result<T> = std::result::Result<T, StreamerError>;
105
106fn recv_loop(
107 socket: &UdpSocket,
108 exit: &AtomicBool,
109 packet_batch_sender: &PacketBatchSender,
110 recycler: &PacketBatchRecycler,
111 stats: &StreamerReceiveStats,
112 coalesce: Duration,
113 use_pinned_memory: bool,
114 in_vote_only_mode: Option<Arc<AtomicBool>>,
115 is_staked_service: bool,
116) -> Result<()> {
117 loop {
118 let mut packet_batch = if use_pinned_memory {
119 PacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
120 } else {
121 PacketBatch::with_capacity(PACKETS_PER_BATCH)
122 };
123 loop {
124 if exit.load(Ordering::Relaxed) {
127 return Ok(());
128 }
129
130 if let Some(ref in_vote_only_mode) = in_vote_only_mode {
131 if in_vote_only_mode.load(Ordering::Relaxed) {
132 sleep(Duration::from_millis(1));
133 continue;
134 }
135 }
136
137 if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
138 if len > 0 {
139 let StreamerReceiveStats {
140 packets_count,
141 packet_batches_count,
142 full_packet_batches_count,
143 max_channel_len,
144 ..
145 } = stats;
146
147 packets_count.fetch_add(len, Ordering::Relaxed);
148 packet_batches_count.fetch_add(1, Ordering::Relaxed);
149 max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
150 if len == PACKETS_PER_BATCH {
151 full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
152 }
153 packet_batch
154 .iter_mut()
155 .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
156 packet_batch_sender.send(packet_batch)?;
157 }
158 break;
159 }
160 }
161 }
162}
163
164#[allow(clippy::too_many_arguments)]
165pub fn receiver(
166 thread_name: String,
167 socket: Arc<UdpSocket>,
168 exit: Arc<AtomicBool>,
169 packet_batch_sender: PacketBatchSender,
170 recycler: PacketBatchRecycler,
171 stats: Arc<StreamerReceiveStats>,
172 coalesce: Duration,
173 use_pinned_memory: bool,
174 in_vote_only_mode: Option<Arc<AtomicBool>>,
175 is_staked_service: bool,
176) -> JoinHandle<()> {
177 let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
178 assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
179 Builder::new()
180 .name(thread_name)
181 .spawn(move || {
182 let _ = recv_loop(
183 &socket,
184 &exit,
185 &packet_batch_sender,
186 &recycler,
187 &stats,
188 coalesce,
189 use_pinned_memory,
190 in_vote_only_mode,
191 is_staked_service,
192 );
193 })
194 .unwrap()
195}
196
/// Counters for the packets the responder was handed for one destination host.
#[derive(Debug, Default)]
struct SendStats {
    /// Sum of packet payload sizes, in bytes.
    bytes: u64,
    /// Number of packets.
    count: u64,
}
202
203#[derive(Default)]
204struct StreamerSendStats {
205 host_map: HashMap<IpAddr, SendStats>,
206 since: Option<Instant>,
207}
208
209impl StreamerSendStats {
210 fn report_stats(
211 name: &'static str,
212 host_map: HashMap<IpAddr, SendStats>,
213 sample_duration: Option<Duration>,
214 ) {
215 const MAX_REPORT_ENTRIES: usize = 5;
216 let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
217 let mut hist = Histogram::default();
218 let mut byte_sum = 0;
219 let mut pkt_count = 0;
220 host_map.iter().for_each(|(_addr, host_stats)| {
221 hist.increment(host_stats.bytes).unwrap();
222 byte_sum += host_stats.bytes;
223 pkt_count += host_stats.count;
224 });
225
226 datapoint_info!(
227 name,
228 ("streamer-send-sample_duration_ms", sample_ms, i64),
229 ("streamer-send-host_count", host_map.len(), i64),
230 ("streamer-send-bytes_total", byte_sum, i64),
231 ("streamer-send-pkt_count_total", pkt_count, i64),
232 (
233 "streamer-send-host_bytes_min",
234 hist.minimum().unwrap_or_default(),
235 i64
236 ),
237 (
238 "streamer-send-host_bytes_max",
239 hist.maximum().unwrap_or_default(),
240 i64
241 ),
242 (
243 "streamer-send-host_bytes_mean",
244 hist.mean().unwrap_or_default(),
245 i64
246 ),
247 (
248 "streamer-send-host_bytes_90pct",
249 hist.percentile(90.0).unwrap_or_default(),
250 i64
251 ),
252 (
253 "streamer-send-host_bytes_50pct",
254 hist.percentile(50.0).unwrap_or_default(),
255 i64
256 ),
257 (
258 "streamer-send-host_bytes_10pct",
259 hist.percentile(10.0).unwrap_or_default(),
260 i64
261 ),
262 );
263
264 let num_entries = host_map.len();
265 let mut entries: Vec<_> = host_map.into_iter().collect();
266 if entries.len() > MAX_REPORT_ENTRIES {
267 entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
268 Reverse(stats.bytes)
269 });
270 entries.truncate(MAX_REPORT_ENTRIES);
271 }
272 info!(
273 "streamer send {} hosts: count:{} {:?}",
274 name, num_entries, entries,
275 );
276 }
277
278 fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
279 const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
280 const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
281 let elapsed = self.since.as_ref().map(Instant::elapsed);
282 if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
283 && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
284 {
285 return;
286 }
287
288 let host_map = std::mem::take(&mut self.host_map);
289 let _ = sender.send(Box::new(move || {
290 Self::report_stats(name, host_map, elapsed);
291 }));
292
293 *self = Self {
294 since: Some(Instant::now()),
295 ..Self::default()
296 };
297 }
298
299 fn record(&mut self, pkt: &Packet) {
300 let ent = self.host_map.entry(pkt.meta().addr).or_default();
301 ent.count += 1;
302 ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
303 }
304}
305
306impl StakedNodes {
307 pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
308 let values = stakes
309 .iter()
310 .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
311 .map(|(_, &stake)| stake)
312 .chain(overrides.values().copied())
313 .filter(|&stake| stake > 0);
314 let total_stake = values.clone().sum();
315 let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
316 Self {
317 stakes,
318 overrides,
319 total_stake,
320 max_stake,
321 min_stake,
322 }
323 }
324
325 pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
326 self.overrides
327 .get(pubkey)
328 .or_else(|| self.stakes.get(pubkey))
329 .filter(|&&stake| stake > 0)
330 .copied()
331 }
332
333 #[inline]
334 pub fn total_stake(&self) -> u64 {
335 self.total_stake
336 }
337
338 #[inline]
339 pub(super) fn min_stake(&self) -> u64 {
340 self.min_stake
341 }
342
343 #[inline]
344 pub(super) fn max_stake(&self) -> u64 {
345 self.max_stake
346 }
347}
348
349fn recv_send(
350 sock: &UdpSocket,
351 r: &PacketBatchReceiver,
352 socket_addr_space: &SocketAddrSpace,
353 stats: &mut Option<StreamerSendStats>,
354) -> Result<()> {
355 let timer = Duration::new(1, 0);
356 let packet_batch = r.recv_timeout(timer)?;
357 if let Some(stats) = stats {
358 packet_batch.iter().for_each(|p| stats.record(p));
359 }
360 let packets = packet_batch.iter().filter_map(|pkt| {
361 let addr = pkt.meta().socket_addr();
362 let data = pkt.data(..)?;
363 socket_addr_space.check(&addr).then_some((data, addr))
364 });
365 batch_send(sock, &packets.collect::<Vec<_>>())?;
366 Ok(())
367}
368
369pub fn recv_packet_batches(
370 recvr: &PacketBatchReceiver,
371) -> Result<(Vec<PacketBatch>, usize, Duration)> {
372 let recv_start = Instant::now();
373 let timer = Duration::new(1, 0);
374 let packet_batch = recvr.recv_timeout(timer)?;
375 trace!("got packets");
376 let mut num_packets = packet_batch.len();
377 let mut packet_batches = vec![packet_batch];
378 while let Ok(packet_batch) = recvr.try_recv() {
379 trace!("got more packets");
380 num_packets += packet_batch.len();
381 packet_batches.push(packet_batch);
382 }
383 let recv_duration = recv_start.elapsed();
384 trace!(
385 "packet batches len: {}, num packets: {}",
386 packet_batches.len(),
387 num_packets
388 );
389 Ok((packet_batches, num_packets, recv_duration))
390}
391
392pub fn responder(
393 name: &'static str,
394 sock: Arc<UdpSocket>,
395 r: PacketBatchReceiver,
396 socket_addr_space: SocketAddrSpace,
397 stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
398) -> JoinHandle<()> {
399 Builder::new()
400 .name(format!("solRspndr{name}"))
401 .spawn(move || {
402 let mut errors = 0;
403 let mut last_error = None;
404 let mut last_print = 0;
405 let mut stats = None;
406
407 if stats_reporter_sender.is_some() {
408 stats = Some(StreamerSendStats::default());
409 }
410
411 loop {
412 if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
413 match e {
414 StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
415 StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
416 _ => {
417 errors += 1;
418 last_error = Some(e);
419 }
420 }
421 }
422 let now = timestamp();
423 if now - last_print > 1000 && errors != 0 {
424 datapoint_info!(name, ("errors", errors, i64),);
425 info!("{} last-error: {:?} count: {}", name, last_error, errors);
426 last_print = now;
427 errors = 0;
428 }
429 if let Some(ref stats_reporter_sender) = stats_reporter_sender {
430 if let Some(ref mut stats) = stats {
431 stats.maybe_submit(name, stats_reporter_sender);
432 }
433 }
434 }
435 })
436 .unwrap()
437}
438
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_net_utils::bind_to_localhost,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Receives batches from `r` for up to ten 1-second waits. Each batch's
    /// length is subtracted from `num_packets`, stopping early once it hits zero.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _attempt in 0..10 {
            let Ok(packet_batch) = r.recv_timeout(Duration::new(1, 0)) else {
                continue;
            };
            *num_packets -= packet_batch.len();
            if *num_packets == 0 {
                break;
            }
        }
    }

    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }

    #[test]
    fn streamer_send_test() {
        const NUM_PACKETS: usize = 5;

        let read = bind_to_localhost().expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
        let addr = read.local_addr().unwrap();
        let send = bind_to_localhost().expect("bind");

        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Duration::from_millis(1), // coalesce
            true,                     // use_pinned_memory
            None,                     // in_vote_only_mode
            false,                    // is_staked_service
        );

        // The responder's sender half is dropped at the end of this block, so the
        // responder thread exits on its own once it has drained the channel.
        let t_responder = {
            let (s_responder, r_responder) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );
            let mut packet_batch = PacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut packet = Packet::default();
                packet.buffer_mut()[0] = i as u8;
                packet.meta_mut().size = PACKET_DATA_SIZE;
                packet.meta_mut().set_socket_addr(&addr);
                packet_batch.push(packet);
            }
            s_responder.send(packet_batch).expect("send");
            t_responder
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);

        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);

        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}