solana_streamer/nonblocking/
recvmmsg.rs

1//! The `recvmmsg` module provides a nonblocking recvmmsg() API implementation
2
3use {
4    crate::{
5        packet::{Meta, Packet},
6        recvmmsg::NUM_RCVMMSGS,
7    },
8    std::{cmp, io},
9    tokio::net::UdpSocket,
10};
11
12/// Pulls some packets from the socket into the specified container
13/// returning how many packets were read
14pub async fn recv_mmsg(
15    socket: &UdpSocket,
16    packets: &mut [Packet],
17) -> io::Result</*num packets:*/ usize> {
18    debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default()));
19    let count = cmp::min(NUM_RCVMMSGS, packets.len());
20    socket.readable().await?;
21    let mut i = 0;
22    for p in packets.iter_mut().take(count) {
23        p.meta_mut().size = 0;
24        match socket.try_recv_from(p.buffer_mut()) {
25            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
26                break;
27            }
28            Err(e) => {
29                return Err(e);
30            }
31            Ok((nrecv, from)) => {
32                p.meta_mut().size = nrecv;
33                p.meta_mut().set_socket_addr(&from);
34            }
35        }
36        i += 1;
37    }
38    Ok(i)
39}
40
41/// Reads the exact number of packets required to fill `packets`
42pub async fn recv_mmsg_exact(
43    socket: &UdpSocket,
44    packets: &mut [Packet],
45) -> io::Result</*num packets:*/ usize> {
46    let total = packets.len();
47    let mut remaining = total;
48    while remaining != 0 {
49        let first = total - remaining;
50        let res = recv_mmsg(socket, &mut packets[first..]).await?;
51        remaining -= res;
52    }
53    Ok(packets.len())
54}
55
#[cfg(test)]
mod tests {
    use {
        crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE},
        std::{net::SocketAddr, time::Instant},
        tokio::net::UdpSocket,
    };

    /// `(reader socket, reader address, sender socket, sender address)`.
    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);

    /// Baseline number of datagrams the tests send and receive.
    const TEST_NUM_MSGS: usize = 32;

    /// Binds a UDP socket on `ip_str` and returns it with its local address.
    async fn bind_with_local_addr(ip_str: &str) -> io::Result<(UdpSocket, SocketAddr)> {
        let socket = UdpSocket::bind(ip_str).await?;
        let local = socket.local_addr()?;
        Ok((socket, local))
    }

    /// Binds a reader and then a sender socket on `ip_str`.
    async fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
        let (reader, addr) = bind_with_local_addr(ip_str).await?;
        let (sender, saddr) = bind_with_local_addr(ip_str).await?;
        Ok((reader, addr, sender, saddr))
    }

    /// Sends `count` zero-filled datagrams of `PACKET_DATA_SIZE` bytes to `dest`.
    async fn send_zeroed(sender: &UdpSocket, dest: &SocketAddr, count: usize) {
        let data = [0; PACKET_DATA_SIZE];
        for _ in 0..count {
            sender.send_to(&data[..], dest).await.unwrap();
        }
    }

    /// Asserts that each packet is `PACKET_DATA_SIZE` bytes long and came from `expected_from`.
    #[track_caller]
    fn assert_full_packets_from<'a>(
        packets: impl Iterator<Item = &'a Packet>,
        expected_from: SocketAddr,
    ) {
        for packet in packets {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), expected_from);
        }
    }

    /// Resets every packet's meta to `Meta::default()`, which `recv_mmsg`
    /// asserts on entry in debug builds.
    fn reset_metas(packets: &mut [Packet]) {
        for pkt in packets.iter_mut() {
            *pkt.meta_mut() = Meta::default();
        }
    }

    /// Sends one datagram fewer than `TEST_NUM_MSGS` and reads them all back
    /// with a single `recv_mmsg_exact` call.
    async fn test_one_iter((reader, addr, sender, saddr): TestConfig) {
        let sent = TEST_NUM_MSGS - 1;
        send_zeroed(&sender, &addr, sent).await;

        let mut packets = vec![Packet::default(); sent];
        let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(sent, recv);
        assert_full_packets_from(packets.iter().take(recv), saddr);
    }

    #[tokio::test]
    async fn test_recv_mmsg_one_iter() {
        test_one_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;

        // IPv6 is best-effort: a setup failure is logged and the IPv6 run is skipped.
        match test_setup_reader_sender("::1:0").await {
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
            Ok(config) => test_one_iter(config).await,
        }
    }

    /// Sends more datagrams than the first buffer holds and reads them back in
    /// two `recv_mmsg_exact` calls.
    async fn test_multi_iter((reader, addr, sender, saddr): TestConfig) {
        let sent = TEST_NUM_MSGS + 10;
        send_zeroed(&sender, &addr, sent).await;

        // First batch: exactly TEST_NUM_MSGS packets.
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(packets.iter().take(recv), saddr);

        // Second batch: whatever is left over.
        let leftover = sent - TEST_NUM_MSGS;
        let mut packets = vec![Packet::default(); leftover];
        reset_metas(&mut packets);
        let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(leftover, recv);
        assert_full_packets_from(packets.iter().take(recv), saddr);
    }

    #[tokio::test]
    async fn test_recv_mmsg_multi_iter() {
        test_multi_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;

        // IPv6 is best-effort: a setup failure is logged and the IPv6 run is skipped.
        match test_setup_reader_sender("::1:0").await {
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
            Ok(config) => test_multi_iter(config).await,
        }
    }

    #[tokio::test]
    async fn test_recv_mmsg_exact_multi_iter_timeout() {
        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let saddr = sender.local_addr().unwrap();
        let sent = TEST_NUM_MSGS;
        send_zeroed(&sender, &addr, sent).await;

        let started = Instant::now();
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(packets.iter().take(recv), saddr);

        reset_metas(&mut packets);
        // Everything that was sent has been consumed above; the outcome of this
        // extra call is deliberately ignored and only the elapsed time is checked.
        // NOTE(review): no explicit timeout wraps this call, so the assertion
        // below only runs once the call itself returns.
        let _recv = recv_mmsg(&reader, &mut packets[..]).await;
        assert!(started.elapsed().as_secs() < 5);
    }

    #[tokio::test]
    async fn test_recv_mmsg_multi_addrs() {
        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let addr = reader.local_addr().unwrap();

        let sender1 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let saddr1 = sender1.local_addr().unwrap();
        let sent1 = TEST_NUM_MSGS - 1;

        let sender2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let saddr2 = sender2.local_addr().unwrap();
        let sent2 = TEST_NUM_MSGS + 1;

        // All of sender1's datagrams are sent before any of sender2's.
        send_zeroed(&sender1, &addr, sent1).await;
        send_zeroed(&sender2, &addr, sent2).await;

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];

        // First read: sender1's datagrams followed by the start of sender2's.
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(packets.iter().take(sent1), saddr1);
        assert_full_packets_from(packets.iter().skip(sent1).take(recv - sent1), saddr2);

        // Second read: the rest, all from sender2.
        reset_metas(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
        assert_full_packets_from(packets.iter().take(recv), saddr2);
    }
}
213}