solana_streamer/nonblocking/
sendmmsg.rs

1//! The `sendmmsg` module provides a nonblocking sendmmsg() API implementation
2
3use {
4    crate::sendmmsg::SendPktsError,
5    futures_util::future::join_all,
6    std::{borrow::Borrow, iter::repeat, net::SocketAddr},
7    tokio::net::UdpSocket,
8};
9
10pub async fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
11where
12    S: Borrow<SocketAddr>,
13    T: AsRef<[u8]>,
14{
15    let mut num_failed = 0;
16    let mut erropt = None;
17    let futures = packets
18        .iter()
19        .map(|(p, a)| sock.send_to(p.as_ref(), a.borrow()))
20        .collect::<Vec<_>>();
21    let results = join_all(futures).await;
22    for result in results {
23        if let Err(e) = result {
24            num_failed += 1;
25            if erropt.is_none() {
26                erropt = Some(e);
27            }
28        }
29    }
30
31    if let Some(err) = erropt {
32        Err(SendPktsError::IoError(err, num_failed))
33    } else {
34        Ok(())
35    }
36}
37
38pub async fn multi_target_send<S, T>(
39    sock: &UdpSocket,
40    packet: T,
41    dests: &[S],
42) -> Result<(), SendPktsError>
43where
44    S: Borrow<SocketAddr>,
45    T: AsRef<[u8]>,
46{
47    let dests = dests.iter().map(Borrow::borrow);
48    let pkts: Vec<_> = repeat(&packet).zip(dests).collect();
49    batch_send(sock, &pkts).await
50}
51
52#[cfg(test)]
53mod tests {
54    use {
55        crate::{
56            nonblocking::{
57                recvmmsg::recv_mmsg,
58                sendmmsg::{batch_send, multi_target_send},
59            },
60            packet::Packet,
61            sendmmsg::SendPktsError,
62        },
63        solana_sdk::packet::PACKET_DATA_SIZE,
64        std::{
65            io::ErrorKind,
66            net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
67        },
68        tokio::net::UdpSocket,
69    };
70
71    #[tokio::test]
72    async fn test_send_mmsg_one_dest() {
73        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
74        let addr = reader.local_addr().unwrap();
75        let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
76
77        let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
78        let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect();
79
80        let sent = batch_send(&sender, &packet_refs[..]).await.ok();
81        assert_eq!(sent, Some(()));
82
83        let mut packets = vec![Packet::default(); 32];
84        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
85        assert_eq!(32, recv);
86    }
87
88    #[tokio::test]
89    async fn test_send_mmsg_multi_dest() {
90        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
91        let addr = reader.local_addr().unwrap();
92
93        let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
94        let addr2 = reader2.local_addr().unwrap();
95
96        let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
97
98        let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
99        let packet_refs: Vec<_> = packets
100            .iter()
101            .enumerate()
102            .map(|(i, p)| {
103                if i < 16 {
104                    (&p[..], &addr)
105                } else {
106                    (&p[..], &addr2)
107                }
108            })
109            .collect();
110
111        let sent = batch_send(&sender, &packet_refs[..]).await.ok();
112        assert_eq!(sent, Some(()));
113
114        let mut packets = vec![Packet::default(); 32];
115        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
116        assert_eq!(16, recv);
117
118        let mut packets = vec![Packet::default(); 32];
119        let recv = recv_mmsg(&reader2, &mut packets[..]).await.unwrap();
120        assert_eq!(16, recv);
121    }
122
123    #[tokio::test]
124    async fn test_multicast_msg() {
125        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
126        let addr = reader.local_addr().unwrap();
127
128        let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
129        let addr2 = reader2.local_addr().unwrap();
130
131        let reader3 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
132        let addr3 = reader3.local_addr().unwrap();
133
134        let reader4 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
135        let addr4 = reader4.local_addr().unwrap();
136
137        let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
138
139        let packet = Packet::default();
140
141        let sent = multi_target_send(
142            &sender,
143            packet.data(..).unwrap(),
144            &[&addr, &addr2, &addr3, &addr4],
145        )
146        .await
147        .ok();
148        assert_eq!(sent, Some(()));
149
150        let mut packets = vec![Packet::default(); 32];
151        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
152        assert_eq!(1, recv);
153
154        let mut packets = vec![Packet::default(); 32];
155        let recv = recv_mmsg(&reader2, &mut packets[..]).await.unwrap();
156        assert_eq!(1, recv);
157
158        let mut packets = vec![Packet::default(); 32];
159        let recv = recv_mmsg(&reader3, &mut packets[..]).await.unwrap();
160        assert_eq!(1, recv);
161
162        let mut packets = vec![Packet::default(); 32];
163        let recv = recv_mmsg(&reader4, &mut packets[..]).await.unwrap();
164        assert_eq!(1, recv);
165    }
166
167    #[tokio::test]
168    async fn test_intermediate_failures_mismatched_bind() {
169        let packets: Vec<_> = (0..3).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
170        let ip4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
171        let ip6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080);
172        let packet_refs: Vec<_> = vec![
173            (&packets[0][..], &ip4),
174            (&packets[1][..], &ip6),
175            (&packets[2][..], &ip4),
176        ];
177        let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4];
178
179        let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind");
180        if let Err(SendPktsError::IoError(_, num_failed)) =
181            batch_send(&sender, &packet_refs[..]).await
182        {
183            assert_eq!(num_failed, 1);
184        }
185        if let Err(SendPktsError::IoError(_, num_failed)) =
186            multi_target_send(&sender, &packets[0], &dest_refs).await
187        {
188            assert_eq!(num_failed, 1);
189        }
190    }
191
192    #[tokio::test]
193    async fn test_intermediate_failures_unreachable_address() {
194        let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
195        let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
196        let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080);
197        let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind");
198
199        // test intermediate failures for batch_send
200        let packet_refs: Vec<_> = vec![
201            (&packets[0][..], &ipv4local),
202            (&packets[1][..], &ipv4broadcast),
203            (&packets[2][..], &ipv4local),
204            (&packets[3][..], &ipv4broadcast),
205            (&packets[4][..], &ipv4local),
206        ];
207        if let Err(SendPktsError::IoError(ioerror, num_failed)) =
208            batch_send(&sender, &packet_refs[..]).await
209        {
210            assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
211            assert_eq!(num_failed, 2);
212        }
213
214        // test leading and trailing failures for batch_send
215        let packet_refs: Vec<_> = vec![
216            (&packets[0][..], &ipv4broadcast),
217            (&packets[1][..], &ipv4local),
218            (&packets[2][..], &ipv4broadcast),
219            (&packets[3][..], &ipv4local),
220            (&packets[4][..], &ipv4broadcast),
221        ];
222        if let Err(SendPktsError::IoError(ioerror, num_failed)) =
223            batch_send(&sender, &packet_refs[..]).await
224        {
225            assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
226            assert_eq!(num_failed, 3);
227        }
228
229        // test consecutive intermediate failures for batch_send
230        let packet_refs: Vec<_> = vec![
231            (&packets[0][..], &ipv4local),
232            (&packets[1][..], &ipv4local),
233            (&packets[2][..], &ipv4broadcast),
234            (&packets[3][..], &ipv4broadcast),
235            (&packets[4][..], &ipv4local),
236        ];
237        if let Err(SendPktsError::IoError(ioerror, num_failed)) =
238            batch_send(&sender, &packet_refs[..]).await
239        {
240            assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
241            assert_eq!(num_failed, 2);
242        }
243
244        // test intermediate failures for multi_target_send
245        let dest_refs: Vec<_> = vec![
246            &ipv4local,
247            &ipv4broadcast,
248            &ipv4local,
249            &ipv4broadcast,
250            &ipv4local,
251        ];
252        if let Err(SendPktsError::IoError(ioerror, num_failed)) =
253            multi_target_send(&sender, &packets[0], &dest_refs).await
254        {
255            assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
256            assert_eq!(num_failed, 2);
257        }
258
259        // test leading and trailing failures for multi_target_send
260        let dest_refs: Vec<_> = vec![
261            &ipv4broadcast,
262            &ipv4local,
263            &ipv4broadcast,
264            &ipv4local,
265            &ipv4broadcast,
266        ];
267        if let Err(SendPktsError::IoError(ioerror, num_failed)) =
268            multi_target_send(&sender, &packets[0], &dest_refs).await
269        {
270            assert!(matches!(ioerror.kind(), ErrorKind::PermissionDenied));
271            assert_eq!(num_failed, 3);
272        }
273    }
274}