solana_streamer/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2use {
3    crate::{
4        recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5        socket::SocketAddrSpace,
6    },
7    std::{
8        io::Result,
9        net::UdpSocket,
10        time::{Duration, Instant},
11    },
12};
13pub use {
14    solana_packet::{Meta, Packet, PACKET_DATA_SIZE},
15    solana_perf::packet::{
16        to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
17    },
18};
19
20pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait: Duration) -> Result<usize> {
21    let mut i = 0;
22    //DOCUMENTED SIDE-EFFECT
23    //Performance out of the IO without poll
24    //  * block on the socket until it's readable
25    //  * set the socket to non blocking
26    //  * read until it fails
27    //  * set it back to blocking before returning
28    socket.set_nonblocking(false)?;
29    trace!("receiving on {}", socket.local_addr().unwrap());
30    let start = Instant::now();
31    loop {
32        batch.resize(
33            std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
34            Packet::default(),
35        );
36        match recv_mmsg(socket, &mut batch[i..]) {
37            Err(_) if i > 0 => {
38                if start.elapsed() > max_wait {
39                    break;
40                }
41            }
42            Err(e) => {
43                trace!("recv_from err {:?}", e);
44                return Err(e);
45            }
46            Ok(npkts) => {
47                if i == 0 {
48                    socket.set_nonblocking(true)?;
49                }
50                trace!("got {} packets", npkts);
51                i += npkts;
52                // Try to batch into big enough buffers
53                // will cause less re-shuffling later on.
54                if start.elapsed() > max_wait || i >= PACKETS_PER_BATCH {
55                    break;
56                }
57            }
58        }
59    }
60    batch.truncate(i);
61    Ok(i)
62}
63
64pub fn send_to(
65    batch: &PacketBatch,
66    socket: &UdpSocket,
67    socket_addr_space: &SocketAddrSpace,
68) -> Result<()> {
69    for p in batch.iter() {
70        let addr = p.meta().socket_addr();
71        if socket_addr_space.check(&addr) {
72            if let Some(data) = p.data(..) {
73                socket.send_to(data, addr)?;
74            }
75        }
76    }
77    Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82    use {
83        super::*,
84        solana_net_utils::bind_to_localhost,
85        std::{io, io::Write, net::SocketAddr},
86    };
87
88    #[test]
89    fn test_packets_set_addr() {
90        // test that the address is actually being updated
91        let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
92        let packets = vec![Packet::default()];
93        let mut packet_batch = PacketBatch::new(packets);
94        packet_batch.set_addr(&send_addr);
95        assert_eq!(packet_batch[0].meta().socket_addr(), send_addr);
96    }
97
98    #[test]
99    pub fn packet_send_recv() {
100        solana_logger::setup();
101        let recv_socket = bind_to_localhost().expect("bind");
102        let addr = recv_socket.local_addr().unwrap();
103        let send_socket = bind_to_localhost().expect("bind");
104        let saddr = send_socket.local_addr().unwrap();
105
106        let packet_batch_size = 10;
107        let mut batch = PacketBatch::with_capacity(packet_batch_size);
108        batch.resize(packet_batch_size, Packet::default());
109
110        for m in batch.iter_mut() {
111            m.meta_mut().set_socket_addr(&addr);
112            m.meta_mut().size = PACKET_DATA_SIZE;
113        }
114        send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
115
116        batch
117            .iter_mut()
118            .for_each(|pkt| *pkt.meta_mut() = Meta::default());
119        let recvd = recv_from(
120            &mut batch,
121            &recv_socket,
122            Duration::from_millis(1), // max_wait
123        )
124        .unwrap();
125        assert_eq!(recvd, batch.len());
126
127        for m in batch.iter() {
128            assert_eq!(m.meta().size, PACKET_DATA_SIZE);
129            assert_eq!(m.meta().socket_addr(), saddr);
130        }
131    }
132
133    #[test]
134    pub fn debug_trait() {
135        write!(io::sink(), "{:?}", Packet::default()).unwrap();
136        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
137    }
138
139    #[test]
140    fn test_packet_partial_eq() {
141        let mut p1 = Packet::default();
142        let mut p2 = Packet::default();
143
144        p1.meta_mut().size = 1;
145        p1.buffer_mut()[0] = 0;
146
147        p2.meta_mut().size = 1;
148        p2.buffer_mut()[0] = 0;
149
150        assert!(p1 == p2);
151
152        p2.buffer_mut()[0] = 4;
153        assert!(p1 != p2);
154    }
155
156    #[test]
157    fn test_packet_resize() {
158        solana_logger::setup();
159        let recv_socket = bind_to_localhost().expect("bind");
160        let addr = recv_socket.local_addr().unwrap();
161        let send_socket = bind_to_localhost().expect("bind");
162        let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
163        batch.resize(PACKETS_PER_BATCH, Packet::default());
164
165        // Should only get PACKETS_PER_BATCH packets per iteration even
166        // if a lot more were sent, and regardless of packet size
167        for _ in 0..2 * PACKETS_PER_BATCH {
168            let batch_size = 1;
169            let mut batch = PacketBatch::with_capacity(batch_size);
170            batch.resize(batch_size, Packet::default());
171            for p in batch.iter_mut() {
172                p.meta_mut().set_socket_addr(&addr);
173                p.meta_mut().size = 1;
174            }
175            send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
176        }
177        let recvd = recv_from(
178            &mut batch,
179            &recv_socket,
180            Duration::from_millis(100), // max_wait
181        )
182        .unwrap();
183        // Check we only got PACKETS_PER_BATCH packets
184        assert_eq!(recvd, PACKETS_PER_BATCH);
185        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
186    }
187}