solana_streamer/nonblocking/
sendmmsg.rs

//! The `sendmmsg` module provides a nonblocking, `sendmmsg()`-style batch send API, implemented with one `send_to` call per packet.
2
3use {
4    crate::sendmmsg::SendPktsError,
5    futures_util::future::join_all,
6    std::{borrow::Borrow, iter::repeat, net::SocketAddr},
7    tokio::net::UdpSocket,
8};
9
10pub async fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
11where
12    S: Borrow<SocketAddr>,
13    T: AsRef<[u8]>,
14{
15    let mut num_failed = 0;
16    let mut erropt = None;
17    let futures = packets
18        .iter()
19        .map(|(p, a)| sock.send_to(p.as_ref(), a.borrow()))
20        .collect::<Vec<_>>();
21    let results = join_all(futures).await;
22    for result in results {
23        if let Err(e) = result {
24            num_failed += 1;
25            if erropt.is_none() {
26                erropt = Some(e);
27            }
28        }
29    }
30
31    if let Some(err) = erropt {
32        Err(SendPktsError::IoError(err, num_failed))
33    } else {
34        Ok(())
35    }
36}
37
38pub async fn multi_target_send<S, T>(
39    sock: &UdpSocket,
40    packet: T,
41    dests: &[S],
42) -> Result<(), SendPktsError>
43where
44    S: Borrow<SocketAddr>,
45    T: AsRef<[u8]>,
46{
47    let dests = dests.iter().map(Borrow::borrow);
48    let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
49    batch_send(sock, &pkts).await
50}
51
#[cfg(test)]
mod tests {
    use {
        crate::{
            nonblocking::{
                recvmmsg::{recv_mmsg, recv_mmsg_exact},
                sendmmsg::{batch_send, multi_target_send},
            },
            packet::Packet,
            sendmmsg::SendPktsError,
        },
        assert_matches::assert_matches,
        solana_net_utils::{bind_to_localhost_async, bind_to_unspecified_async},
        solana_packet::PACKET_DATA_SIZE,
        std::{
            io::ErrorKind,
            net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
        },
    };

    /// Returns `count` zero-filled payloads of `PACKET_DATA_SIZE` bytes each.
    fn zeroed_payloads(count: usize) -> Vec<Vec<u8>> {
        vec![vec![0u8; PACKET_DATA_SIZE]; count]
    }

    /// 32 packets sent to a single reader must all be received by it.
    #[tokio::test]
    async fn test_send_mmsg_one_dest() {
        let receiver = bind_to_localhost_async().await.expect("bind");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost_async().await.expect("bind");

        let payloads = zeroed_payloads(32);
        let outgoing: Vec<(&[u8], &SocketAddr)> = payloads
            .iter()
            .map(|bytes| (bytes.as_slice(), &receiver_addr))
            .collect();

        let sent = batch_send(&sender, &outgoing[..]).await.ok();
        assert_eq!(sent, Some(()));

        let mut inbox = vec![Packet::default(); 32];
        let received = recv_mmsg_exact(&receiver, &mut inbox[..]).await.unwrap();
        assert_eq!(32, received);
    }

    /// A batch whose first 16 packets target one reader and whose last 16 target another
    /// must deliver 16 packets to each.
    #[tokio::test]
    async fn test_send_mmsg_multi_dest() {
        let first_reader = bind_to_localhost_async().await.expect("bind");
        let first_addr = first_reader.local_addr().unwrap();

        let second_reader = bind_to_localhost_async().await.expect("bind");
        let second_addr = second_reader.local_addr().unwrap();

        let sender = bind_to_localhost_async().await.expect("bind");

        let payloads = zeroed_payloads(32);
        let outgoing: Vec<(&[u8], &SocketAddr)> = payloads
            .iter()
            .enumerate()
            .map(|(index, bytes)| {
                let dest = if index < 16 {
                    &first_addr
                } else {
                    &second_addr
                };
                (bytes.as_slice(), dest)
            })
            .collect();

        let sent = batch_send(&sender, &outgoing[..]).await.ok();
        assert_eq!(sent, Some(()));

        for reader in [&first_reader, &second_reader] {
            let mut inbox = vec![Packet::default(); 16];
            let received = recv_mmsg_exact(reader, &mut inbox[..]).await.unwrap();
            assert_eq!(16, received);
        }
    }

