solana_streamer/
recvmmsg.rs

1//! The `recvmmsg` module provides recvmmsg() API implementation
2
3#[cfg(target_os = "linux")]
4#[allow(deprecated)]
5use nix::sys::socket::InetAddr;
6pub use solana_perf::packet::NUM_RCVMMSGS;
7use {
8    crate::packet::{Meta, Packet},
9    std::{cmp, io, net::UdpSocket},
10};
11#[cfg(target_os = "linux")]
12use {
13    itertools::izip,
14    libc::{iovec, mmsghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE},
15    std::{mem, os::unix::io::AsRawFd},
16};
17
/// Portable fallback used where `recvmmsg(2)` is unavailable: reads datagrams
/// one at a time with `UdpSocket::recv_from`.
///
/// At most `min(NUM_RCVMMSGS, packets.len())` packets are filled, front to
/// back. Each filled packet gets its `meta.size` and sender address set.
///
/// The first receive runs in whatever blocking mode `socket` currently has.
/// Once it succeeds the socket is switched to non-blocking mode so the rest of
/// the batch only drains what is already queued; the socket is left
/// non-blocking when this function returns.
///
/// # Errors
///
/// Returns the error of the first `recv_from` if no packet was received, or
/// the error of `set_nonblocking` if switching modes fails. An error on any
/// later receive simply ends the batch and the count so far is returned.
#[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
    debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
    let limit = cmp::min(NUM_RCVMMSGS, packets.len());
    let mut received = 0;
    for packet in &mut packets[..limit] {
        packet.meta.size = 0;
        let (len, sender) = match socket.recv_from(packet.buffer_mut()) {
            Ok(datagram) => datagram,
            // Nothing received yet: surface the failure to the caller.
            Err(err) if received == 0 => return Err(err),
            // Already have packets: treat the failure as "queue drained".
            Err(_) => break,
        };
        packet.meta.size = len;
        packet.meta.set_socket_addr(&sender);
        if received == 0 {
            // After the first packet, never wait for the remaining ones.
            socket.set_nonblocking(true)?;
        }
        received += 1;
    }
    Ok(received)
}
44
45#[cfg(target_os = "linux")]
46#[allow(deprecated)]
47fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<InetAddr> {
48    use libc::{sa_family_t, sockaddr_in, sockaddr_in6};
49    const SOCKADDR_IN_SIZE: usize = std::mem::size_of::<sockaddr_in>();
50    const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::<sockaddr_in6>();
51    if addr.ss_family == AF_INET as sa_family_t
52        && hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t
53    {
54        let addr = addr as *const _ as *const sockaddr_in;
55        return Some(unsafe { InetAddr::V4(*addr) });
56    }
57    if addr.ss_family == AF_INET6 as sa_family_t
58        && hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t
59    {
60        let addr = addr as *const _ as *const sockaddr_in6;
61        return Some(unsafe { InetAddr::V6(*addr) });
62    }
63    error!(
64        "recvmmsg unexpected ss_family:{} msg_namelen:{}",
65        addr.ss_family, hdr.msg_hdr.msg_namelen
66    );
67    None
68}
69
70#[cfg(target_os = "linux")]
71#[allow(clippy::uninit_assumed_init)]
72pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
73    // Assert that there are no leftovers in packets.
74    debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
75    const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();
76
77    let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
78    let iovs = mem::MaybeUninit::<[iovec; NUM_RCVMMSGS]>::uninit();
79    let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { iovs.assume_init() };
80    let mut addrs: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
81
82    let sock_fd = sock.as_raw_fd();
83    let count = cmp::min(iovs.len(), packets.len());
84
85    for (packet, hdr, iov, addr) in
86        izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count)
87    {
88        let buffer = packet.buffer_mut();
89        *iov = iovec {
90            iov_base: buffer.as_mut_ptr() as *mut libc::c_void,
91            iov_len: buffer.len(),
92        };
93        hdr.msg_hdr.msg_name = addr as *mut _ as *mut _;
94        hdr.msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE as socklen_t;
95        hdr.msg_hdr.msg_iov = iov;
96        hdr.msg_hdr.msg_iovlen = 1;
97    }
98    let mut ts = libc::timespec {
99        tv_sec: 1,
100        tv_nsec: 0,
101    };
102    let nrecv =
103        unsafe { libc::recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) };
104    let nrecv = if nrecv < 0 {
105        return Err(io::Error::last_os_error());
106    } else {
107        usize::try_from(nrecv).unwrap()
108    };
109    for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
110        pkt.meta.size = hdr.msg_len as usize;
111        if let Some(addr) = cast_socket_addr(&addr, &hdr) {
112            pkt.meta.set_socket_addr(&addr.to_std());
113        }
114    }
115    Ok(nrecv)
116}
117
#[cfg(test)]
mod tests {
    use {
        crate::{packet::PACKET_DATA_SIZE, recvmmsg::*},
        std::{
            net::{SocketAddr, UdpSocket},
            time::{Duration, Instant},
        },
    };

