1pub use solana_perf::packet::NUM_RCVMMSGS;
4use {
5 crate::packet::{Meta, Packet},
6 std::{cmp, io, net::UdpSocket},
7};
8#[cfg(target_os = "linux")]
9use {
10 itertools::izip,
11 libc::{
12 iovec, mmsghdr, msghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE,
13 },
14 std::{
15 mem::{self, MaybeUninit},
16 net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
17 os::unix::io::AsRawFd,
18 ptr,
19 },
20};
21
/// Portable fallback for platforms without `recvmmsg(2)`: reads datagrams one
/// at a time with `recv_from`.
///
/// At most `min(NUM_RCVMMSGS, packets.len())` packets are filled. For every
/// datagram read, the packet's `meta.size` and source address are recorded.
///
/// After the first datagram has been received the socket is switched to
/// non-blocking mode so the remaining reads only drain what is already
/// queued. The socket is *not* switched back before returning.
///
/// # Errors
///
/// Returns the I/O error if the very first `recv_from` fails, or if switching
/// the socket to non-blocking mode fails. An error on any later read simply
/// ends the batch and the packets received so far are reported.
#[cfg(not(target_os = "linux"))]
pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> {
    // Callers are expected to hand over packets with pristine metadata.
    debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));

    let limit = cmp::min(NUM_RCVMMSGS, packets.len());
    let mut received = 0;
    for packet in &mut packets[..limit] {
        packet.meta_mut().size = 0;
        let (len, sender) = match socket.recv_from(packet.buffer_mut()) {
            Ok(datagram) => datagram,
            // Nothing was received at all: surface the failure to the caller.
            Err(err) if received == 0 => return Err(err),
            // We already have data; treat the failure as "queue drained".
            Err(_) => break,
        };
        packet.meta_mut().size = len;
        packet.meta_mut().set_socket_addr(&sender);
        if received == 0 {
            // Only the first read may block; the rest must return immediately.
            socket.set_nonblocking(true)?;
        }
        received += 1;
    }
    Ok(received)
}
48
49#[cfg(target_os = "linux")]
50fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<SocketAddr> {
51 use libc::{sa_family_t, sockaddr_in, sockaddr_in6};
52 const SOCKADDR_IN_SIZE: usize = std::mem::size_of::<sockaddr_in>();
53 const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::<sockaddr_in6>();
54 if addr.ss_family == AF_INET as sa_family_t
55 && hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t
56 {
57 let addr = unsafe { &*(addr as *const _ as *const sockaddr_in) };
59 return Some(SocketAddr::V4(SocketAddrV4::new(
60 Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()),
61 u16::from_be(addr.sin_port),
62 )));
63 }
64 if addr.ss_family == AF_INET6 as sa_family_t
65 && hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t
66 {
67 let addr = unsafe { &*(addr as *const _ as *const sockaddr_in6) };
69 return Some(SocketAddr::V6(SocketAddrV6::new(
70 Ipv6Addr::from(addr.sin6_addr.s6_addr),
71 u16::from_be(addr.sin6_port),
72 addr.sin6_flowinfo,
73 addr.sin6_scope_id,
74 )));
75 }
76 error!(
77 "recvmmsg unexpected ss_family:{} msg_namelen:{}",
78 addr.ss_family, hdr.msg_hdr.msg_namelen
79 );
80 None
81}
82
83#[cfg(target_os = "linux")]
84pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> {
85 if packets.is_empty() {
88 return Ok(0);
89 }
90 debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));
92 const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();
93
94 let mut iovs = [MaybeUninit::uninit(); NUM_RCVMMSGS];
95 let mut addrs = [MaybeUninit::zeroed(); NUM_RCVMMSGS];
96 let mut hdrs = [MaybeUninit::uninit(); NUM_RCVMMSGS];
97
98 let sock_fd = sock.as_raw_fd();
99 let count = cmp::min(iovs.len(), packets.len());
100
101 for (packet, hdr, iov, addr) in
102 izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count)
103 {
104 let buffer = packet.buffer_mut();
105 iov.write(iovec {
106 iov_base: buffer.as_mut_ptr() as *mut libc::c_void,
107 iov_len: buffer.len(),
108 });
109
110 hdr.write(mmsghdr {
111 msg_len: 0,
112 msg_hdr: msghdr {
113 msg_name: addr.as_mut_ptr() as *mut _,
114 msg_namelen: SOCKADDR_STORAGE_SIZE as socklen_t,
115 msg_iov: iov.as_mut_ptr(),
116 msg_iovlen: 1,
117 msg_control: ptr::null::<libc::c_void>() as *mut _,
118 msg_controllen: 0,
119 msg_flags: 0,
120 },
121 });
122 }
123
124 let mut ts = libc::timespec {
125 tv_sec: 1,
126 tv_nsec: 0,
127 };
128 #[allow(clippy::useless_conversion)]
130 let nrecv = unsafe {
131 libc::recvmmsg(
132 sock_fd,
133 hdrs[0].assume_init_mut(),
134 count as u32,
135 MSG_WAITFORONE.try_into().unwrap(),
136 &mut ts,
137 )
138 };
139 let nrecv = if nrecv < 0 {
140 return Err(io::Error::last_os_error());
141 } else {
142 usize::try_from(nrecv).unwrap()
143 };
144 for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
145 let hdr_ref = unsafe { hdr.assume_init_ref() };
150 let addr_ref = unsafe { addr.assume_init_ref() };
153 pkt.meta_mut().size = hdr_ref.msg_len as usize;
154 if let Some(addr) = cast_socket_addr(addr_ref, hdr_ref) {
155 pkt.meta_mut().set_socket_addr(&addr);
156 }
157 }
158
159 for (iov, addr, hdr) in izip!(&mut iovs, &mut addrs, &mut hdrs).take(count) {
160 unsafe {
168 iov.assume_init_drop();
169 addr.assume_init_drop();
170 hdr.assume_init_drop();
171 }
172 }
173
174 Ok(nrecv)
175}
176
#[cfg(test)]
mod tests {
    use {
        crate::{packet::PACKET_DATA_SIZE, recvmmsg::*},
        std::{
            net::{SocketAddr, UdpSocket},
            time::{Duration, Instant},
        },
    };

