hickory_proto/tests/
udp.rs

1use alloc::string::ToString;
2use alloc::sync::Arc;
3use core::str::FromStr;
4use core::sync::atomic::AtomicBool;
5use core::time::Duration;
6use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
7use std::println;
8
9use futures_util::stream::StreamExt;
10use tracing::debug;
11
12use crate::op::{Message, Query};
13use crate::rr::rdata::NULL;
14use crate::rr::{Name, RData, Record, RecordType};
15use crate::runtime::RuntimeProvider;
16use crate::udp::{UdpClientStream, UdpStream};
17use crate::xfer::dns_handle::DnsStreamHandle;
18use crate::xfer::{DnsRequest, DnsRequestOptions, DnsRequestSender, FirstAnswer, SerialMessage};
19
20/// Test next random udpsocket.
21pub async fn next_random_socket_test(provider: impl RuntimeProvider) {
22    let (stream, _) = UdpStream::new(
23        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 52),
24        None,
25        None,
26        false,
27        provider,
28    );
29    drop(stream.await.expect("failed to get next socket address"));
30}
31
32/// Test udp_stream.
33pub async fn udp_stream_test<P: RuntimeProvider>(server_addr: IpAddr, provider: P) {
34    let succeeded = Arc::new(AtomicBool::new(false));
35    let succeeded_clone = succeeded.clone();
36    std::thread::Builder::new()
37        .name("thread_killer".to_string())
38        .spawn(move || {
39            let succeeded = succeeded_clone;
40            for _ in 0..15 {
41                std::thread::sleep(core::time::Duration::from_secs(1));
42                if succeeded.load(core::sync::atomic::Ordering::Relaxed) {
43                    return;
44                }
45            }
46
47            println!("Thread Killer has been awoken, killing process");
48            std::process::exit(-1);
49        })
50        .unwrap();
51
52    let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
53    server
54        .set_read_timeout(Some(core::time::Duration::from_secs(5)))
55        .unwrap(); // should receive something within 5 seconds...
56    server
57        .set_write_timeout(Some(core::time::Duration::from_secs(5)))
58        .unwrap(); // should receive something within 5 seconds...
59    let server_addr = server.local_addr().unwrap();
60    println!("server listening on: {server_addr}");
61
62    let test_bytes: &'static [u8; 8] = b"DEADBEEF";
63    let send_recv_times = 4u32;
64
65    // an in and out server
66    let server_handle = std::thread::Builder::new()
67        .name("test_udp_stream_ipv4:server".to_string())
68        .spawn(move || {
69            let mut buffer = [0_u8; 512];
70
71            for _ in 0..send_recv_times {
72                // wait for some bytes...
73                //println!("receiving message: {}", _i);
74                let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
75
76                assert_eq!(&buffer[0..len], test_bytes);
77
78                //println!("sending message len back: {}", len);
79                // bounce them right back...
80                assert_eq!(
81                    server.send_to(&buffer[0..len], addr).expect("send failed"),
82                    len
83                );
84            }
85        })
86        .unwrap();
87
88    // setup the client, which is going to run on the testing thread...
89    // the tests should run within 5 seconds... right?
90    // TODO: add timeout here, so that test never hangs...
91    let client_addr = match server_addr {
92        SocketAddr::V4(_) => "127.0.0.1:0",
93        SocketAddr::V6(_) => "[::1]:0",
94    };
95
96    println!("binding client socket");
97    let socket = provider
98        .bind_udp(
99            client_addr.to_socket_addrs().unwrap().next().unwrap(),
100            server_addr,
101        )
102        .await
103        .expect("could not create socket"); // some random address...
104    println!("bound client socket");
105
106    let (mut stream, mut sender) = UdpStream::<P>::with_bound(socket, server_addr);
107
108    for _i in 0..send_recv_times {
109        // test once
110        sender
111            .send(SerialMessage::new(test_bytes.to_vec(), server_addr))
112            .unwrap();
113        //println!("blocking on client stream: {}", _i);
114        let buffer_and_addr = stream.next().await;
115        //println!("got message");
116        let message = buffer_and_addr.expect("no message").expect("io error");
117        assert_eq!(message.bytes(), test_bytes);
118        assert_eq!(message.addr(), server_addr);
119    }
120
121    succeeded.store(true, core::sync::atomic::Ordering::Relaxed);
122    server_handle.join().expect("server thread failed");
123}
124
125/// Test udp_client_stream.
126#[allow(clippy::print_stdout)]
127pub async fn udp_client_stream_test(server_addr: IpAddr, provider: impl RuntimeProvider) {
128    let succeeded = Arc::new(AtomicBool::new(false));
129    let succeeded_clone = succeeded.clone();
130    std::thread::Builder::new()
131        .name("thread_killer".to_string())
132        .spawn(move || {
133            let succeeded = succeeded_clone;
134            for _ in 0..15 {
135                std::thread::sleep(core::time::Duration::from_secs(1));
136                if succeeded.load(core::sync::atomic::Ordering::Relaxed) {
137                    return;
138                }
139            }
140
141            println!("Thread Killer has been awoken, killing process");
142            std::process::exit(-1);
143        })
144        .unwrap();
145
146    let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
147    server
148        .set_read_timeout(Some(core::time::Duration::from_secs(5)))
149        .unwrap(); // should receive something within 5 seconds...
150    server
151        .set_write_timeout(Some(core::time::Duration::from_secs(5)))
152        .unwrap(); // should receive something within 5 seconds...
153    let server_addr = server.local_addr().unwrap();
154
155    let mut query = Message::new();
156    let test_name = Name::from_str("dead.beef.").unwrap();
157    query.add_query(Query::query(test_name.clone(), RecordType::NULL));
158    let test_bytes: &'static [u8; 8] = b"DEADBEEF";
159    let send_recv_times = 4;
160
161    let test_name_server = test_name;
162    // an in and out server
163    let server_handle = std::thread::Builder::new()
164        .name("test_udp_client_stream_ipv4:server".to_string())
165        .spawn(move || {
166            let mut buffer = [0_u8; 512];
167
168            for i in 0..send_recv_times {
169                // wait for some bytes...
170                debug!("server receiving request {}", i);
171                let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
172                debug!("server received request {} from: {}", i, addr);
173
174                let request = Message::from_vec(&buffer[0..len]).expect("failed parse of request");
175                assert_eq!(*request.queries()[0].name(), test_name_server.clone());
176                assert_eq!(request.queries()[0].query_type(), RecordType::NULL);
177
178                let mut message = Message::new();
179                message.set_id(request.id());
180                message.add_queries(request.queries().to_vec());
181                message.add_answer(Record::from_rdata(
182                    test_name_server.clone(),
183                    0,
184                    RData::NULL(NULL::with(test_bytes.to_vec())),
185                ));
186
187                // bounce them right back...
188                let bytes = message.to_vec().unwrap();
189                debug!("server sending response {i} to: {addr}");
190                assert_eq!(
191                    server.send_to(&bytes, addr).expect("send failed"),
192                    bytes.len()
193                );
194                debug!("server sent response {i}");
195                std::thread::yield_now();
196            }
197        })
198        .unwrap();
199
200    // setup the client, which is going to run on the testing thread...
201
202    // the tests should run within 5 seconds... right?
203    // TODO: add timeout here, so that test never hangs...
204    // let timeout = Timeout::new(Duration::from_secs(5));
205    let stream = UdpClientStream::builder(server_addr, provider)
206        .with_timeout(Some(Duration::from_millis(500)))
207        .build();
208    let mut stream = stream.await.ok().unwrap();
209    let mut worked_once = false;
210
211    for i in 0..send_recv_times {
212        // test once
213        let response_stream =
214            stream.send_message(DnsRequest::new(query.clone(), DnsRequestOptions::default()));
215        println!("client sending request {i}");
216        let response = match response_stream.first_answer().await {
217            Ok(response) => response,
218            Err(err) => {
219                println!("failed to get message: {err}");
220                continue;
221            }
222        };
223        println!("client got response {i}");
224
225        let response = Message::from(response);
226        if let RData::NULL(null) = response.answers()[0].data() {
227            assert_eq!(null.anything(), test_bytes);
228        } else {
229            panic!("not a NULL response");
230        }
231
232        worked_once = true;
233    }
234
235    succeeded.store(true, core::sync::atomic::Ordering::Relaxed);
236    server_handle.join().expect("server thread failed");
237
238    assert!(worked_once);
239}