solana_streamer/
streamer.rs

//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets
//! and for sending packet batches back out over UDP.
//!
3
4use {
5    crate::{
6        packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
7        sendmmsg::{batch_send, SendPktsError},
8        socket::SocketAddrSpace,
9    },
10    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
11    histogram::Histogram,
12    itertools::Itertools,
13    solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp},
14    std::{
15        cmp::Reverse,
16        collections::HashMap,
17        net::{IpAddr, UdpSocket},
18        sync::{
19            atomic::{AtomicBool, AtomicUsize, Ordering},
20            Arc,
21        },
22        thread::{sleep, Builder, JoinHandle},
23        time::{Duration, Instant},
24    },
25    thiserror::Error,
26};
27
28// Total stake and nodes => stake map
29#[derive(Default)]
30pub struct StakedNodes {
31    stakes: Arc<HashMap<Pubkey, u64>>,
32    overrides: HashMap<Pubkey, u64>,
33    total_stake: u64,
34    max_stake: u64,
35    min_stake: u64,
36}
37
38pub type PacketBatchReceiver = Receiver<PacketBatch>;
39pub type PacketBatchSender = Sender<PacketBatch>;
40
41#[derive(Error, Debug)]
42pub enum StreamerError {
43    #[error("I/O error")]
44    Io(#[from] std::io::Error),
45
46    #[error("receive timeout error")]
47    RecvTimeout(#[from] RecvTimeoutError),
48
49    #[error("send packets error")]
50    Send(#[from] SendError<PacketBatch>),
51
52    #[error(transparent)]
53    SendPktsError(#[from] SendPktsError),
54}
55
/// Counters maintained by a receiver thread and periodically reported (and
/// reset) under the metrics name `name`.
pub struct StreamerReceiveStats {
    /// Metrics datapoint name; also used to label recycled packet batches.
    pub name: &'static str,
    /// Total packets received in non-empty batches.
    pub packets_count: AtomicUsize,
    /// Number of non-empty batches received.
    pub packet_batches_count: AtomicUsize,
    /// Number of batches that were filled to `PACKETS_PER_BATCH`.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest observed length of the outgoing batch channel.
    pub max_channel_len: AtomicUsize,
}
63
64impl StreamerReceiveStats {
65    pub fn new(name: &'static str) -> Self {
66        Self {
67            name,
68            packets_count: AtomicUsize::default(),
69            packet_batches_count: AtomicUsize::default(),
70            full_packet_batches_count: AtomicUsize::default(),
71            max_channel_len: AtomicUsize::default(),
72        }
73    }
74
75    pub fn report(&self) {
76        datapoint_info!(
77            self.name,
78            (
79                "packets_count",
80                self.packets_count.swap(0, Ordering::Relaxed) as i64,
81                i64
82            ),
83            (
84                "packet_batches_count",
85                self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
86                i64
87            ),
88            (
89                "full_packet_batches_count",
90                self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
91                i64
92            ),
93            (
94                "channel_len",
95                self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
96                i64
97            ),
98        );
99    }
100}
101
102pub type Result<T> = std::result::Result<T, StreamerError>;
103
104fn recv_loop(
105    socket: &UdpSocket,
106    exit: &AtomicBool,
107    packet_batch_sender: &PacketBatchSender,
108    recycler: &PacketBatchRecycler,
109    stats: &StreamerReceiveStats,
110    coalesce: Duration,
111    use_pinned_memory: bool,
112    in_vote_only_mode: Option<Arc<AtomicBool>>,
113    is_staked_service: bool,
114) -> Result<()> {
115    loop {
116        let mut packet_batch = if use_pinned_memory {
117            PacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
118        } else {
119            PacketBatch::with_capacity(PACKETS_PER_BATCH)
120        };
121        loop {
122            // Check for exit signal, even if socket is busy
123            // (for instance the leader transaction socket)
124            if exit.load(Ordering::Relaxed) {
125                return Ok(());
126            }
127
128            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
129                if in_vote_only_mode.load(Ordering::Relaxed) {
130                    sleep(Duration::from_millis(1));
131                    continue;
132                }
133            }
134
135            if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
136                if len > 0 {
137                    let StreamerReceiveStats {
138                        packets_count,
139                        packet_batches_count,
140                        full_packet_batches_count,
141                        max_channel_len,
142                        ..
143                    } = stats;
144
145                    packets_count.fetch_add(len, Ordering::Relaxed);
146                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
147                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
148                    if len == PACKETS_PER_BATCH {
149                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
150                    }
151                    packet_batch
152                        .iter_mut()
153                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
154                    packet_batch_sender.send(packet_batch)?;
155                }
156                break;
157            }
158        }
159    }
160}
161
162#[allow(clippy::too_many_arguments)]
163pub fn receiver(
164    thread_name: String,
165    socket: Arc<UdpSocket>,
166    exit: Arc<AtomicBool>,
167    packet_batch_sender: PacketBatchSender,
168    recycler: PacketBatchRecycler,
169    stats: Arc<StreamerReceiveStats>,
170    coalesce: Duration,
171    use_pinned_memory: bool,
172    in_vote_only_mode: Option<Arc<AtomicBool>>,
173    is_staked_service: bool,
174) -> JoinHandle<()> {
175    let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
176    assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
177    Builder::new()
178        .name(thread_name)
179        .spawn(move || {
180            let _ = recv_loop(
181                &socket,
182                &exit,
183                &packet_batch_sender,
184                &recycler,
185                &stats,
186                coalesce,
187                use_pinned_memory,
188                in_vote_only_mode,
189                is_staked_service,
190            );
191        })
192        .unwrap()
193}
194
/// Running totals recorded for a single destination host.
#[derive(Debug, Default)]
struct SendStats {
    /// Total payload bytes recorded for the host.
    bytes: u64,
    /// Number of packets recorded for the host.
    count: u64,
}
200
201#[derive(Default)]
202struct StreamerSendStats {
203    host_map: HashMap<IpAddr, SendStats>,
204    since: Option<Instant>,
205}
206
207impl StreamerSendStats {
208    fn report_stats(
209        name: &'static str,
210        host_map: HashMap<IpAddr, SendStats>,
211        sample_duration: Option<Duration>,
212    ) {
213        const MAX_REPORT_ENTRIES: usize = 5;
214        let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
215        let mut hist = Histogram::default();
216        let mut byte_sum = 0;
217        let mut pkt_count = 0;
218        host_map.iter().for_each(|(_addr, host_stats)| {
219            hist.increment(host_stats.bytes).unwrap();
220            byte_sum += host_stats.bytes;
221            pkt_count += host_stats.count;
222        });
223
224        datapoint_info!(
225            name,
226            ("streamer-send-sample_duration_ms", sample_ms, i64),
227            ("streamer-send-host_count", host_map.len(), i64),
228            ("streamer-send-bytes_total", byte_sum, i64),
229            ("streamer-send-pkt_count_total", pkt_count, i64),
230            (
231                "streamer-send-host_bytes_min",
232                hist.minimum().unwrap_or_default(),
233                i64
234            ),
235            (
236                "streamer-send-host_bytes_max",
237                hist.maximum().unwrap_or_default(),
238                i64
239            ),
240            (
241                "streamer-send-host_bytes_mean",
242                hist.mean().unwrap_or_default(),
243                i64
244            ),
245            (
246                "streamer-send-host_bytes_90pct",
247                hist.percentile(90.0).unwrap_or_default(),
248                i64
249            ),
250            (
251                "streamer-send-host_bytes_50pct",
252                hist.percentile(50.0).unwrap_or_default(),
253                i64
254            ),
255            (
256                "streamer-send-host_bytes_10pct",
257                hist.percentile(10.0).unwrap_or_default(),
258                i64
259            ),
260        );
261
262        let num_entries = host_map.len();
263        let mut entries: Vec<_> = host_map.into_iter().collect();
264        if entries.len() > MAX_REPORT_ENTRIES {
265            entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
266                Reverse(stats.bytes)
267            });
268            entries.truncate(MAX_REPORT_ENTRIES);
269        }
270        info!(
271            "streamer send {} hosts: count:{} {:?}",
272            name, num_entries, entries,
273        );
274    }
275
276    fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
277        const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
278        const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
279        let elapsed = self.since.as_ref().map(Instant::elapsed);
280        if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
281            && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
282        {
283            return;
284        }
285
286        let host_map = std::mem::take(&mut self.host_map);
287        let _ = sender.send(Box::new(move || {
288            Self::report_stats(name, host_map, elapsed);
289        }));
290
291        *self = Self {
292            since: Some(Instant::now()),
293            ..Self::default()
294        };
295    }
296
297    fn record(&mut self, pkt: &Packet) {
298        let ent = self.host_map.entry(pkt.meta().addr).or_default();
299        ent.count += 1;
300        ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
301    }
302}
303
304impl StakedNodes {
305    pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
306        let values = stakes
307            .iter()
308            .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
309            .map(|(_, &stake)| stake)
310            .chain(overrides.values().copied())
311            .filter(|&stake| stake > 0);
312        let total_stake = values.clone().sum();
313        let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
314        Self {
315            stakes,
316            overrides,
317            total_stake,
318            max_stake,
319            min_stake,
320        }
321    }
322
323    pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
324        self.overrides
325            .get(pubkey)
326            .or_else(|| self.stakes.get(pubkey))
327            .filter(|&&stake| stake > 0)
328            .copied()
329    }
330
331    #[inline]
332    pub fn total_stake(&self) -> u64 {
333        self.total_stake
334    }
335
336    #[inline]
337    pub(super) fn min_stake(&self) -> u64 {
338        self.min_stake
339    }
340
341    #[inline]
342    pub(super) fn max_stake(&self) -> u64 {
343        self.max_stake
344    }
345}
346
347fn recv_send(
348    sock: &UdpSocket,
349    r: &PacketBatchReceiver,
350    socket_addr_space: &SocketAddrSpace,
351    stats: &mut Option<StreamerSendStats>,
352) -> Result<()> {
353    let timer = Duration::new(1, 0);
354    let packet_batch = r.recv_timeout(timer)?;
355    if let Some(stats) = stats {
356        packet_batch.iter().for_each(|p| stats.record(p));
357    }
358    let packets = packet_batch.iter().filter_map(|pkt| {
359        let addr = pkt.meta().socket_addr();
360        let data = pkt.data(..)?;
361        socket_addr_space.check(&addr).then_some((data, addr))
362    });
363    batch_send(sock, &packets.collect::<Vec<_>>())?;
364    Ok(())
365}
366
367pub fn recv_packet_batches(
368    recvr: &PacketBatchReceiver,
369) -> Result<(Vec<PacketBatch>, usize, Duration)> {
370    let recv_start = Instant::now();
371    let timer = Duration::new(1, 0);
372    let packet_batch = recvr.recv_timeout(timer)?;
373    trace!("got packets");
374    let mut num_packets = packet_batch.len();
375    let mut packet_batches = vec![packet_batch];
376    while let Ok(packet_batch) = recvr.try_recv() {
377        trace!("got more packets");
378        num_packets += packet_batch.len();
379        packet_batches.push(packet_batch);
380    }
381    let recv_duration = recv_start.elapsed();
382    trace!(
383        "packet batches len: {}, num packets: {}",
384        packet_batches.len(),
385        num_packets
386    );
387    Ok((packet_batches, num_packets, recv_duration))
388}
389
390pub fn responder(
391    name: &'static str,
392    sock: Arc<UdpSocket>,
393    r: PacketBatchReceiver,
394    socket_addr_space: SocketAddrSpace,
395    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
396) -> JoinHandle<()> {
397    Builder::new()
398        .name(format!("solRspndr{name}"))
399        .spawn(move || {
400            let mut errors = 0;
401            let mut last_error = None;
402            let mut last_print = 0;
403            let mut stats = None;
404
405            if stats_reporter_sender.is_some() {
406                stats = Some(StreamerSendStats::default());
407            }
408
409            loop {
410                if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
411                    match e {
412                        StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
413                        StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
414                        _ => {
415                            errors += 1;
416                            last_error = Some(e);
417                        }
418                    }
419                }
420                let now = timestamp();
421                if now - last_print > 1000 && errors != 0 {
422                    datapoint_info!(name, ("errors", errors, i64),);
423                    info!("{} last-error: {:?} count: {}", name, last_error, errors);
424                    last_print = now;
425                    errors = 0;
426                }
427                if let Some(ref stats_reporter_sender) = stats_reporter_sender {
428                    if let Some(ref mut stats) = stats {
429                        stats.maybe_submit(name, stats_reporter_sender);
430                    }
431                }
432            }
433        })
434        .unwrap()
435}
436
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            net::UdpSocket,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Receives batches from `r` for up to ten one-second attempts,
    /// subtracting each batch's length from `num_packets` and stopping early
    /// once it reaches zero.
    ///
    /// # Panics
    /// Panics with a descriptive message if more packets arrive than
    /// `num_packets` expected.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _ in 0..10 {
            // A timeout (or disconnect) just consumes one attempt.
            if let Ok(packet_batch) = r.recv_timeout(Duration::new(1, 0)) {
                *num_packets = num_packets
                    .checked_sub(packet_batch.len())
                    .expect("received more packets than expected");
                if *num_packets == 0 {
                    break;
                }
            }
        }
    }

    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }

    #[test]
    fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Duration::from_millis(1), // coalesce
            true,
            None,
            false,
        );
        const NUM_PACKETS: usize = 5;
        // The responder's input sender is dropped at the end of this block,
        // which lets the responder thread exit once it has sent the batch.
        let t_responder = {
            let (s_responder, r_responder) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );
            let mut packet_batch = PacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut p = Packet::default();
                {
                    p.buffer_mut()[0] = i as u8;
                    p.meta_mut().size = PACKET_DATA_SIZE;
                    p.meta_mut().set_socket_addr(&addr);
                }
                packet_batch.push(p);
            }
            s_responder.send(packet_batch).expect("send");
            t_responder
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}