hickory_proto/tests/
udp.rs1use alloc::string::ToString;
2use alloc::sync::Arc;
3use core::str::FromStr;
4use core::sync::atomic::AtomicBool;
5use core::time::Duration;
6use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
7use std::println;
8
9use futures_util::stream::StreamExt;
10use tracing::debug;
11
12use crate::op::{Message, Query};
13use crate::rr::rdata::NULL;
14use crate::rr::{Name, RData, Record, RecordType};
15use crate::runtime::RuntimeProvider;
16use crate::udp::{UdpClientStream, UdpStream};
17use crate::xfer::dns_handle::DnsStreamHandle;
18use crate::xfer::{DnsRequest, DnsRequestOptions, DnsRequestSender, FirstAnswer, SerialMessage};
19
20pub async fn next_random_socket_test(provider: impl RuntimeProvider) {
22 let (stream, _) = UdpStream::new(
23 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 52),
24 None,
25 None,
26 false,
27 provider,
28 );
29 drop(stream.await.expect("failed to get next socket address"));
30}
31
32pub async fn udp_stream_test<P: RuntimeProvider>(server_addr: IpAddr, provider: P) {
34 let succeeded = Arc::new(AtomicBool::new(false));
35 let succeeded_clone = succeeded.clone();
36 std::thread::Builder::new()
37 .name("thread_killer".to_string())
38 .spawn(move || {
39 let succeeded = succeeded_clone;
40 for _ in 0..15 {
41 std::thread::sleep(core::time::Duration::from_secs(1));
42 if succeeded.load(core::sync::atomic::Ordering::Relaxed) {
43 return;
44 }
45 }
46
47 println!("Thread Killer has been awoken, killing process");
48 std::process::exit(-1);
49 })
50 .unwrap();
51
52 let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
53 server
54 .set_read_timeout(Some(core::time::Duration::from_secs(5)))
55 .unwrap(); server
57 .set_write_timeout(Some(core::time::Duration::from_secs(5)))
58 .unwrap(); let server_addr = server.local_addr().unwrap();
60 println!("server listening on: {server_addr}");
61
62 let test_bytes: &'static [u8; 8] = b"DEADBEEF";
63 let send_recv_times = 4u32;
64
65 let server_handle = std::thread::Builder::new()
67 .name("test_udp_stream_ipv4:server".to_string())
68 .spawn(move || {
69 let mut buffer = [0_u8; 512];
70
71 for _ in 0..send_recv_times {
72 let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
75
76 assert_eq!(&buffer[0..len], test_bytes);
77
78 assert_eq!(
81 server.send_to(&buffer[0..len], addr).expect("send failed"),
82 len
83 );
84 }
85 })
86 .unwrap();
87
88 let client_addr = match server_addr {
92 SocketAddr::V4(_) => "127.0.0.1:0",
93 SocketAddr::V6(_) => "[::1]:0",
94 };
95
96 println!("binding client socket");
97 let socket = provider
98 .bind_udp(
99 client_addr.to_socket_addrs().unwrap().next().unwrap(),
100 server_addr,
101 )
102 .await
103 .expect("could not create socket"); println!("bound client socket");
105
106 let (mut stream, mut sender) = UdpStream::<P>::with_bound(socket, server_addr);
107
108 for _i in 0..send_recv_times {
109 sender
111 .send(SerialMessage::new(test_bytes.to_vec(), server_addr))
112 .unwrap();
113 let buffer_and_addr = stream.next().await;
115 let message = buffer_and_addr.expect("no message").expect("io error");
117 assert_eq!(message.bytes(), test_bytes);
118 assert_eq!(message.addr(), server_addr);
119 }
120
121 succeeded.store(true, core::sync::atomic::Ordering::Relaxed);
122 server_handle.join().expect("server thread failed");
123}
124
125#[allow(clippy::print_stdout)]
127pub async fn udp_client_stream_test(server_addr: IpAddr, provider: impl RuntimeProvider) {
128 let succeeded = Arc::new(AtomicBool::new(false));
129 let succeeded_clone = succeeded.clone();
130 std::thread::Builder::new()
131 .name("thread_killer".to_string())
132 .spawn(move || {
133 let succeeded = succeeded_clone;
134 for _ in 0..15 {
135 std::thread::sleep(core::time::Duration::from_secs(1));
136 if succeeded.load(core::sync::atomic::Ordering::Relaxed) {
137 return;
138 }
139 }
140
141 println!("Thread Killer has been awoken, killing process");
142 std::process::exit(-1);
143 })
144 .unwrap();
145
146 let server = std::net::UdpSocket::bind(SocketAddr::new(server_addr, 0)).unwrap();
147 server
148 .set_read_timeout(Some(core::time::Duration::from_secs(5)))
149 .unwrap(); server
151 .set_write_timeout(Some(core::time::Duration::from_secs(5)))
152 .unwrap(); let server_addr = server.local_addr().unwrap();
154
155 let mut query = Message::new();
156 let test_name = Name::from_str("dead.beef.").unwrap();
157 query.add_query(Query::query(test_name.clone(), RecordType::NULL));
158 let test_bytes: &'static [u8; 8] = b"DEADBEEF";
159 let send_recv_times = 4;
160
161 let test_name_server = test_name;
162 let server_handle = std::thread::Builder::new()
164 .name("test_udp_client_stream_ipv4:server".to_string())
165 .spawn(move || {
166 let mut buffer = [0_u8; 512];
167
168 for i in 0..send_recv_times {
169 debug!("server receiving request {}", i);
171 let (len, addr) = server.recv_from(&mut buffer).expect("receive failed");
172 debug!("server received request {} from: {}", i, addr);
173
174 let request = Message::from_vec(&buffer[0..len]).expect("failed parse of request");
175 assert_eq!(*request.queries()[0].name(), test_name_server.clone());
176 assert_eq!(request.queries()[0].query_type(), RecordType::NULL);
177
178 let mut message = Message::new();
179 message.set_id(request.id());
180 message.add_queries(request.queries().to_vec());
181 message.add_answer(Record::from_rdata(
182 test_name_server.clone(),
183 0,
184 RData::NULL(NULL::with(test_bytes.to_vec())),
185 ));
186
187 let bytes = message.to_vec().unwrap();
189 debug!("server sending response {i} to: {addr}");
190 assert_eq!(
191 server.send_to(&bytes, addr).expect("send failed"),
192 bytes.len()
193 );
194 debug!("server sent response {i}");
195 std::thread::yield_now();
196 }
197 })
198 .unwrap();
199
200 let stream = UdpClientStream::builder(server_addr, provider)
206 .with_timeout(Some(Duration::from_millis(500)))
207 .build();
208 let mut stream = stream.await.ok().unwrap();
209 let mut worked_once = false;
210
211 for i in 0..send_recv_times {
212 let response_stream =
214 stream.send_message(DnsRequest::new(query.clone(), DnsRequestOptions::default()));
215 println!("client sending request {i}");
216 let response = match response_stream.first_answer().await {
217 Ok(response) => response,
218 Err(err) => {
219 println!("failed to get message: {err}");
220 continue;
221 }
222 };
223 println!("client got response {i}");
224
225 let response = Message::from(response);
226 if let RData::NULL(null) = response.answers()[0].data() {
227 assert_eq!(null.anything(), test_bytes);
228 } else {
229 panic!("not a NULL response");
230 }
231
232 worked_once = true;
233 }
234
235 succeeded.store(true, core::sync::atomic::Ordering::Relaxed);
236 server_handle.join().expect("server thread failed");
237
238 assert!(worked_once);
239}