// solana_streamer/nonblocking/sendmmsg.rs
use {
4 crate::sendmmsg::SendPktsError,
5 futures_util::future::join_all,
6 std::{borrow::Borrow, iter::repeat, net::SocketAddr},
7 tokio::net::UdpSocket,
8};
9
10pub async fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
11where
12 S: Borrow<SocketAddr>,
13 T: AsRef<[u8]>,
14{
15 let mut num_failed = 0;
16 let mut erropt = None;
17 let futures = packets
18 .iter()
19 .map(|(p, a)| sock.send_to(p.as_ref(), a.borrow()))
20 .collect::<Vec<_>>();
21 let results = join_all(futures).await;
22 for result in results {
23 if let Err(e) = result {
24 num_failed += 1;
25 if erropt.is_none() {
26 erropt = Some(e);
27 }
28 }
29 }
30
31 if let Some(err) = erropt {
32 Err(SendPktsError::IoError(err, num_failed))
33 } else {
34 Ok(())
35 }
36}
37
38pub async fn multi_target_send<S, T>(
39 sock: &UdpSocket,
40 packet: T,
41 dests: &[S],
42) -> Result<(), SendPktsError>
43where
44 S: Borrow<SocketAddr>,
45 T: AsRef<[u8]>,
46{
47 let dests = dests.iter().map(Borrow::borrow);
48 let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
49 batch_send(sock, &pkts).await
50}
51
#[cfg(test)]
mod tests {
    use {
        crate::{
            nonblocking::{
                recvmmsg::recv_mmsg,
                sendmmsg::{batch_send, multi_target_send},
            },
            packet::Packet,
            sendmmsg::SendPktsError,
        },
        solana_sdk::packet::PACKET_DATA_SIZE,
        std::{
            io::ErrorKind,
            net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
        },
        tokio::net::UdpSocket,
    };

    /// Binds a UDP socket to `addr`, panicking with "bind" if that fails.
    async fn bind(addr: &str) -> UdpSocket {
        UdpSocket::bind(addr).await.expect("bind")
    }

    /// Builds `count` zero-filled payloads of `PACKET_DATA_SIZE` bytes each.
    fn zeroed_payloads(count: usize) -> Vec<Vec<u8>> {
        vec![vec![0u8; PACKET_DATA_SIZE]; count]
    }

    /// A batch of 32 packets addressed to one socket is received in full.
    #[tokio::test]
    async fn test_send_mmsg_one_dest() {
        let receiver = bind("127.0.0.1:0").await;
        let dest = receiver.local_addr().unwrap();
        let sender = bind("127.0.0.1:0").await;

        let payloads = zeroed_payloads(32);
        let batch: Vec<_> = payloads.iter().map(|p| (p.as_slice(), &dest)).collect();

        let outcome = batch_send(&sender, &batch[..]).await.ok();
        assert_eq!(outcome, Some(()));

        let mut received = vec![Packet::default(); 32];
        let count = recv_mmsg(&receiver, &mut received[..]).await.unwrap();
        assert_eq!(32, count);
    }

    /// A batch split evenly across two sockets delivers 16 packets to each.
    #[tokio::test]
    async fn test_send_mmsg_multi_dest() {
        let first_receiver = bind("127.0.0.1:0").await;
        let first_dest = first_receiver.local_addr().unwrap();

        let second_receiver = bind("127.0.0.1:0").await;
        let second_dest = second_receiver.local_addr().unwrap();

        let sender = bind("127.0.0.1:0").await;

        let payloads = zeroed_payloads(32);
        let batch: Vec<_> = payloads
            .iter()
            .enumerate()
            .map(|(idx, payload)| {
                // First half goes to the first receiver, the rest to the second.
                let dest = if idx < 16 { &first_dest } else { &second_dest };
                (&payload[..], dest)
            })
            .collect();

        let outcome = batch_send(&sender, &batch[..]).await.ok();
        assert_eq!(outcome, Some(()));

        for receiver in [&first_receiver, &second_receiver] {
            let mut received = vec![Packet::default(); 32];
            let count = recv_mmsg(receiver, &mut received[..]).await.unwrap();
            assert_eq!(16, count);
        }
    }

