hickory_proto/tests/
udp.rs1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2
3use futures_util::stream::StreamExt;
4use tracing::debug;
5
6use crate::udp::{UdpClientStream, UdpSocket, UdpStream};
7use crate::xfer::dns_handle::DnsStreamHandle;
8use crate::xfer::{DnsRequestOptions, FirstAnswer};
9use crate::Executor;
10
11pub fn next_random_socket_test<S: UdpSocket + Send + 'static, E: Executor>(mut exec: E) {
13 let (stream, _) = UdpStream::<S>::new(
14 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 52),
15 None,
16 );
17 drop(
18 exec.block_on(stream)
19 .expect("failed to get next socket address"),
20 );
21}
22
23pub async fn udp_stream_test<S: UdpSocket + Send + 'static>(server_addr: IpAddr) {
25 use crate::xfer::SerialMessage;
26 use std::net::ToSocketAddrs;
27
28 let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
29 let succeeded_clone = succeeded.clone();
30 std::thread::Builder::new()
31 .name("thread_killer".to_string())
32 .spawn(move || {
33 let succeeded = succeeded_clone;
34 for _ in 0..15 {
35 std::thread::sleep(std::time::Duration::from_secs(1));
36 if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
37 return;
38 }
39 }
40
41 println!("Thread Killer has been awoken, killing process");
42 std::process::exit(-1);
43 })
44 .unwrap();
45
46 let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
47 server
48 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
49 .unwrap(); server
51 .set_write_timeout(Some(std::time::Duration::from_secs(5)))
52 .unwrap(); let server_addr = server.local_addr().unwrap();
54 println!("server listening on: {server_addr}");
55
56 let test_bytes: &'static [u8; 8] = b"DEADBEEF";
57 let send_recv_times = 4u32;
58
59 let server_handle = std::thread::Builder::new()
61 .name("test_udp_stream_ipv4:server".to_string())
62 .spawn(move || {
63 let mut buffer = [0_u8; 512];
64
65 for _ in 0..send_recv_times {
66 let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
69
70 assert_eq!(&buffer[0..len], test_bytes);
71
72 assert_eq!(
75 server.send_to(&buffer[0..len], addr).expect("send failed"),
76 len
77 );
78 }
79 })
80 .unwrap();
81
82 let client_addr = match server_addr {
86 std::net::SocketAddr::V4(_) => "127.0.0.1:0",
87 std::net::SocketAddr::V6(_) => "[::1]:0",
88 };
89
90 println!("binding client socket");
91 let socket = S::bind(client_addr.to_socket_addrs().unwrap().next().unwrap())
92 .await
93 .expect("could not create socket"); println!("bound client socket");
95
96 let (mut stream, mut sender) = UdpStream::<S>::with_bound(socket, server_addr);
97
98 for _i in 0..send_recv_times {
99 sender
101 .send(SerialMessage::new(test_bytes.to_vec(), server_addr))
102 .unwrap();
103 let buffer_and_addr = stream.next().await;
105 let message = buffer_and_addr.expect("no message").expect("io error");
107 assert_eq!(message.bytes(), test_bytes);
108 assert_eq!(message.addr(), server_addr);
109 }
110
111 succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
112 server_handle.join().expect("server thread failed");
113}
114
115#[allow(clippy::print_stdout)]
117pub fn udp_client_stream_test<S: UdpSocket + Send + 'static, E: Executor>(
118 server_addr: IpAddr,
119 mut exec: E,
120) {
121 use crate::op::{Message, Query};
122 use crate::rr::rdata::NULL;
123 use crate::rr::{Name, RData, Record, RecordType};
124 use crate::xfer::{DnsRequest, DnsRequestSender};
125 use std::str::FromStr;
126 use std::time::Duration;
127
128 let succeeded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
132 let succeeded_clone = succeeded.clone();
133 std::thread::Builder::new()
134 .name("thread_killer".to_string())
135 .spawn(move || {
136 let succeeded = succeeded_clone;
137 for _ in 0..15 {
138 std::thread::sleep(std::time::Duration::from_secs(1));
139 if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
140 return;
141 }
142 }
143
144 println!("Thread Killer has been awoken, killing process");
145 std::process::exit(-1);
146 })
147 .unwrap();
148
149 let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
150 server
151 .set_read_timeout(Some(std::time::Duration::from_secs(5)))
152 .unwrap(); server
154 .set_write_timeout(Some(std::time::Duration::from_secs(5)))
155 .unwrap(); let server_addr = server.local_addr().unwrap();
157
158 let mut query = Message::new();
159 let test_name = Name::from_str("dead.beef").unwrap();
160 query.add_query(Query::query(test_name.clone(), RecordType::NULL));
161 let test_bytes: &'static [u8; 8] = b"DEADBEEF";
162 let send_recv_times = 4;
163
164 let test_name_server = test_name;
165 let server_handle = std::thread::Builder::new()
167 .name("test_udp_client_stream_ipv4:server".to_string())
168 .spawn(move || {
169 let mut buffer = [0_u8; 512];
170
171 for i in 0..send_recv_times {
172 debug!("server receiving request {}", i);
174 let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
175 debug!("server received request {} from: {}", i, addr);
176
177 let request = Message::from_vec(&buffer[0..len]).expect("failed parse of request");
178 assert_eq!(*request.queries()[0].name(), test_name_server.clone());
179 assert_eq!(request.queries()[0].query_type(), RecordType::NULL);
180
181 let mut message = Message::new();
182 message.set_id(request.id());
183 message.add_queries(request.queries().to_vec());
184 message.add_answer(Record::from_rdata(
185 test_name_server.clone(),
186 0,
187 RData::NULL(NULL::with(test_bytes.to_vec())),
188 ));
189
190 let bytes = message.to_vec().unwrap();
192 debug!("server sending response {i} to: {addr}");
193 assert_eq!(
194 server.send_to(&bytes, addr).expect("send failed"),
195 bytes.len()
196 );
197 debug!("server sent response {i}");
198 std::thread::yield_now();
199 }
200 })
201 .unwrap();
202
203 let stream = UdpClientStream::with_timeout(server_addr, Duration::from_millis(500));
209 let mut stream: UdpClientStream<S> = exec.block_on(stream).ok().unwrap();
210 let mut worked_once = false;
211
212 for i in 0..send_recv_times {
213 let response_stream =
215 stream.send_message(DnsRequest::new(query.clone(), DnsRequestOptions::default()));
216 println!("client sending request {i}");
217 let response = match exec.block_on(response_stream.first_answer()) {
218 Ok(response) => response,
219 Err(err) => {
220 println!("failed to get message: {err}");
221 continue;
222 }
223 };
224 println!("client got response {i}");
225
226 let response = Message::from(response);
227 if let Some(RData::NULL(null)) = response.answers()[0].data() {
228 assert_eq!(null.anything(), test_bytes);
229 } else {
230 panic!("not a NULL response");
231 }
232
233 worked_once = true;
234 }
235
236 succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
237 server_handle.join().expect("server thread failed");
238
239 assert!(worked_once);
240}