solana_streamer/nonblocking/
recvmmsg.rs1use {
4 crate::{
5 packet::{Meta, Packet},
6 recvmmsg::NUM_RCVMMSGS,
7 },
8 std::{cmp, io},
9 tokio::net::UdpSocket,
10};
11
12pub async fn recv_mmsg(
15 socket: &UdpSocket,
16 packets: &mut [Packet],
17) -> io::Result<usize> {
18 debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));
19 let count = cmp::min(NUM_RCVMMSGS, packets.len());
20 socket.readable().await?;
21 let mut i = 0;
22 for p in packets.iter_mut().take(count) {
23 p.meta_mut().size = 0;
24 match socket.try_recv_from(p.buffer_mut()) {
25 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
26 break;
27 }
28 Err(e) => {
29 return Err(e);
30 }
31 Ok((nrecv, from)) => {
32 p.meta_mut().size = nrecv;
33 p.meta_mut().set_socket_addr(&from);
34 }
35 }
36 i += 1;
37 }
38 Ok(i)
39}
40
41pub async fn recv_mmsg_exact(
43 socket: &UdpSocket,
44 packets: &mut [Packet],
45) -> io::Result<usize> {
46 let total = packets.len();
47 let mut remaining = total;
48 while remaining != 0 {
49 let first = total - remaining;
50 let res = recv_mmsg(socket, &mut packets[first..]).await?;
51 remaining -= res;
52 }
53 Ok(packets.len())
54}
55
56#[cfg(test)]
57mod tests {
58 use {
59 crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE},
60 std::{net::SocketAddr, time::Instant},
61 tokio::net::UdpSocket,
62 };
63
64 type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);
65
66 async fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
67 let reader = UdpSocket::bind(ip_str).await?;
68 let addr = reader.local_addr()?;
69 let sender = UdpSocket::bind(ip_str).await?;
70 let saddr = sender.local_addr()?;
71 Ok((reader, addr, sender, saddr))
72 }
73
74 const TEST_NUM_MSGS: usize = 32;
75
76 async fn test_one_iter((reader, addr, sender, saddr): TestConfig) {
77 let sent = TEST_NUM_MSGS - 1;
78 for _ in 0..sent {
79 let data = [0; PACKET_DATA_SIZE];
80 sender.send_to(&data[..], &addr).await.unwrap();
81 }
82
83 let mut packets = vec![Packet::default(); sent];
84 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
85 assert_eq!(sent, recv);
86 for packet in packets.iter().take(recv) {
87 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
88 assert_eq!(packet.meta().socket_addr(), saddr);
89 }
90 }
91
92 #[tokio::test]
93 async fn test_recv_mmsg_one_iter() {
94 test_one_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;
95
96 match test_setup_reader_sender("::1:0").await {
97 Ok(config) => test_one_iter(config).await,
98 Err(e) => warn!("Failed to configure IPv6: {:?}", e),
99 }
100 }
101
102 async fn test_multi_iter((reader, addr, sender, saddr): TestConfig) {
103 let sent = TEST_NUM_MSGS + 10;
104 for _ in 0..sent {
105 let data = [0; PACKET_DATA_SIZE];
106 sender.send_to(&data[..], &addr).await.unwrap();
107 }
108
109 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
110 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
111 assert_eq!(TEST_NUM_MSGS, recv);
112 for packet in packets.iter().take(recv) {
113 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
114 assert_eq!(packet.meta().socket_addr(), saddr);
115 }
116
117 let mut packets = vec![Packet::default(); sent - TEST_NUM_MSGS];
118 packets
119 .iter_mut()
120 .for_each(|pkt| *pkt.meta_mut() = Meta::default());
121 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
122 assert_eq!(sent - TEST_NUM_MSGS, recv);
123 for packet in packets.iter().take(recv) {
124 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
125 assert_eq!(packet.meta().socket_addr(), saddr);
126 }
127 }
128
129 #[tokio::test]
130 async fn test_recv_mmsg_multi_iter() {
131 test_multi_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;
132
133 match test_setup_reader_sender("::1:0").await {
134 Ok(config) => test_multi_iter(config).await,
135 Err(e) => warn!("Failed to configure IPv6: {:?}", e),
136 }
137 }
138
139 #[tokio::test]
140 async fn test_recv_mmsg_exact_multi_iter_timeout() {
141 let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
142 let addr = reader.local_addr().unwrap();
143 let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
144 let saddr = sender.local_addr().unwrap();
145 let sent = TEST_NUM_MSGS;
146 for _ in 0..sent {
147 let data = [0; PACKET_DATA_SIZE];
148 sender.send_to(&data[..], &addr).await.unwrap();
149 }
150
151 let start = Instant::now();
152 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
153 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
154 assert_eq!(TEST_NUM_MSGS, recv);
155 for packet in packets.iter().take(recv) {
156 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
157 assert_eq!(packet.meta().socket_addr(), saddr);
158 }
159
160 packets
161 .iter_mut()
162 .for_each(|pkt| *pkt.meta_mut() = Meta::default());
163 let _recv = recv_mmsg(&reader, &mut packets[..]).await;
164 assert!(start.elapsed().as_secs() < 5);
165 }
166
167 #[tokio::test]
168 async fn test_recv_mmsg_multi_addrs() {
169 let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
170 let addr = reader.local_addr().unwrap();
171
172 let sender1 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
173 let saddr1 = sender1.local_addr().unwrap();
174 let sent1 = TEST_NUM_MSGS - 1;
175
176 let sender2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
177 let saddr2 = sender2.local_addr().unwrap();
178 let sent2 = TEST_NUM_MSGS + 1;
179
180 for _ in 0..sent1 {
181 let data = [0; PACKET_DATA_SIZE];
182 sender1.send_to(&data[..], &addr).await.unwrap();
183 }
184
185 for _ in 0..sent2 {
186 let data = [0; PACKET_DATA_SIZE];
187 sender2.send_to(&data[..], &addr).await.unwrap();
188 }
189
190 let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
191
192 let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
193 assert_eq!(TEST_NUM_MSGS, recv);
194 for packet in packets.iter().take(sent1) {
195 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
196 assert_eq!(packet.meta().socket_addr(), saddr1);
197 }
198 for packet in packets.iter().skip(sent1).take(recv - sent1) {
199 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
200 assert_eq!(packet.meta().socket_addr(), saddr2);
201 }
202
203 packets
204 .iter_mut()
205 .for_each(|pkt| *pkt.meta_mut() = Meta::default());
206 let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
207 assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
208 for packet in packets.iter().take(recv) {
209 assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
210 assert_eq!(packet.meta().socket_addr(), saddr2);
211 }
212 }
213}