hickory_proto/tests/
tcp.rs

1use std::io::{Read, Write};
2use std::net::{IpAddr, SocketAddr};
3use std::sync::{atomic::AtomicBool, Arc};
4
5use futures_util::stream::StreamExt;
6
7use crate::tcp::{Connect, TcpClientStream, TcpStream};
8use crate::xfer::dns_handle::DnsStreamHandle;
9use crate::xfer::SerialMessage;
10use crate::Executor;
11
/// Payload the client sends on every round trip and expects the server to echo back.
const TEST_BYTES: &[u8; 8] = b"DEADBEEF";
/// Length of `TEST_BYTES` in bytes, derived from the payload so the two cannot drift apart.
const TEST_BYTES_LEN: usize = TEST_BYTES.len();
/// Number of send/receive round trips performed over a single connection.
const SEND_RECV_TIMES: usize = 4;
15
16fn tcp_server_setup(
17    server_name: &str,
18    server_addr: IpAddr,
19) -> (Arc<AtomicBool>, std::thread::JoinHandle<()>, SocketAddr) {
20    let succeeded = Arc::new(AtomicBool::new(false));
21    let succeeded_clone = succeeded.clone();
22    std::thread::Builder::new()
23        .name("thread_killer".to_string())
24        .spawn(move || {
25            let succeeded = succeeded_clone;
26            for _ in 0..15 {
27                std::thread::sleep(std::time::Duration::from_secs(1));
28                if succeeded.load(std::sync::atomic::Ordering::Relaxed) {
29                    return;
30                }
31            }
32
33            println!("Thread Killer has been awoken, killing process");
34            std::process::exit(-1);
35        })
36        .expect("Thread spawning failed");
37
38    // TODO: need a timeout on listen
39    let server = std::net::TcpListener::bind(SocketAddr::new(server_addr, 0))
40        .expect("Unable to bind a TCP socket");
41    let server_addr = server.local_addr().unwrap();
42
43    // an in and out server
44    let server_handle = std::thread::Builder::new()
45        .name(server_name.to_string())
46        .spawn(move || {
47            let (mut socket, _) = server.accept().expect("accept failed");
48
49            socket
50                .set_read_timeout(Some(std::time::Duration::from_secs(5)))
51                .unwrap(); // should receive something within 5 seconds...
52            socket
53                .set_write_timeout(Some(std::time::Duration::from_secs(5)))
54                .unwrap(); // should receive something within 5 seconds...
55
56            for _ in 0..SEND_RECV_TIMES {
57                // wait for some bytes...
58                let mut len_bytes = [0_u8; 2];
59                socket
60                    .read_exact(&mut len_bytes)
61                    .expect("SERVER: receive failed");
62                let length =
63                    u16::from(len_bytes[0]) << 8 & 0xFF00 | u16::from(len_bytes[1]) & 0x00FF;
64                assert_eq!(length as usize, TEST_BYTES_LEN);
65
66                let mut buffer = [0_u8; TEST_BYTES_LEN];
67                socket.read_exact(&mut buffer).unwrap();
68
69                // println!("read bytes iter: {}", i);
70                assert_eq!(&buffer, TEST_BYTES);
71
72                // bounce them right back...
73                socket
74                    .write_all(&len_bytes)
75                    .expect("SERVER: send length failed");
76                socket
77                    .write_all(&buffer)
78                    .expect("SERVER: send buffer failed");
79                // println!("wrote bytes iter: {}", i);
80                std::thread::yield_now();
81            }
82        })
83        .unwrap();
84    (succeeded, server_handle, server_addr)
85}
86
87/// Test tcp_stream.
88pub fn tcp_stream_test<S: Connect, E: Executor>(server_addr: IpAddr, mut exec: E) {
89    let (succeeded, server_handle, server_addr) =
90        tcp_server_setup("test_tcp_stream:server", server_addr);
91
92    // setup the client, which is going to run on the testing thread...
93
94    // the tests should run within 5 seconds... right?
95    // TODO: add timeout here, so that test never hangs...
96    // let timeout = Timeout::new(Duration::from_secs(5));
97    let (stream, mut sender) = TcpStream::<S>::new(server_addr);
98
99    let mut stream = exec.block_on(stream).expect("run failed to get stream");
100
101    for _ in 0..SEND_RECV_TIMES {
102        // test once
103        sender
104            .send(SerialMessage::new(TEST_BYTES.to_vec(), server_addr))
105            .expect("send failed");
106
107        let (buffer, stream_tmp) = exec.block_on(stream.into_future());
108        stream = stream_tmp;
109        let message = buffer
110            .expect("no buffer received")
111            .expect("error receiving buffer");
112        assert_eq!(message.bytes(), TEST_BYTES);
113    }
114
115    succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
116    server_handle.join().expect("server thread failed");
117}
118
119/// Test tcp_client_stream.
120pub fn tcp_client_stream_test<S: Connect, E: Executor>(server_addr: IpAddr, mut exec: E) {
121    let (succeeded, server_handle, server_addr) =
122        tcp_server_setup("test_tcp_client_stream:server", server_addr);
123
124    // setup the client, which is going to run on the testing thread...
125
126    // the tests should run within 5 seconds... right?
127    // TODO: add timeout here, so that test never hangs...
128    // let timeout = Timeout::new(Duration::from_secs(5));
129    let (stream, mut sender) = TcpClientStream::<S>::new(server_addr);
130
131    let mut stream = exec.block_on(stream).expect("run failed to get stream");
132
133    for _ in 0..SEND_RECV_TIMES {
134        // test once
135        sender
136            .send(SerialMessage::new(TEST_BYTES.to_vec(), server_addr))
137            .expect("send failed");
138        let (buffer, stream_tmp) = exec.block_on(stream.into_future());
139        stream = stream_tmp;
140        let buffer = buffer
141            .expect("no buffer received")
142            .expect("error receiving buffer");
143        assert_eq!(buffer.bytes(), TEST_BYTES);
144    }
145
146    succeeded.store(true, std::sync::atomic::Ordering::Relaxed);
147    server_handle.join().expect("server thread failed");
148}