    /// `(reader socket, reader address, sender socket, sender address)`.
    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);

    /// Number of packet slots handed to each `recv_mmsg` call.
    const TEST_NUM_MSGS: usize = 32;

    /// Binds a reader and a sender socket on `ip_str` and reports their
    /// local addresses.
    fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
        let reader = UdpSocket::bind(ip_str)?;
        let reader_addr = reader.local_addr()?;
        let sender = UdpSocket::bind(ip_str)?;
        let sender_addr = sender.local_addr()?;
        Ok((reader, reader_addr, sender, sender_addr))
    }

    /// Sends `num` zero-filled datagrams of `PACKET_DATA_SIZE` bytes to `dest`.
    fn send_full_packets(sender: &UdpSocket, dest: SocketAddr, num: usize) {
        for _ in 0..num {
            let payload = [0; PACKET_DATA_SIZE];
            sender.send_to(&payload[..], dest).unwrap();
        }
    }

    /// Asserts that every packet is full-sized and was sent from `expected`.
    fn assert_full_packets_from(packets: &[Packet], expected: SocketAddr) {
        for packet in packets {
            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta.socket_addr(), expected);
        }
    }

    /// Restores default metadata so the batch can be passed to `recv_mmsg`
    /// again.
    fn clear_metas(packets: &mut [Packet]) {
        for packet in packets.iter_mut() {
            packet.meta = Meta::default();
        }
    }

    /// Runs `scenario` over IPv4 loopback, then over IPv6 loopback when it can
    /// be set up (otherwise only a warning is logged).
    fn run_on_loopbacks(scenario: fn(TestConfig)) {
        scenario(test_setup_reader_sender("127.0.0.1:0").unwrap());

        match test_setup_reader_sender("::1:0") {
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
            Ok(config) => scenario(config),
        }
    }

    #[test]
    pub fn test_recv_mmsg_one_iter() {
        // Fewer datagrams than slots: one call must return all of them.
        fn scenario((reader, addr, sender, saddr): TestConfig) {
            let sent = TEST_NUM_MSGS - 1;
            send_full_packets(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets).unwrap();
            assert_eq!(sent, recv);
            assert_full_packets_from(&packets[..recv], saddr);
        }

        run_on_loopbacks(scenario);
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter() {
        // More datagrams than slots: the second call picks up the remainder.
        fn scenario((reader, addr, sender, saddr): TestConfig) {
            let sent = TEST_NUM_MSGS + 10;
            send_full_packets(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets).unwrap();
            assert_eq!(TEST_NUM_MSGS, recv);
            assert_full_packets_from(&packets[..recv], saddr);

            clear_metas(&mut packets);
            let recv = recv_mmsg(&reader, &mut packets).unwrap();
            assert_eq!(sent - TEST_NUM_MSGS, recv);
            assert_full_packets_from(&packets[..recv], saddr);
        }

        run_on_loopbacks(scenario);
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter_timeout() {
        let limit = Duration::from_secs(5);

        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();
        reader.set_read_timeout(Some(limit)).unwrap();
        reader.set_nonblocking(false).unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr = sender.local_addr().unwrap();
        send_full_packets(&sender, addr, TEST_NUM_MSGS);

        let start = Instant::now();
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr);
        reader.set_nonblocking(true).unwrap();

        clear_metas(&mut packets);
        // Nothing is queued any more; the outcome is ignored, only the time
        // spent matters.
        let _ = recv_mmsg(&reader, &mut packets);
        assert!(start.elapsed() < limit);
    }

    #[test]
    pub fn test_recv_mmsg_multi_addrs() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();

        let first_sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let first_addr = first_sender.local_addr().unwrap();
        let first_sent = TEST_NUM_MSGS - 1;

        let second_sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let second_addr = second_sender.local_addr().unwrap();
        let second_sent = TEST_NUM_MSGS + 1;

        send_full_packets(&first_sender, addr, first_sent);
        send_full_packets(&second_sender, addr, second_sent);

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];

        // First batch: everything from the first sender, then the start of
        // the second sender's datagrams.
        let recv = recv_mmsg(&reader, &mut packets).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..first_sent], first_addr);
        assert_full_packets_from(&packets[first_sent..recv], second_addr);

        // Second batch: whatever is left, all from the second sender.
        clear_metas(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets).unwrap();
        assert_eq!(first_sent + second_sent - TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], second_addr);
    }
}