solana_streamer/
recvmmsg.rs

//! The `recvmmsg` module provides batched UDP receive built on the Linux `recvmmsg()` API, with a portable `recv_from`-based fallback for other targets.
2
3pub use solana_perf::packet::NUM_RCVMMSGS;
4use {
5    crate::packet::{Meta, Packet},
6    std::{cmp, io, net::UdpSocket},
7};
8#[cfg(target_os = "linux")]
9use {
10    itertools::izip,
11    libc::{
12        iovec, mmsghdr, msghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE,
13    },
14    std::{
15        mem::{self, MaybeUninit},
16        net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
17        os::unix::io::AsRawFd,
18        ptr,
19    },
20};
21
/// Portable fallback for targets without `recvmmsg()`: receives up to
/// `min(NUM_RCVMMSGS, packets.len())` datagrams by calling `recv_from` once
/// per packet.
///
/// The first receive uses the socket's current blocking mode. As soon as one
/// datagram has arrived the socket is switched to non-blocking, so the rest of
/// the batch only drains what is already queued; the first receive error after
/// that simply ends the batch. NOTE: the socket is left in non-blocking mode
/// when this function returns.
///
/// Returns the number of packets filled, or the error from the very first
/// `recv_from` (or from `set_nonblocking`).
#[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
    debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));
    let limit = cmp::min(NUM_RCVMMSGS, packets.len());
    let mut filled = 0;
    for packet in &mut packets[..limit] {
        packet.meta_mut().size = 0;
        let (len, from) = match socket.recv_from(packet.buffer_mut()) {
            Ok(received) => received,
            // Nothing received yet: surface the error to the caller.
            Err(err) if filled == 0 => return Err(err),
            // Already have part of a batch: stop and return what we have.
            Err(_) => break,
        };
        let meta = packet.meta_mut();
        meta.size = len;
        meta.set_socket_addr(&from);
        if filled == 0 {
            // Emulate MSG_WAITFORONE: only the first receive may block.
            socket.set_nonblocking(true)?;
        }
        filled += 1;
    }
    Ok(filled)
}
48
49#[cfg(target_os = "linux")]
50fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<SocketAddr> {
51    use libc::{sa_family_t, sockaddr_in, sockaddr_in6};
52    const SOCKADDR_IN_SIZE: usize = std::mem::size_of::<sockaddr_in>();
53    const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::<sockaddr_in6>();
54    if addr.ss_family == AF_INET as sa_family_t
55        && hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t
56    {
57        // ref: https://github.com/rust-lang/socket2/blob/65085d9dff270e588c0fbdd7217ec0b392b05ef2/src/sockaddr.rs#L167-L172
58        let addr = unsafe { &*(addr as *const _ as *const sockaddr_in) };
59        return Some(SocketAddr::V4(SocketAddrV4::new(
60            Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
61            u16::from_be(addr.sin_port),
62        )));
63    }
64    if addr.ss_family == AF_INET6 as sa_family_t
65        && hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t
66    {
67        // ref: https://github.com/rust-lang/socket2/blob/65085d9dff270e588c0fbdd7217ec0b392b05ef2/src/sockaddr.rs#L174-L189
68        let addr = unsafe { &*(addr as *const _ as *const sockaddr_in6) };
69        return Some(SocketAddr::V6(SocketAddrV6::new(
70            Ipv6Addr::from(addr.sin6_addr.s6_addr),
71            u16::from_be(addr.sin6_port),
72            addr.sin6_flowinfo,
73            addr.sin6_scope_id,
74        )));
75    }
76    error!(
77        "recvmmsg unexpected ss_family:{} msg_namelen:{}",
78        addr.ss_family, hdr.msg_hdr.msg_namelen
79    );
80    None
81}
82
83#[cfg(target_os = "linux")]
84pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num packets:*/ usize> {
85    // Should never hit this, but bail if the caller didn't provide any Packets
86    // to receive into
87    if packets.is_empty() {
88        return Ok(0);
89    }
90    // Assert that there are no leftovers in packets.
91    debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));
92    const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();
93
94    let mut iovs = [MaybeUninit::uninit(); NUM_RCVMMSGS];
95    let mut addrs = [MaybeUninit::zeroed(); NUM_RCVMMSGS];
96    let mut hdrs = [MaybeUninit::uninit(); NUM_RCVMMSGS];
97
98    let sock_fd = sock.as_raw_fd();
99    let count = cmp::min(iovs.len(), packets.len());
100
101    for (packet, hdr, iov, addr) in
102        izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count)
103    {
104        let buffer = packet.buffer_mut();
105        iov.write(iovec {
106            iov_base: buffer.as_mut_ptr() as *mut libc::c_void,
107            iov_len: buffer.len(),
108        });
109
110        hdr.write(mmsghdr {
111            msg_len: 0,
112            msg_hdr: msghdr {
113                msg_name: addr.as_mut_ptr() as *mut _,
114                msg_namelen: SOCKADDR_STORAGE_SIZE as socklen_t,
115                msg_iov: iov.as_mut_ptr(),
116                msg_iovlen: 1,
117                msg_control: ptr::null::<libc::c_void>() as *mut _,
118                msg_controllen: 0,
119                msg_flags: 0,
120            },
121        });
122    }
123
124    let mut ts = libc::timespec {
125        tv_sec: 1,
126        tv_nsec: 0,
127    };
128    // TODO: remove .try_into().unwrap() once rust libc fixes recvmmsg types for musl
129    #[allow(clippy::useless_conversion)]
130    let nrecv = unsafe {
131        libc::recvmmsg(
132            sock_fd,
133            hdrs[0].assume_init_mut(),
134            count as u32,
135            MSG_WAITFORONE.try_into().unwrap(),
136            &mut ts,
137        )
138    };
139    let nrecv = if nrecv < 0 {
140        return Err(io::Error::last_os_error());
141    } else {
142        usize::try_from(nrecv).unwrap()
143    };
144    for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
145        // SAFETY: We initialized `count` elements of `hdrs` above. `count` is
146        // passed to recvmmsg() as the limit of messages that can be read. So,
147        // `nrevc <= count` which means we initialized this `hdr` and
148        // recvmmsg() will have updated it appropriately
149        let hdr_ref = unsafe { hdr.assume_init_ref() };
150        // SAFETY: Similar to above, we initialized this `addr` and recvmmsg()
151        // will have populated it
152        let addr_ref = unsafe { addr.assume_init_ref() };
153        pkt.meta_mut().size = hdr_ref.msg_len as usize;
154        if let Some(addr) = cast_socket_addr(addr_ref, hdr_ref) {
155            pkt.meta_mut().set_socket_addr(&addr);
156        }
157    }
158
159    for (iov, addr, hdr) in izip!(&mut iovs, &mut addrs, &mut hdrs).take(count) {
160        // SAFETY: We initialized `count` elements of each array above
161        //
162        // It may be that `packets.len() != NUM_RCVMMSGS`; thus, some elements
163        // in `iovs` / `addrs` / `hdrs` may not get initialized. So, we must
164        // manually drop `count` elements from each array instead of being able
165        // to convert [MaybeUninit<T>] to [T] and letting `Drop` do the work
166        // for us when these items go out of scope at the end of the function
167        unsafe {
168            iov.assume_init_drop();
169            addr.assume_init_drop();
170            hdr.assume_init_drop();
171        }
172    }
173
174    Ok(nrecv)
175}
176
#[cfg(test)]
mod tests {
    use {
        crate::{packet::PACKET_DATA_SIZE, recvmmsg::*},
        std::{
            net::{SocketAddr, UdpSocket},
            time::{Duration, Instant},
        },
    };

