solana_streamer/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2use {
3    crate::{
4        recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5        socket::SocketAddrSpace,
6    },
7    solana_metrics::inc_new_counter_debug,
8    std::{io::Result, net::UdpSocket, time::Instant},
9};
10pub use {
11    solana_perf::packet::{
12        to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
13    },
14    solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
15};
16
17pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result<usize> {
18    let mut i = 0;
19    //DOCUMENTED SIDE-EFFECT
20    //Performance out of the IO without poll
21    //  * block on the socket until it's readable
22    //  * set the socket to non blocking
23    //  * read until it fails
24    //  * set it back to blocking before returning
25    socket.set_nonblocking(false)?;
26    trace!("receiving on {}", socket.local_addr().unwrap());
27    let start = Instant::now();
28    loop {
29        batch.resize(
30            std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
31            Packet::default(),
32        );
33        match recv_mmsg(socket, &mut batch[i..]) {
34            Err(_) if i > 0 => {
35                if start.elapsed().as_millis() as u64 > max_wait_ms {
36                    break;
37                }
38            }
39            Err(e) => {
40                trace!("recv_from err {:?}", e);
41                return Err(e);
42            }
43            Ok(npkts) => {
44                if i == 0 {
45                    socket.set_nonblocking(true)?;
46                }
47                trace!("got {} packets", npkts);
48                i += npkts;
49                // Try to batch into big enough buffers
50                // will cause less re-shuffling later on.
51                if start.elapsed().as_millis() as u64 > max_wait_ms || i >= PACKETS_PER_BATCH {
52                    break;
53                }
54            }
55        }
56    }
57    batch.truncate(i);
58    inc_new_counter_debug!("packets-recv_count", i);
59    Ok(i)
60}
61
62pub fn send_to(
63    batch: &PacketBatch,
64    socket: &UdpSocket,
65    socket_addr_space: &SocketAddrSpace,
66) -> Result<()> {
67    for p in batch.iter() {
68        let addr = p.meta.socket_addr();
69        if socket_addr_space.check(&addr) {
70            if let Some(data) = p.data(..) {
71                socket.send_to(data, addr)?;
72            }
73        }
74    }
75    Ok(())
76}
77
#[cfg(test)]
mod tests {
    use {
        super::*,
        std::{
            io,
            io::Write,
            net::{SocketAddr, UdpSocket},
        },
    };

    /// `PacketBatch::set_addr` must update the address stored in the batch's packet.
    #[test]
    fn test_packets_set_addr() {
        // test that the address is actually being updated
        let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
        let packets = vec![Packet::default()];
        let mut packet_batch = PacketBatch::new(packets);
        packet_batch.set_addr(&send_addr);
        assert_eq!(packet_batch[0].meta.socket_addr(), send_addr);
    }

    /// Round-trips a small batch over loopback with `send_to` / `recv_from`
    /// and checks the count, size and source address of what comes back.
    #[test]
    pub fn packet_send_recv() {
        solana_logger::setup();
        let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = recv_socket.local_addr().unwrap();
        let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr = send_socket.local_addr().unwrap();

        let packet_batch_size = 10;
        let mut batch = PacketBatch::with_capacity(packet_batch_size);
        batch.resize(packet_batch_size, Packet::default());

        // Address every packet to the receiving socket with a full-size payload.
        for m in batch.iter_mut() {
            m.meta.set_socket_addr(&addr);
            m.meta.size = PACKET_DATA_SIZE;
        }
        send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();

        // Clear the metadata so the checks below can only pass if `recv_from`
        // filled it in again.
        batch.iter_mut().for_each(|pkt| pkt.meta = Meta::default());
        let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap();

        // `recv_from` truncates `batch` to the number of packets it read, so
        // comparing against `batch.len()` alone can never fail. Check the
        // count against the number of packets that were actually sent.
        assert_eq!(recvd, packet_batch_size);
        assert_eq!(recvd, batch.len());

        for m in batch.iter() {
            assert_eq!(m.meta.size, PACKET_DATA_SIZE);
            assert_eq!(m.meta.socket_addr(), saddr);
        }
    }

    /// `Packet` and `PacketBatch` must be formattable with `{:?}`.
    #[test]
    pub fn debug_trait() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }

    /// Two packets with the same size and first byte compare equal; changing
    /// that byte makes them unequal.
    #[test]
    fn test_packet_partial_eq() {
        let mut p1 = Packet::default();
        let mut p2 = Packet::default();

        p1.meta.size = 1;
        p1.buffer_mut()[0] = 0;

        p2.meta.size = 1;
        p2.buffer_mut()[0] = 0;

        assert!(p1 == p2);

        p2.buffer_mut()[0] = 4;
        assert!(p1 != p2);
    }

    /// A single `recv_from` call must return at most `PACKETS_PER_BATCH`
    /// packets and must not grow the batch beyond that capacity.
    #[test]
    fn test_packet_resize() {
        solana_logger::setup();
        let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = recv_socket.local_addr().unwrap();
        let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
        batch.resize(PACKETS_PER_BATCH, Packet::default());

        // Should only get PACKETS_PER_BATCH packets per iteration even
        // if a lot more were sent, and regardless of packet size
        for _ in 0..2 * PACKETS_PER_BATCH {
            // Named separately from the receive batch above so the loop does
            // not shadow it.
            let batch_size = 1;
            let mut outgoing = PacketBatch::with_capacity(batch_size);
            outgoing.resize(batch_size, Packet::default());
            for p in outgoing.iter_mut() {
                p.meta.set_socket_addr(&addr);
                p.meta.size = 1;
            }
            send_to(&outgoing, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
        }

        let recvd = recv_from(&mut batch, &recv_socket, 100).unwrap();

        // Check we only got PACKETS_PER_BATCH packets
        assert_eq!(recvd, PACKETS_PER_BATCH);
        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
    }
}