1#[cfg(target_os = "linux")]
4#[allow(deprecated)]
5use nix::sys::socket::InetAddr;
6pub use solana_perf::packet::NUM_RCVMMSGS;
7use {
8 crate::packet::{Meta, Packet},
9 std::{cmp, io, net::UdpSocket},
10};
11#[cfg(target_os = "linux")]
12use {
13 itertools::izip,
14 libc::{iovec, mmsghdr, sockaddr_storage, socklen_t, AF_INET, AF_INET6, MSG_WAITFORONE},
15 std::{mem, os::unix::io::AsRawFd},
16};
17
18#[cfg(not(target_os = "linux"))]
19pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> {
20 debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
21 let mut i = 0;
22 let count = cmp::min(NUM_RCVMMSGS, packets.len());
23 for p in packets.iter_mut().take(count) {
24 p.meta.size = 0;
25 match socket.recv_from(p.buffer_mut()) {
26 Err(_) if i > 0 => {
27 break;
28 }
29 Err(e) => {
30 return Err(e);
31 }
32 Ok((nrecv, from)) => {
33 p.meta.size = nrecv;
34 p.meta.set_socket_addr(&from);
35 if i == 0 {
36 socket.set_nonblocking(true)?;
37 }
38 }
39 }
40 i += 1;
41 }
42 Ok(i)
43}
44
45#[cfg(target_os = "linux")]
46#[allow(deprecated)]
47fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option<InetAddr> {
48 use libc::{sa_family_t, sockaddr_in, sockaddr_in6};
49 const SOCKADDR_IN_SIZE: usize = std::mem::size_of::<sockaddr_in>();
50 const SOCKADDR_IN6_SIZE: usize = std::mem::size_of::<sockaddr_in6>();
51 if addr.ss_family == AF_INET as sa_family_t
52 && hdr.msg_hdr.msg_namelen == SOCKADDR_IN_SIZE as socklen_t
53 {
54 let addr = addr as *const _ as *const sockaddr_in;
55 return Some(unsafe { InetAddr::V4(*addr) });
56 }
57 if addr.ss_family == AF_INET6 as sa_family_t
58 && hdr.msg_hdr.msg_namelen == SOCKADDR_IN6_SIZE as socklen_t
59 {
60 let addr = addr as *const _ as *const sockaddr_in6;
61 return Some(unsafe { InetAddr::V6(*addr) });
62 }
63 error!(
64 "recvmmsg unexpected ss_family:{} msg_namelen:{}",
65 addr.ss_family, hdr.msg_hdr.msg_namelen
66 );
67 None
68}
69
70#[cfg(target_os = "linux")]
71#[allow(clippy::uninit_assumed_init)]
72pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result<usize> {
73 debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
75 const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::<sockaddr_storage>();
76
77 let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
78 let iovs = mem::MaybeUninit::<[iovec; NUM_RCVMMSGS]>::uninit();
79 let mut iovs: [iovec; NUM_RCVMMSGS] = unsafe { iovs.assume_init() };
80 let mut addrs: [sockaddr_storage; NUM_RCVMMSGS] = unsafe { mem::zeroed() };
81
82 let sock_fd = sock.as_raw_fd();
83 let count = cmp::min(iovs.len(), packets.len());
84
85 for (packet, hdr, iov, addr) in
86 izip!(packets.iter_mut(), &mut hdrs, &mut iovs, &mut addrs).take(count)
87 {
88 let buffer = packet.buffer_mut();
89 *iov = iovec {
90 iov_base: buffer.as_mut_ptr() as *mut libc::c_void,
91 iov_len: buffer.len(),
92 };
93 hdr.msg_hdr.msg_name = addr as *mut _ as *mut _;
94 hdr.msg_hdr.msg_namelen = SOCKADDR_STORAGE_SIZE as socklen_t;
95 hdr.msg_hdr.msg_iov = iov;
96 hdr.msg_hdr.msg_iovlen = 1;
97 }
98 let mut ts = libc::timespec {
99 tv_sec: 1,
100 tv_nsec: 0,
101 };
102 let nrecv =
103 unsafe { libc::recvmmsg(sock_fd, &mut hdrs[0], count as u32, MSG_WAITFORONE, &mut ts) };
104 let nrecv = if nrecv < 0 {
105 return Err(io::Error::last_os_error());
106 } else {
107 usize::try_from(nrecv).unwrap()
108 };
109 for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
110 pkt.meta.size = hdr.msg_len as usize;
111 if let Some(addr) = cast_socket_addr(&addr, &hdr) {
112 pkt.meta.set_socket_addr(&addr.to_std());
113 }
114 }
115 Ok(nrecv)
116}
117
118#[cfg(test)]
119mod tests {
120 use {
121 crate::{packet::PACKET_DATA_SIZE, recvmmsg::*},
122 std::{
123 net::{SocketAddr, UdpSocket},
124 time::{Duration, Instant},
125 },
126 };
127
128 type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);
129
130 fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
131 let reader = UdpSocket::bind(ip_str)?;
132 let addr = reader.local_addr()?;
133 let sender = UdpSocket::bind(ip_str)?;
134 let saddr = sender.local_addr()?;
135 Ok((reader, addr, sender, saddr))
136 }
137
138 const TEST_NUM_MSGS: usize = 32;
139 #[test]
140 pub fn test_recv_mmsg_one_iter() {
141 let test_one_iter = |(reader, addr, sender, saddr): TestConfig| {
142 let sent = TEST_NUM_MSGS - 1;
143 for _ in 0..sent {
144 let data = [0; PACKET_DATA_SIZE];
145 sender.send_to(&data[..], addr).unwrap();
146 }
147
148 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
149 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
150 assert_eq!(sent, recv);
151 for packet in packets.iter().take(recv) {
152 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
153 assert_eq!(packet.meta.socket_addr(), saddr);
154 }
155 };
156
157 test_one_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());
158
159 match test_setup_reader_sender("::1:0") {
160 Ok(config) => test_one_iter(config),
161 Err(e) => warn!("Failed to configure IPv6: {:?}", e),
162 }
163 }
164
165 #[test]
166 pub fn test_recv_mmsg_multi_iter() {
167 let test_multi_iter = |(reader, addr, sender, saddr): TestConfig| {
168 let sent = TEST_NUM_MSGS + 10;
169 for _ in 0..sent {
170 let data = [0; PACKET_DATA_SIZE];
171 sender.send_to(&data[..], addr).unwrap();
172 }
173
174 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
175 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
176 assert_eq!(TEST_NUM_MSGS, recv);
177 for packet in packets.iter().take(recv) {
178 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
179 assert_eq!(packet.meta.socket_addr(), saddr);
180 }
181
182 packets
183 .iter_mut()
184 .for_each(|pkt| pkt.meta = Meta::default());
185 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
186 assert_eq!(sent - TEST_NUM_MSGS, recv);
187 for packet in packets.iter().take(recv) {
188 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
189 assert_eq!(packet.meta.socket_addr(), saddr);
190 }
191 };
192
193 test_multi_iter(test_setup_reader_sender("127.0.0.1:0").unwrap());
194
195 match test_setup_reader_sender("::1:0") {
196 Ok(config) => test_multi_iter(config),
197 Err(e) => warn!("Failed to configure IPv6: {:?}", e),
198 }
199 }
200
201 #[test]
202 pub fn test_recv_mmsg_multi_iter_timeout() {
203 let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
204 let addr = reader.local_addr().unwrap();
205 reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
206 reader.set_nonblocking(false).unwrap();
207 let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
208 let saddr = sender.local_addr().unwrap();
209 let sent = TEST_NUM_MSGS;
210 for _ in 0..sent {
211 let data = [0; PACKET_DATA_SIZE];
212 sender.send_to(&data[..], addr).unwrap();
213 }
214
215 let start = Instant::now();
216 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
217 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
218 assert_eq!(TEST_NUM_MSGS, recv);
219 for packet in packets.iter().take(recv) {
220 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
221 assert_eq!(packet.meta.socket_addr(), saddr);
222 }
223 reader.set_nonblocking(true).unwrap();
224
225 packets
226 .iter_mut()
227 .for_each(|pkt| pkt.meta = Meta::default());
228 let _recv = recv_mmsg(&reader, &mut packets[..]);
229 assert!(start.elapsed().as_secs() < 5);
230 }
231
232 #[test]
233 pub fn test_recv_mmsg_multi_addrs() {
234 let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
235 let addr = reader.local_addr().unwrap();
236
237 let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
238 let saddr1 = sender1.local_addr().unwrap();
239 let sent1 = TEST_NUM_MSGS - 1;
240
241 let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
242 let saddr2 = sender2.local_addr().unwrap();
243 let sent2 = TEST_NUM_MSGS + 1;
244
245 for _ in 0..sent1 {
246 let data = [0; PACKET_DATA_SIZE];
247 sender1.send_to(&data[..], addr).unwrap();
248 }
249
250 for _ in 0..sent2 {
251 let data = [0; PACKET_DATA_SIZE];
252 sender2.send_to(&data[..], addr).unwrap();
253 }
254
255 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
256
257 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
258 assert_eq!(TEST_NUM_MSGS, recv);
259 for packet in packets.iter().take(sent1) {
260 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
261 assert_eq!(packet.meta.socket_addr(), saddr1);
262 }
263 for packet in packets.iter().skip(sent1).take(recv - sent1) {
264 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
265 assert_eq!(packet.meta.socket_addr(), saddr2);
266 }
267
268 packets
269 .iter_mut()
270 .for_each(|pkt| pkt.meta = Meta::default());
271 let recv = recv_mmsg(&reader, &mut packets[..]).unwrap();
272 assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
273 for packet in packets.iter().take(recv) {
274 assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
275 assert_eq!(packet.meta.socket_addr(), saddr2);
276 }
277 }
278}