    /// `(reader, reader_addr, sender, sender_addr)` for two sockets bound to the
    /// same local IP.
    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);

    /// IPv6 loopback with an OS-assigned port. The brackets make this a valid
    /// `SocketAddr` literal, so it is parsed directly rather than going through
    /// host-name resolution as the unbracketed `"::1:0"` form would.
    const IPV6_LOOPBACK: &str = "[::1]:0";

    const TEST_NUM_MSGS: usize = 32;

    /// Binds a reader and a sender socket to `ip_str` and returns both sockets
    /// together with their local addresses.
    fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
        let reader = UdpSocket::bind(ip_str)?;
        let addr = reader.local_addr()?;
        let sender = UdpSocket::bind(ip_str)?;
        let saddr = sender.local_addr()?;
        Ok((reader, addr, sender, saddr))
    }

    /// Sends `count` zero-filled datagrams of `PACKET_DATA_SIZE` bytes from
    /// `sender` to `dest`.
    fn send_packets(sender: &UdpSocket, dest: SocketAddr, count: usize) {
        let data = [0; PACKET_DATA_SIZE];
        for _ in 0..count {
            sender.send_to(&data[..], dest).unwrap();
        }
    }

    /// Resets every packet's metadata so the batch can be handed to
    /// `recv_mmsg` again (it debug-asserts that metadata is default).
    fn reset_meta(packets: &mut [Packet]) {
        packets
            .iter_mut()
            .for_each(|pkt| *pkt.meta_mut() = Meta::default());
    }

    #[test]
    pub fn test_recv_mmsg_one_iter() {
        let test_one_iter = |(reader, addr, sender, saddr): TestConfig| {
            let sent = TEST_NUM_MSGS - 1;
            send_packets(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(sent, recv);
            for packet in packets.iter().take(recv) {
                assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
                assert_eq!(packet.meta().socket_addr(), saddr);
            }
        };

        test_one_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());

        match test_setup_reader_sender(IPV6_LOOPBACK) {
            Ok(config) => test_one_iter(config),
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
        }
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter() {
        let test_multi_iter = |(reader, addr, sender, saddr): TestConfig| {
            let sent = TEST_NUM_MSGS + 10;
            send_packets(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(TEST_NUM_MSGS, recv);
            for packet in packets.iter().take(recv) {
                assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
                assert_eq!(packet.meta().socket_addr(), saddr);
            }

            reset_meta(&mut packets);
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(sent - TEST_NUM_MSGS, recv);
            for packet in packets.iter().take(recv) {
                assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
                assert_eq!(packet.meta().socket_addr(), saddr);
            }
        };

        test_multi_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());

        match test_setup_reader_sender(IPV6_LOOPBACK) {
            Ok(config) => test_multi_iter(config),
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
        }
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter_timeout() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();
        reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
        reader.set_nonblocking(false).unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr = sender.local_addr().unwrap();
        let sent = TEST_NUM_MSGS;
        send_packets(&sender, addr, sent);

        let start = Instant::now();
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        for packet in packets.iter().take(recv) {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), saddr);
        }
        reader.set_nonblocking(true).unwrap();

        // With nothing queued and the socket non-blocking, this must return
        // promptly rather than waiting for the 5s read timeout.
        reset_meta(&mut packets);
        let _recv = recv_mmsg(&reader, &mut packets[..]);
        assert!(start.elapsed().as_secs() < 5);
    }

    #[test]
    pub fn test_recv_mmsg_multi_addrs() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();

        let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr1 = sender1.local_addr().unwrap();
        let sent1 = TEST_NUM_MSGS - 1;

        let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr2 = sender2.local_addr().unwrap();
        let sent2 = TEST_NUM_MSGS + 1;

        send_packets(&sender1, addr, sent1);
        send_packets(&sender2, addr, sent2);

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];

        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        for packet in packets.iter().take(sent1) {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), saddr1);
        }
        for packet in packets.iter().skip(sent1).take(recv - sent1) {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), saddr2);
        }

        reset_meta(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
        for packet in packets.iter().take(recv) {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), saddr2);
        }
    }
}