    /// `(reader socket, reader address, sender socket, sender address)`.
    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);

    /// Number of packet slots handed to `recv_mmsg` in every test.
    const TEST_NUM_MSGS: usize = 32;

    /// Binds a reader and a sender socket on `ip_str` and reports their
    /// local addresses alongside them.
    fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
        let reader = UdpSocket::bind(ip_str)?;
        let reader_addr = reader.local_addr()?;
        let sender = UdpSocket::bind(ip_str)?;
        let sender_addr = sender.local_addr()?;
        Ok((reader, reader_addr, sender, sender_addr))
    }

    /// Sends `num` zero-filled, full-size datagrams from `sender` to `dest`.
    fn send_datagrams(sender: &UdpSocket, dest: SocketAddr, num: usize) {
        let payload = [0; PACKET_DATA_SIZE];
        for _ in 0..num {
            sender.send_to(&payload[..], dest).unwrap();
        }
    }

    /// Asserts that every packet is full-size and was sent by `expected_from`.
    fn assert_full_packets_from(packets: &[Packet], expected_from: SocketAddr) {
        for packet in packets {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), expected_from);
        }
    }

    /// Restores default metadata so the packets can be reused for another
    /// `recv_mmsg` call.
    fn reset_meta(packets: &mut [Packet]) {
        for packet in packets.iter_mut() {
            *packet.meta_mut() = Meta::default();
        }
    }

    /// Runs `scenario` over IPv4 loopback, then over IPv6 loopback if it can
    /// be set up (a setup failure is only logged).
    fn run_on_ipv4_and_ipv6(scenario: impl Fn(TestConfig)) {
        scenario(test_setup_reader_sender("127.0.0.1:0").unwrap());

        match test_setup_reader_sender("::1:0") {
            Ok(config) => scenario(config),
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
        }
    }

    #[test]
    pub fn test_recv_mmsg_one_iter() {
        // Fewer datagrams than slots: one call must return all of them.
        run_on_ipv4_and_ipv6(|(reader, addr, sender, saddr): TestConfig| {
            let sent = TEST_NUM_MSGS - 1;
            send_datagrams(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(sent, recv);
            assert_full_packets_from(&packets[..recv], saddr);
        });
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter() {
        // More datagrams than slots: the second call picks up the remainder.
        run_on_ipv4_and_ipv6(|(reader, addr, sender, saddr): TestConfig| {
            let sent = TEST_NUM_MSGS + 10;
            send_datagrams(&sender, addr, sent);

            let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(TEST_NUM_MSGS, recv);
            assert_full_packets_from(&packets[..recv], saddr);

            reset_meta(&mut packets);
            let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
            assert_eq!(sent - TEST_NUM_MSGS, recv);
            assert_full_packets_from(&packets[..recv], saddr);
        });
    }

    #[test]
    pub fn test_recv_mmsg_multi_iter_timeout() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();
        reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
        reader.set_nonblocking(false).unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr = sender.local_addr().unwrap();
        send_datagrams(&sender, addr, TEST_NUM_MSGS);

        let start = Instant::now();
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr);
        reader.set_nonblocking(true).unwrap();

        // The queue is now empty; the second call must come back well before
        // the 5 second read timeout, whatever its result.
        reset_meta(&mut packets);
        let _recv = recv_mmsg(&reader, &mut packets[..]);
        assert!(start.elapsed().as_secs() < 5);
    }

    #[test]
    pub fn test_recv_mmsg_multi_addrs() {
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();

        let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr1 = sender1.local_addr().unwrap();
        let sent1 = TEST_NUM_MSGS - 1;

        let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr2 = sender2.local_addr().unwrap();
        let sent2 = TEST_NUM_MSGS + 1;

        send_datagrams(&sender1, addr, sent1);
        send_datagrams(&sender2, addr, sent2);

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];

        // First batch: everything from sender1, then the start of sender2.
        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..sent1], saddr1);
        assert_full_packets_from(&packets[sent1..recv], saddr2);

        // Second batch: whatever is left, all from sender2.
        reset_meta(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
        assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr2);
    }
}