    /// One packet sent to four readers with `multi_target_send` must arrive once at each.
    #[tokio::test]
    async fn test_multicast_msg() {
        let mut readers = Vec::with_capacity(4);
        let mut addrs = Vec::with_capacity(4);
        for _ in 0..4 {
            let reader = bind_to_localhost_async().await.expect("bind");
            addrs.push(reader.local_addr().unwrap());
            readers.push(reader);
        }

        let sender = bind_to_localhost_async().await.expect("bind");

        let packet = Packet::default();

        let sent = multi_target_send(&sender, packet.data(..).unwrap(), &addrs[..])
            .await
            .ok();
        assert_eq!(sent, Some(()));

        for reader in &readers {
            let mut inbox = vec![Packet::default(); 32];
            let received = recv_mmsg(reader, &mut inbox[..]).await.unwrap();
            assert_eq!(1, received);
        }
    }

    /// Mixing an IPv6 destination between two IPv4 destinations must be reported as exactly
    /// one failed packet, for both `batch_send` and `multi_target_send`.
    #[tokio::test]
    async fn test_intermediate_failures_mismatched_bind() {
        let payloads = zeroed_payloads(3);
        let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080);
        let dest_refs = [&ip4, &ip6, &ip4];
        let outgoing: Vec<(&[u8], &SocketAddr)> = payloads
            .iter()
            .map(Vec::as_slice)
            .zip(dest_refs)
            .collect();

        let sender = bind_to_unspecified_async().await.expect("bind");

        let batch_outcome = batch_send(&sender, &outgoing[..]).await;
        assert_matches!(
            batch_outcome,
            Err(SendPktsError::IoError(_, /*num_failed*/ 1))
        );

        let multi_outcome = multi_target_send(&sender, &payloads[0], &dest_refs).await;
        assert_matches!(
            multi_outcome,
            Err(SendPktsError::IoError(_, /*num_failed*/ 1))
        );
    }

    /// Sends to the IPv4 broadcast address are expected to fail with `PermissionDenied`
    /// while sends to localhost succeed; the reported failure count must equal the number
    /// of broadcast destinations regardless of where they sit in the batch.
    #[tokio::test]
    async fn test_intermediate_failures_unreachable_address() {
        let payloads = zeroed_payloads(5);
        let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080);
        let sender = bind_to_unspecified_async().await.expect("bind");

        let local = &ipv4local;
        let bcast = &ipv4broadcast;

        // Per-packet destinations paired with the number of sends expected to fail.
        let batch_cases = [
            // test intermediate failures for batch_send
            ([local, bcast, local, bcast, local], 2),
            // test leading and trailing failures for batch_send
            ([bcast, local, bcast, local, bcast], 3),
            // test consecutive intermediate failures for batch_send
            ([local, local, bcast, bcast, local], 2),
        ];
        for (dests, expected_failed) in batch_cases {
            let outgoing: Vec<(&[u8], &SocketAddr)> =
                payloads.iter().map(Vec::as_slice).zip(dests).collect();
            match batch_send(&sender, &outgoing[..]).await {
                Ok(()) => panic!(),
                Err(SendPktsError::IoError(ioerror, num_failed)) => {
                    assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
                    assert_eq!(num_failed, expected_failed);
                }
            }
        }

        let multi_cases = [
            // test intermediate failures for multi_target_send
            ([local, bcast, local, bcast, local], 2),
            // test leading and trailing failures for multi_target_send
            ([bcast, local, bcast, local, bcast], 3),
        ];
        for (dest_refs, expected_failed) in multi_cases {
            match multi_target_send(&sender, &payloads[0], &dest_refs).await {
                Ok(()) => panic!(),
                Err(SendPktsError::IoError(ioerror, num_failed)) => {
                    assert_matches!(ioerror.kind(), ErrorKind::PermissionDenied);
                    assert_eq!(num_failed, expected_failed);
                }
            }
        }
    }
}