hickory_proto/tests/
udp.rs

1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2
3use futures_util::stream::StreamExt;
4use tracing::debug;
5
6use crate::udp::{UdpClientStream, UdpSocket, UdpStream};
7use crate::xfer::dns_handle::DnsStreamHandle;
8use crate::xfer::{DnsRequestOptions, FirstAnswer};
9use crate::Executor;
10
11/// Test next random udpsocket.
12pub fn next_random_socket_test<S: UdpSocket + Send + 'static, E: Executor>(mut exec: E) {
13    let (stream, _) = UdpStream::<S>::new(
14        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 52),
15        None,
16    );
17    drop(
18        exec.block_on(stream)
19            .expect("failed to get next socket address"),
20    );
21}
22
23/// Test udp_stream.
24pub async fn udp_stream_test<S: UdpSocket + Send + 'static>(server_addr: IpAddr) {
25    use crate::xfer::SerialMessage;
26    use std::net::ToSocketAddrs;
27
28    let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
29    let succeeded_clone = succeeded.clone();
30    std::thread::Builder::new()
31        .name("thread_killer".to_string())
32        .spawn(move || {
33            let succeeded = succeeded_clone;
34            for _ in 0..15 {
35                std::thread::sleep(std::time::Duration::from_secs(1));
36                if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
37                    return;
38                }
39            }
40
41            println!("Thread Killer has been awoken, killing process");
42            std::process::exit(-1);
43        })
44        .unwrap();
45
46    let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
47    server
48        .set_read_timeout(Some(std::time::Duration::from_secs(5)))
49        .unwrap(); // should receive something within 5 seconds...
50    server
51        .set_write_timeout(Some(std::time::Duration::from_secs(5)))
52        .unwrap(); // should receive something within 5 seconds...
53    let server_addr = server.local_addr().unwrap();
54    println!("server listening on: {server_addr}");
55
56    let test_bytes: &'static [u8; 8] = b"DEADBEEF";
57    let send_recv_times = 4u32;
58
59    // an in and out server
60    let server_handle = std::thread::Builder::new()
61        .name("test_udp_stream_ipv4:server".to_string())
62        .spawn(move || {
63            let mut buffer = [0_u8; 512];
64
65            for _ in 0..send_recv_times {
66                // wait for some bytes...
67                //println!("receiving message: {}", _i);
68                let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
69
70                assert_eq!(&buffer[0..len], test_bytes);
71
72                //println!("sending message len back: {}", len);
73                // bounce them right back...
74                assert_eq!(
75                    server.send_to(&buffer[0..len], addr).expect("send failed"),
76                    len
77                );
78            }
79        })
80        .unwrap();
81
82    // setup the client, which is going to run on the testing thread...
83    // the tests should run within 5 seconds... right?
84    // TODO: add timeout here, so that test never hangs...
85    let client_addr = match server_addr {
86        std::net::SocketAddr::V4(_) => "127.0.0.1:0",
87        std::net::SocketAddr::V6(_) => "[::1]:0",
88    };
89
90    println!("binding client socket");
91    let socket = S::bind(client_addr.to_socket_addrs().unwrap().next().unwrap())
92        .await
93        .expect("could not create socket"); // some random address...
94    println!("bound client socket");
95
96    let (mut stream, mut sender) = UdpStream::<S>::with_bound(socket, server_addr);
97
98    for _i in 0..send_recv_times {
99        // test once
100        sender
101            .send(SerialMessage::new(test_bytes.to_vec(), server_addr))
102            .unwrap();
103        //println!("blocking on client stream: {}", _i);
104        let buffer_and_addr = stream.next().await;
105        //println!("got message");
106        let message = buffer_and_addr.expect("no message").expect("io error");
107        assert_eq!(message.bytes(), test_bytes);
108        assert_eq!(message.addr(), server_addr);
109    }
110
111    succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
112    server_handle.join().expect("server thread failed");
113}
114
115/// Test udp_client_stream.
116#[allow(clippy::print_stdout)]
117pub fn udp_client_stream_test<S: UdpSocket + Send + 'static, E: Executor>(
118    server_addr: IpAddr,
119    mut exec: E,
120) {
121    use crate::op::{Message, Query};
122    use crate::rr::rdata::NULL;
123    use crate::rr::{Name, RData, Record, RecordType};
124    use crate::xfer::{DnsRequest, DnsRequestSender};
125    use std::str::FromStr;
126    use std::time::Duration;
127
128    // use env_logger;
129    // env_logger::try_init().ok();
130
131    let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
132    let succeeded_clone = succeeded.clone();
133    std::thread::Builder::new()
134        .name("thread_killer".to_string())
135        .spawn(move || {
136            let succeeded = succeeded_clone;
137            for _ in 0..15 {
138                std::thread::sleep(std::time::Duration::from_secs(1));
139                if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
140                    return;
141                }
142            }
143
144            println!("Thread Killer has been awoken, killing process");
145            std::process::exit(-1);
146        })
147        .unwrap();
148
149    let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
150    server
151        .set_read_timeout(Some(std::time::Duration::from_secs(5)))
152        .unwrap(); // should receive something within 5 seconds...
153    server
154        .set_write_timeout(Some(std::time::Duration::from_secs(5)))
155        .unwrap(); // should receive something within 5 seconds...
156    let server_addr = server.local_addr().unwrap();
157
158    let mut query = Message::new();
159    let test_name = Name::from_str("dead.beef").unwrap();
160    query.add_query(Query::query(test_name.clone(), RecordType::NULL));
161    let test_bytes: &'static [u8; 8] = b"DEADBEEF";
162    let send_recv_times = 4;
163
164    let test_name_server = test_name;
165    // an in and out server
166    let server_handle = std::thread::Builder::new()
167        .name("test_udp_client_stream_ipv4:server".to_string())
168        .spawn(move || {
169            let mut buffer = [0_u8; 512];
170
171            for i in 0..send_recv_times {
172                // wait for some bytes...
173                debug!("server receiving request {}", i);
174                let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
175                debug!("server received request {} from: {}", i, addr);
176
177                let request = Message::from_vec(&buffer[0..len]).expect("failed parse of request");
178                assert_eq!(*request.queries()[0].name(), test_name_server.clone());
179                assert_eq!(request.queries()[0].query_type(), RecordType::NULL);
180
181                let mut message = Message::new();
182                message.set_id(request.id());
183                message.add_queries(request.queries().to_vec());
184                message.add_answer(Record::from_rdata(
185                    test_name_server.clone(),
186                    0,
187                    RData::NULL(NULL::with(test_bytes.to_vec())),
188                ));
189
190                // bounce them right back...
191                let bytes = message.to_vec().unwrap();
192                debug!("server sending response {i} to: {addr}");
193                assert_eq!(
194                    server.send_to(&bytes, addr).expect("send failed"),
195                    bytes.len()
196                );
197                debug!("server sent response {i}");
198                std::thread::yield_now();
199            }
200        })
201        .unwrap();
202
203    // setup the client, which is going to run on the testing thread...
204
205    // the tests should run within 5 seconds... right?
206    // TODO: add timeout here, so that test never hangs...
207    // let timeout = Timeout::new(Duration::from_secs(5));
208    let stream = UdpClientStream::with_timeout(server_addr, Duration::from_millis(500));
209    let mut stream: UdpClientStream<S> = exec.block_on(stream).ok().unwrap();
210    let mut worked_once = false;
211
212    for i in 0..send_recv_times {
213        // test once
214        let response_stream =
215            stream.send_message(DnsRequest::new(query.clone(), DnsRequestOptions::default()));
216        println!("client sending request {i}");
217        let response = match exec.block_on(response_stream.first_answer()) {
218            Ok(response) => response,
219            Err(err) => {
220                println!("failed to get message: {err}");
221                continue;
222            }
223        };
224        println!("client got response {i}");
225
226        let response = Message::from(response);
227        if let Some(RData::NULL(null)) = response.answers()[0].data() {
228            assert_eq!(null.anything(), test_bytes);
229        } else {
230            panic!("not a NULL response");
231        }
232
233        worked_once = true;
234    }
235
236    succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
237    server_handle.join().expect("server thread failed");
238
239    assert!(worked_once);
240}