    /// One payload sent to four destinations arrives exactly once at each.
    #[tokio::test]
    async fn test_multicast_msg() {
        let mut receivers = Vec::with_capacity(4);
        let mut dests = Vec::with_capacity(4);
        for _ in 0..4 {
            let receiver = bind("127.0.0.1:0").await;
            dests.push(receiver.local_addr().unwrap());
            receivers.push(receiver);
        }

        let sender = bind("127.0.0.1:0").await;

        let packet = Packet::default();
        let dest_refs: Vec<&SocketAddr> = dests.iter().collect();

        let outcome = multi_target_send(&sender, packet.data(..).unwrap(), &dest_refs)
            .await
            .ok();
        assert_eq!(outcome, Some(()));

        for receiver in &receivers {
            let mut received = vec![Packet::default(); 32];
            let count = recv_mmsg(receiver, &mut received[..]).await.unwrap();
            assert_eq!(1, count);
        }
    }

    /// Sends a mix of IPv4 and IPv6 destinations from a socket bound to
    /// `0.0.0.0`.
    ///
    /// NOTE: the failure count is only checked when the send returns an
    /// error; an `Ok` result passes silently.
    #[tokio::test]
    async fn test_intermediate_failures_mismatched_bind() {
        let packets = zeroed_payloads(3);
        let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080);
        let dest_refs = [&ip4, &ip6, &ip4];
        let packet_refs: Vec<_> = packets
            .iter()
            .zip(dest_refs)
            .map(|(payload, dest)| (&payload[..], dest))
            .collect();

        let sender = bind("0.0.0.0:0").await;

        let batch_outcome = batch_send(&sender, &packet_refs[..]).await;
        if let Err(SendPktsError::IoError(_, num_failed)) = batch_outcome {
            assert_eq!(num_failed, 1);
        }

        let multi_outcome = multi_target_send(&sender, &packets[0], &dest_refs).await;
        if let Err(SendPktsError::IoError(_, num_failed)) = multi_outcome {
            assert_eq!(num_failed, 1);
        }
    }

    /// Interleaves localhost and broadcast destinations in several layouts.
    ///
    /// When a send reports an error, its kind must be `PermissionDenied` and
    /// the failure count must equal the number of broadcast destinations in
    /// that layout.
    ///
    /// NOTE: these assertions only run on the error path; an `Ok` result
    /// passes silently.
    #[tokio::test]
    async fn test_intermediate_failures_unreachable_address() {
        let packets = zeroed_payloads(5);
        let local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let bcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080);
        let sender = bind("0.0.0.0:0").await;

        // batch_send: (destination layout, expected number of failed sends).
        for (dests, expected_failures) in [
            ([&local, &bcast, &local, &bcast, &local], 2),
            ([&bcast, &local, &bcast, &local, &bcast], 3),
            ([&local, &local, &bcast, &bcast, &local], 2),
        ] {
            let packet_refs: Vec<_> = packets
                .iter()
                .zip(dests)
                .map(|(payload, dest)| (&payload[..], dest))
                .collect();
            let outcome = batch_send(&sender, &packet_refs[..]).await;
            if let Err(SendPktsError::IoError(ioerror, num_failed)) = outcome {
                assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
                assert_eq!(num_failed, expected_failures);
            }
        }

        // multi_target_send: same idea, one shared payload per layout.
        for (dests, expected_failures) in [
            ([&local, &bcast, &local, &bcast, &local], 2),
            ([&bcast, &local, &bcast, &local, &bcast], 3),
        ] {
            let outcome = multi_target_send(&sender, &packets[0], &dests).await;
            if let Err(SendPktsError::IoError(ioerror, num_failed)) = outcome {
                assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
                assert_eq!(num_failed, expected_failures);
            }
        }
    }
}