solana_streamer/
streamer.rs

1//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
2//!
3
4use {
5    crate::{
6        packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
7        sendmmsg::{batch_send, SendPktsError},
8        socket::SocketAddrSpace,
9    },
10    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
11    histogram::Histogram,
12    itertools::Itertools,
13    solana_packet::Packet,
14    solana_pubkey::Pubkey,
15    solana_time_utils::timestamp,
16    std::{
17        cmp::Reverse,
18        collections::HashMap,
19        net::{IpAddr, UdpSocket},
20        sync::{
21            atomic::{AtomicBool, AtomicUsize, Ordering},
22            Arc,
23        },
24        thread::{sleep, Builder, JoinHandle},
25        time::{Duration, Instant},
26    },
27    thiserror::Error,
28};
29
/// Node stake table with optional per-node overrides.
///
/// Besides the two maps it caches the total, maximum and minimum stake, which
/// `StakedNodes::new` computes over the effective (override-adjusted), strictly
/// positive stakes.
#[derive(Default)]
pub struct StakedNodes {
    /// Base stake per node, keyed by node pubkey.
    stakes: Arc<HashMap<Pubkey, u64>>,
    /// Stakes that take precedence over the `stakes` entry for the same pubkey.
    overrides: HashMap<Pubkey, u64>,
    /// Sum of all effective stakes that are greater than zero.
    total_stake: u64,
    /// Largest effective stake greater than zero (0 when there is none).
    max_stake: u64,
    /// Smallest effective stake greater than zero (0 when there is none).
    min_stake: u64,
}
39
/// Receiving half of a channel carrying [`PacketBatch`] values.
pub type PacketBatchReceiver = Receiver<PacketBatch>;
/// Sending half of a channel carrying [`PacketBatch`] values.
pub type PacketBatchSender = Sender<PacketBatch>;
42
/// Errors produced by the streamer receive and send paths.
///
/// Every variant has a `#[from]` conversion, so the underlying error types can
/// be propagated with `?` in functions returning this module's `Result`.
#[derive(Error, Debug)]
pub enum StreamerError {
    /// An underlying I/O operation failed.
    #[error("I/O error")]
    Io(#[from] std::io::Error),

    /// A timed channel receive either timed out or found the channel disconnected.
    #[error("receive timeout error")]
    RecvTimeout(#[from] RecvTimeoutError),

    /// A packet batch could not be pushed onto its channel.
    #[error("send packets error")]
    Send(#[from] SendError<PacketBatch>),

    /// Sending packets on the socket failed; the message is forwarded from the inner error.
    #[error(transparent)]
    SendPktsError(#[from] SendPktsError),
}
57
/// Counters describing the activity of a receive loop.
///
/// The counters are atomics so they can be updated through a shared reference;
/// `report` reads them with a swap to zero.
pub struct StreamerReceiveStats {
    /// Name under which the datapoint is submitted by `report`.
    pub name: &'static str,
    /// Number of packets received.
    pub packets_count: AtomicUsize,
    /// Number of non-empty packet batches received.
    pub packet_batches_count: AtomicUsize,
    /// Number of batches that contained exactly `PACKETS_PER_BATCH` packets.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest observed length of the outgoing packet batch channel.
    pub max_channel_len: AtomicUsize,
}
65
impl StreamerReceiveStats {
    /// Creates a stats record named `name` with every counter at zero.
    pub fn new(name: &'static str) -> Self {
        Self {
            name,
            packets_count: AtomicUsize::default(),
            packet_batches_count: AtomicUsize::default(),
            full_packet_batches_count: AtomicUsize::default(),
            max_channel_len: AtomicUsize::default(),
        }
    }

    /// Submits the counters as an info-level datapoint named `self.name`.
    ///
    /// Each counter is read with `swap(0, ..)`, so reading it also resets it.
    /// `max_channel_len` is reported under the field name `"channel_len"`.
    ///
    /// NOTE(review): the swaps are arguments of `datapoint_info!`, which is
    /// defined elsewhere; whether they run when the datapoint is not emitted
    /// depends on how that macro expands — confirm before moving them.
    pub fn report(&self) {
        datapoint_info!(
            self.name,
            (
                "packets_count",
                self.packets_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "packet_batches_count",
                self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "full_packet_batches_count",
                self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
            (
                "channel_len",
                self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
                i64
            ),
        );
    }
}
103
/// Result type whose error is [`StreamerError`].
pub type Result<T> = std::result::Result<T, StreamerError>;
105
106fn recv_loop(
107    socket: &UdpSocket,
108    exit: &AtomicBool,
109    packet_batch_sender: &PacketBatchSender,
110    recycler: &PacketBatchRecycler,
111    stats: &StreamerReceiveStats,
112    coalesce: Duration,
113    use_pinned_memory: bool,
114    in_vote_only_mode: Option<Arc<AtomicBool>>,
115    is_staked_service: bool,
116) -> Result<()> {
117    loop {
118        let mut packet_batch = if use_pinned_memory {
119            PacketBatch::new_with_recycler(recycler, PACKETS_PER_BATCH, stats.name)
120        } else {
121            PacketBatch::with_capacity(PACKETS_PER_BATCH)
122        };
123        loop {
124            // Check for exit signal, even if socket is busy
125            // (for instance the leader transaction socket)
126            if exit.load(Ordering::Relaxed) {
127                return Ok(());
128            }
129
130            if let Some(ref in_vote_only_mode) = in_vote_only_mode {
131                if in_vote_only_mode.load(Ordering::Relaxed) {
132                    sleep(Duration::from_millis(1));
133                    continue;
134                }
135            }
136
137            if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce) {
138                if len > 0 {
139                    let StreamerReceiveStats {
140                        packets_count,
141                        packet_batches_count,
142                        full_packet_batches_count,
143                        max_channel_len,
144                        ..
145                    } = stats;
146
147                    packets_count.fetch_add(len, Ordering::Relaxed);
148                    packet_batches_count.fetch_add(1, Ordering::Relaxed);
149                    max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
150                    if len == PACKETS_PER_BATCH {
151                        full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
152                    }
153                    packet_batch
154                        .iter_mut()
155                        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
156                    packet_batch_sender.send(packet_batch)?;
157                }
158                break;
159            }
160        }
161    }
162}
163
164#[allow(clippy::too_many_arguments)]
165pub fn receiver(
166    thread_name: String,
167    socket: Arc<UdpSocket>,
168    exit: Arc<AtomicBool>,
169    packet_batch_sender: PacketBatchSender,
170    recycler: PacketBatchRecycler,
171    stats: Arc<StreamerReceiveStats>,
172    coalesce: Duration,
173    use_pinned_memory: bool,
174    in_vote_only_mode: Option<Arc<AtomicBool>>,
175    is_staked_service: bool,
176) -> JoinHandle<()> {
177    let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
178    assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
179    Builder::new()
180        .name(thread_name)
181        .spawn(move || {
182            let _ = recv_loop(
183                &socket,
184                &exit,
185                &packet_batch_sender,
186                &recycler,
187                &stats,
188                coalesce,
189                use_pinned_memory,
190                in_vote_only_mode,
191                is_staked_service,
192            );
193        })
194        .unwrap()
195}
196
/// Send totals accumulated for a single host.
#[derive(Debug, Default)]
struct SendStats {
    /// Total packet data bytes recorded for the host.
    bytes: u64,
    /// Number of packets recorded for the host.
    count: u64,
}
202
/// Per-host send statistics collected between two submissions.
#[derive(Default)]
struct StreamerSendStats {
    /// Totals keyed by the address stored in each recorded packet's metadata.
    host_map: HashMap<IpAddr, SendStats>,
    /// When the current sample started; `None` until the first submission.
    since: Option<Instant>,
}
208
209impl StreamerSendStats {
210    fn report_stats(
211        name: &'static str,
212        host_map: HashMap<IpAddr, SendStats>,
213        sample_duration: Option<Duration>,
214    ) {
215        const MAX_REPORT_ENTRIES: usize = 5;
216        let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
217        let mut hist = Histogram::default();
218        let mut byte_sum = 0;
219        let mut pkt_count = 0;
220        host_map.iter().for_each(|(_addr, host_stats)| {
221            hist.increment(host_stats.bytes).unwrap();
222            byte_sum += host_stats.bytes;
223            pkt_count += host_stats.count;
224        });
225
226        datapoint_info!(
227            name,
228            ("streamer-send-sample_duration_ms", sample_ms, i64),
229            ("streamer-send-host_count", host_map.len(), i64),
230            ("streamer-send-bytes_total", byte_sum, i64),
231            ("streamer-send-pkt_count_total", pkt_count, i64),
232            (
233                "streamer-send-host_bytes_min",
234                hist.minimum().unwrap_or_default(),
235                i64
236            ),
237            (
238                "streamer-send-host_bytes_max",
239                hist.maximum().unwrap_or_default(),
240                i64
241            ),
242            (
243                "streamer-send-host_bytes_mean",
244                hist.mean().unwrap_or_default(),
245                i64
246            ),
247            (
248                "streamer-send-host_bytes_90pct",
249                hist.percentile(90.0).unwrap_or_default(),
250                i64
251            ),
252            (
253                "streamer-send-host_bytes_50pct",
254                hist.percentile(50.0).unwrap_or_default(),
255                i64
256            ),
257            (
258                "streamer-send-host_bytes_10pct",
259                hist.percentile(10.0).unwrap_or_default(),
260                i64
261            ),
262        );
263
264        let num_entries = host_map.len();
265        let mut entries: Vec<_> = host_map.into_iter().collect();
266        if entries.len() > MAX_REPORT_ENTRIES {
267            entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
268                Reverse(stats.bytes)
269            });
270            entries.truncate(MAX_REPORT_ENTRIES);
271        }
272        info!(
273            "streamer send {} hosts: count:{} {:?}",
274            name, num_entries, entries,
275        );
276    }
277
278    fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
279        const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
280        const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
281        let elapsed = self.since.as_ref().map(Instant::elapsed);
282        if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
283            && self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
284        {
285            return;
286        }
287
288        let host_map = std::mem::take(&mut self.host_map);
289        let _ = sender.send(Box::new(move || {
290            Self::report_stats(name, host_map, elapsed);
291        }));
292
293        *self = Self {
294            since: Some(Instant::now()),
295            ..Self::default()
296        };
297    }
298
299    fn record(&mut self, pkt: &Packet) {
300        let ent = self.host_map.entry(pkt.meta().addr).or_default();
301        ent.count += 1;
302        ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
303    }
304}
305
306impl StakedNodes {
307    pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
308        let values = stakes
309            .iter()
310            .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
311            .map(|(_, &stake)| stake)
312            .chain(overrides.values().copied())
313            .filter(|&stake| stake > 0);
314        let total_stake = values.clone().sum();
315        let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
316        Self {
317            stakes,
318            overrides,
319            total_stake,
320            max_stake,
321            min_stake,
322        }
323    }
324
325    pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
326        self.overrides
327            .get(pubkey)
328            .or_else(|| self.stakes.get(pubkey))
329            .filter(|&&stake| stake > 0)
330            .copied()
331    }
332
333    #[inline]
334    pub fn total_stake(&self) -> u64 {
335        self.total_stake
336    }
337
338    #[inline]
339    pub(super) fn min_stake(&self) -> u64 {
340        self.min_stake
341    }
342
343    #[inline]
344    pub(super) fn max_stake(&self) -> u64 {
345        self.max_stake
346    }
347}
348
349fn recv_send(
350    sock: &UdpSocket,
351    r: &PacketBatchReceiver,
352    socket_addr_space: &SocketAddrSpace,
353    stats: &mut Option<StreamerSendStats>,
354) -> Result<()> {
355    let timer = Duration::new(1, 0);
356    let packet_batch = r.recv_timeout(timer)?;
357    if let Some(stats) = stats {
358        packet_batch.iter().for_each(|p| stats.record(p));
359    }
360    let packets = packet_batch.iter().filter_map(|pkt| {
361        let addr = pkt.meta().socket_addr();
362        let data = pkt.data(..)?;
363        socket_addr_space.check(&addr).then_some((data, addr))
364    });
365    batch_send(sock, &packets.collect::<Vec<_>>())?;
366    Ok(())
367}
368
369pub fn recv_packet_batches(
370    recvr: &PacketBatchReceiver,
371) -> Result<(Vec<PacketBatch>, usize, Duration)> {
372    let recv_start = Instant::now();
373    let timer = Duration::new(1, 0);
374    let packet_batch = recvr.recv_timeout(timer)?;
375    trace!("got packets");
376    let mut num_packets = packet_batch.len();
377    let mut packet_batches = vec![packet_batch];
378    while let Ok(packet_batch) = recvr.try_recv() {
379        trace!("got more packets");
380        num_packets += packet_batch.len();
381        packet_batches.push(packet_batch);
382    }
383    let recv_duration = recv_start.elapsed();
384    trace!(
385        "packet batches len: {}, num packets: {}",
386        packet_batches.len(),
387        num_packets
388    );
389    Ok((packet_batches, num_packets, recv_duration))
390}
391
392pub fn responder(
393    name: &'static str,
394    sock: Arc<UdpSocket>,
395    r: PacketBatchReceiver,
396    socket_addr_space: SocketAddrSpace,
397    stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
398) -> JoinHandle<()> {
399    Builder::new()
400        .name(format!("solRspndr{name}"))
401        .spawn(move || {
402            let mut errors = 0;
403            let mut last_error = None;
404            let mut last_print = 0;
405            let mut stats = None;
406
407            if stats_reporter_sender.is_some() {
408                stats = Some(StreamerSendStats::default());
409            }
410
411            loop {
412                if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
413                    match e {
414                        StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
415                        StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
416                        _ => {
417                            errors += 1;
418                            last_error = Some(e);
419                        }
420                    }
421                }
422                let now = timestamp();
423                if now - last_print > 1000 && errors != 0 {
424                    datapoint_info!(name, ("errors", errors, i64),);
425                    info!("{} last-error: {:?} count: {}", name, last_error, errors);
426                    last_print = now;
427                    errors = 0;
428                }
429                if let Some(ref stats_reporter_sender) = stats_reporter_sender {
430                    if let Some(ref mut stats) = stats {
431                        stats.maybe_submit(name, stats_reporter_sender);
432                    }
433                }
434            }
435        })
436        .unwrap()
437}
438
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_net_utils::bind_to_localhost,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Drains batches from `r`, subtracting each batch's length from
    /// `num_packets`, until it reaches zero or ten receive attempts (each
    /// waiting up to one second) have been made.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _ in 0..10 {
            let packet_batch_res = r.recv_timeout(Duration::new(1, 0));
            if packet_batch_res.is_err() {
                continue;
            }

            *num_packets -= packet_batch_res.unwrap().len();

            if *num_packets == 0 {
                break;
            }
        }
    }

    /// Checks that `Packet` and `PacketBatch` can be formatted with `{:?}`.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }
    /// Sends `NUM_PACKETS` packets through a responder to a receiver bound on
    /// localhost and checks that they all arrive and are counted in the stats.
    #[test]
    fn streamer_send_test() {
        let read = bind_to_localhost().expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = bind_to_localhost().expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            "solRcvrTest".to_string(),
            Arc::new(read),
            exit.clone(),
            s_reader,
            Recycler::default(),
            stats.clone(),
            Duration::from_millis(1), // coalesce
            true,
            None,
            false,
        );
        const NUM_PACKETS: usize = 5;
        let t_responder = {
            let (s_responder, r_responder) = unbounded();
            let t_responder = responder(
                "SendTest",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
                None,
            );
            let mut packet_batch = PacketBatch::default();
            for i in 0..NUM_PACKETS {
                let mut p = Packet::default();
                {
                    p.buffer_mut()[0] = i as u8;
                    p.meta_mut().size = PACKET_DATA_SIZE;
                    p.meta_mut().set_socket_addr(&addr);
                }
                packet_batch.push(p);
            }
            s_responder.send(packet_batch).expect("send");
            // `s_responder` is dropped at the end of this scope, which
            // disconnects the channel and lets the responder thread exit.
            t_responder
        };

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        // Signal the receiver thread to stop before joining it below.
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}