solana_streamer/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2use {
3    crate::{
4        recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5        socket::SocketAddrSpace,
6    },
7    std::{
8        io::Result,
9        net::UdpSocket,
10        time::{Duration, Instant},
11    },
12};
13pub use {
14    solana_perf::packet::{
15        to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
16    },
17    solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
18};
19
20pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait: Duration) -> Result<usize> {
21    let mut i = 0;
22    //DOCUMENTED SIDE-EFFECT
23    //Performance out of the IO without poll
24    //  * block on the socket until it's readable
25    //  * set the socket to non blocking
26    //  * read until it fails
27    //  * set it back to blocking before returning
28    socket.set_nonblocking(false)?;
29    trace!("receiving on {}", socket.local_addr().unwrap());
30    let start = Instant::now();
31    loop {
32        batch.resize(
33            std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
34            Packet::default(),
35        );
36        match recv_mmsg(socket, &mut batch[i..]) {
37            Err(_) if i > 0 => {
38                if start.elapsed() > max_wait {
39                    break;
40                }
41            }
42            Err(e) => {
43                trace!("recv_from err {:?}", e);
44                return Err(e);
45            }
46            Ok(npkts) => {
47                if i == 0 {
48                    socket.set_nonblocking(true)?;
49                }
50                trace!("got {} packets", npkts);
51                i += npkts;
52                // Try to batch into big enough buffers
53                // will cause less re-shuffling later on.
54                if start.elapsed() > max_wait || i >= PACKETS_PER_BATCH {
55                    break;
56                }
57            }
58        }
59    }
60    batch.truncate(i);
61    Ok(i)
62}
63
64pub fn send_to(
65    batch: &PacketBatch,
66    socket: &UdpSocket,
67    socket_addr_space: &SocketAddrSpace,
68) -> Result<()> {
69    for p in batch.iter() {
70        let addr = p.meta().socket_addr();
71        if socket_addr_space.check(&addr) {
72            if let Some(data) = p.data(..) {
73                socket.send_to(data, addr)?;
74            }
75        }
76    }
77    Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82    use {
83        super::*,
84        std::{
85            io,
86            io::Write,
87            net::{SocketAddr, UdpSocket},
88        },
89    };
90
91    #[test]
92    fn test_packets_set_addr() {
93        // test that the address is actually being updated
94        let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
95        let packets = vec![Packet::default()];
96        let mut packet_batch = PacketBatch::new(packets);
97        packet_batch.set_addr(&send_addr);
98        assert_eq!(packet_batch[0].meta().socket_addr(), send_addr);
99    }
100
101    #[test]
102    pub fn packet_send_recv() {
103        solana_logger::setup();
104        let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
105        let addr = recv_socket.local_addr().unwrap();
106        let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
107        let saddr = send_socket.local_addr().unwrap();
108
109        let packet_batch_size = 10;
110        let mut batch = PacketBatch::with_capacity(packet_batch_size);
111        batch.resize(packet_batch_size, Packet::default());
112
113        for m in batch.iter_mut() {
114            m.meta_mut().set_socket_addr(&addr);
115            m.meta_mut().size = PACKET_DATA_SIZE;
116        }
117        send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
118
119        batch
120            .iter_mut()
121            .for_each(|pkt| *pkt.meta_mut() = Meta::default());
122        let recvd = recv_from(
123            &mut batch,
124            &recv_socket,
125            Duration::from_millis(1), // max_wait
126        )
127        .unwrap();
128        assert_eq!(recvd, batch.len());
129
130        for m in batch.iter() {
131            assert_eq!(m.meta().size, PACKET_DATA_SIZE);
132            assert_eq!(m.meta().socket_addr(), saddr);
133        }
134    }
135
136    #[test]
137    pub fn debug_trait() {
138        write!(io::sink(), "{:?}", Packet::default()).unwrap();
139        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
140    }
141
142    #[test]
143    fn test_packet_partial_eq() {
144        let mut p1 = Packet::default();
145        let mut p2 = Packet::default();
146
147        p1.meta_mut().size = 1;
148        p1.buffer_mut()[0] = 0;
149
150        p2.meta_mut().size = 1;
151        p2.buffer_mut()[0] = 0;
152
153        assert!(p1 == p2);
154
155        p2.buffer_mut()[0] = 4;
156        assert!(p1 != p2);
157    }
158
159    #[test]
160    fn test_packet_resize() {
161        solana_logger::setup();
162        let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
163        let addr = recv_socket.local_addr().unwrap();
164        let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
165        let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
166        batch.resize(PACKETS_PER_BATCH, Packet::default());
167
168        // Should only get PACKETS_PER_BATCH packets per iteration even
169        // if a lot more were sent, and regardless of packet size
170        for _ in 0..2 * PACKETS_PER_BATCH {
171            let batch_size = 1;
172            let mut batch = PacketBatch::with_capacity(batch_size);
173            batch.resize(batch_size, Packet::default());
174            for p in batch.iter_mut() {
175                p.meta_mut().set_socket_addr(&addr);
176                p.meta_mut().size = 1;
177            }
178            send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
179        }
180        let recvd = recv_from(
181            &mut batch,
182            &recv_socket,
183            Duration::from_millis(100), // max_wait
184        )
185        .unwrap();
186        // Check we only got PACKETS_PER_BATCH packets
187        assert_eq!(recvd, PACKETS_PER_BATCH);
188        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
189    }
190}