solana_streamer/nonblocking/
sendmmsg.rs1use {
4 crate::sendmmsg::SendPktsError,
5 futures_util::future::join_all,
6 std::{borrow::Borrow, iter::repeat, net::SocketAddr},
7 tokio::net::UdpSocket,
8};
9
10pub async fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
11where
12 S: Borrow<SocketAddr>,
13 T: AsRef<[u8]>,
14{
15 let mut num_failed = 0;
16 let mut erropt = None;
17 let futures = packets
18 .iter()
19 .map(|(p, a)| sock.send_to(p.as_ref(), a.borrow()))
20 .collect::<Vec<_>>();
21 let results = join_all(futures).await;
22 for result in results {
23 if let Err(e) = result {
24 num_failed += 1;
25 if erropt.is_none() {
26 erropt = Some(e);
27 }
28 }
29 }
30
31 if let Some(err) = erropt {
32 Err(SendPktsError::IoError(err, num_failed))
33 } else {
34 Ok(())
35 }
36}
37
38pub async fn multi_target_send<S, T>(
39 sock: &UdpSocket,
40 packet: T,
41 dests: &[S],
42) -> Result<(), SendPktsError>
43where
44 S: Borrow<SocketAddr>,
45 T: AsRef<[u8]>,
46{
47 let dests = dests.iter().map(Borrow::borrow);
48 let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
49 batch_send(sock, &pkts).await
50}
51
52#[cfg(test)]
53mod tests {
54 use {
55 crate::{
56 nonblocking::{
57 recvmmsg::{recv_mmsg, recv_mmsg_exact},
58 sendmmsg::{batch_send, multi_target_send},
59 },
60 packet::Packet,
61 sendmmsg::SendPktsError,
62 },
63 assert_matches::assert_matches,
64 solana_net_utils::{bind_to_localhost_async, bind_to_unspecified_async},
65 solana_packet::PACKET_DATA_SIZE,
66 std::{
67 io::ErrorKind,
68 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
69 },
70 };
71
72 #[tokio::test]
73 async fn test_send_mmsg_one_dest() {
74 let reader = bind_to_localhost_async().await.expect("bind");
75 let addr = reader.local_addr().unwrap();
76 let sender = bind_to_localhost_async().await.expect("bind");
77
78 let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
79 let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect();
80
81 let sent = batch_send(&sender, &packet_refs[..]).await.ok();
82 assert_eq!(sent, Some(()));
83
84 let mut packets = vec![Packet::default(); 32];
85 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
86 assert_eq!(32, recv);
87 }
88
89 #[tokio::test]
90 async fn test_send_mmsg_multi_dest() {
91 let reader = bind_to_localhost_async().await.expect("bind");
92 let addr = reader.local_addr().unwrap();
93
94 let reader2 = bind_to_localhost_async().await.expect("bind");
95 let addr2 = reader2.local_addr().unwrap();
96
97 let sender = bind_to_localhost_async().await.expect("bind");
98
99 let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
100 let packet_refs: Vec<_> = packets
101 .iter()
102 .enumerate()
103 .map(|(i, p)| {
104 if i < 16 {
105 (&p[..], &addr)
106 } else {
107 (&p[..], &addr2)
108 }
109 })
110 .collect();
111
112 let sent = batch_send(&sender, &packet_refs[..]).await.ok();
113 assert_eq!(sent, Some(()));
114
115 let mut packets = vec![Packet::default(); 16];
116 let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
117 assert_eq!(16, recv);
118
119 let mut packets = vec![Packet::default(); 16];
120 let recv = recv_mmsg_exact(&reader2, &mut packets[..]).await.unwrap();
121 assert_eq!(16, recv);
122 }
123
124 #[tokio::test]
125 async fn test_multicast_msg() {
126 let reader = bind_to_localhost_async().await.expect("bind");
127 let addr = reader.local_addr().unwrap();
128
129 let reader2 = bind_to_localhost_async().await.expect("bind");
130 let addr2 = reader2.local_addr().unwrap();
131
132 let reader3 = bind_to_localhost_async().await.expect("bind");
133 let addr3 = reader3.local_addr().unwrap();
134
135 let reader4 = bind_to_localhost_async().await.expect("bind");
136 let addr4 = reader4.local_addr().unwrap();
137
138 let sender = bind_to_localhost_async().await.expect("bind");
139
140 let packet = Packet::default();
141
142 let sent = multi_target_send(
143 &sender,
144 packet.data(..).unwrap(),
145 &[&addr, &addr2, &addr3, &addr4],
146 )
147 .await
148 .ok();
149 assert_eq!(sent, Some(()));
150
151 let mut packets = vec![Packet::default(); 32];
152 let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
153 assert_eq!(1, recv);
154
155 let mut packets = vec![Packet::default(); 32];
156 let recv = recv_mmsg(&reader2, &mut packets[..]).await.unwrap();
157 assert_eq!(1, recv);
158
159 let mut packets = vec![Packet::default(); 32];
160 let recv = recv_mmsg(&reader3, &mut packets[..]).await.unwrap();
161 assert_eq!(1, recv);
162
163 let mut packets = vec![Packet::default(); 32];
164 let recv = recv_mmsg(&reader4, &mut packets[..]).await.unwrap();
165 assert_eq!(1, recv);
166 }
167
168 #[tokio::test]
169 async fn test_intermediate_failures_mismatched_bind() {
170 let packets: Vec<_> = (0..3).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
171 let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
172 let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080);
173 let packet_refs: Vec<_> = vec![
174 (&packets[0][..], &ip4),
175 (&packets[1][..], &ip6),
176 (&packets[2][..], &ip4),
177 ];
178 let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];
179
180 let sender = bind_to_unspecified_async().await.expect("bind");
181 let res = batch_send(&sender, &packet_refs[..]).await;
182 assert_matches!(res, Err(SendPktsError::IoError(_, 1)));
183 let res = multi_target_send(&sender, &packets[0], &dest_refs).await;
184 assert_matches!(res, Err(SendPktsError::IoError(_, 1)));
185 }
186
187 #[tokio::test]
188 async fn test_intermediate_failures_unreachable_address() {
189 let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
190 let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
191 let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080);
192 let sender = bind_to_unspecified_async().await.expect("bind");
193
194 let packet_refs: Vec<_> = vec![
196 (&packets[0][..], &ipv4local),
197 (&packets[1][..], &ipv4broadcast),
198 (&packets[2][..], &ipv4local),
199 (&packets[3][..], &ipv4broadcast),
200 (&packets[4][..], &ipv4local),
201 ];
202 match batch_send(&sender, &packet_refs[..]).await {
203 Ok(()) => panic!(),
204 Err(SendPktsError::IoError(ioerror, num_failed)) => {
205 assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
206 assert_eq!(num_failed, 2);
207 }
208 }
209
210 let packet_refs: Vec<_> = vec![
212 (&packets[0][..], &ipv4broadcast),
213 (&packets[1][..], &ipv4local),
214 (&packets[2][..], &ipv4broadcast),
215 (&packets[3][..], &ipv4local),
216 (&packets[4][..], &ipv4broadcast),
217 ];
218 match batch_send(&sender, &packet_refs[..]).await {
219 Ok(()) => panic!(),
220 Err(SendPktsError::IoError(ioerror, num_failed)) => {
221 assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
222 assert_eq!(num_failed, 3);
223 }
224 }
225
226 let packet_refs: Vec<_> = vec![
228 (&packets[0][..], &ipv4local),
229 (&packets[1][..], &ipv4local),
230 (&packets[2][..], &ipv4broadcast),
231 (&packets[3][..], &ipv4broadcast),
232 (&packets[4][..], &ipv4local),
233 ];
234 match batch_send(&sender, &packet_refs[..]).await {
235 Ok(()) => panic!(),
236 Err(SendPktsError::IoError(ioerror, num_failed)) => {
237 assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
238 assert_eq!(num_failed, 2);
239 }
240 }
241
242 let dest_refs: Vec<_> = vec![
244 &ipv4local,
245 &ipv4broadcast,
246 &ipv4local,
247 &ipv4broadcast,
248 &ipv4local,
249 ];
250 match multi_target_send(&sender, &packets[0], &dest_refs).await {
251 Ok(()) => panic!(),
252 Err(SendPktsError::IoError(ioerror, num_failed)) => {
253 assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
254 assert_eq!(num_failed, 2);
255 }
256 }
257
258 let dest_refs: Vec<_> = vec![
260 &ipv4broadcast,
261 &ipv4local,
262 &ipv4broadcast,
263 &ipv4local,
264 &ipv4broadcast,
265 ];
266 match multi_target_send(&sender, &packets[0], &dest_refs).await {
267 Ok(()) => panic!(),
268 Err(SendPktsError::IoError(ioerror, num_failed)) => {
269 assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
270 assert_eq!(num_failed, 3);
271 }
272 }
273 }
274}