1mod ip_echo_client;
3mod ip_echo_server;
4
5pub use ip_echo_server::{
6 ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
7 MINIMUM_IP_ECHO_SERVER_THREADS,
8};
9#[cfg(feature = "dev-context-only-utils")]
10use tokio::net::UdpSocket as TokioUdpSocket;
11use {
12 ip_echo_client::{ip_echo_server_request, ip_echo_server_request_with_binding},
13 ip_echo_server::IpEchoServerMessage,
14 log::*,
15 rand::{thread_rng, Rng},
16 socket2::{Domain, SockAddr, Socket, Type},
17 std::{
18 io::{self},
19 net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, ToSocketAddrs, UdpSocket},
20 },
21 url::Url,
22};
23
24pub struct UdpSocketPair {
26 pub addr: SocketAddr, pub receiver: UdpSocket, pub sender: UdpSocket, }
30
31pub type PortRange = (u16, u16);
32
33pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
34pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; pub(crate) const HEADER_LENGTH: usize = 4;
37pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
38
39pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
42 let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
43 let rt = tokio::runtime::Builder::new_current_thread()
44 .enable_all()
45 .build()
46 .map_err(|e| e.to_string())?;
47 let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
48 Ok(resp.address)
49}
50
51pub fn get_public_ip_addr_with_binding(
54 ip_echo_server_addr: &SocketAddr,
55 bind_address: IpAddr,
56) -> anyhow::Result<IpAddr> {
57 let fut = ip_echo_server_request_with_binding(
58 *ip_echo_server_addr,
59 IpEchoServerMessage::default(),
60 bind_address,
61 );
62 let rt = tokio::runtime::Builder::new_current_thread()
63 .enable_all()
64 .build()?;
65 let resp = rt.block_on(fut)?;
66 Ok(resp.address)
67}
68
69pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
71 let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
72 let rt = tokio::runtime::Builder::new_current_thread()
73 .enable_all()
74 .build()
75 .map_err(|e| e.to_string())?;
76 let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
77 resp.shred_version
78 .ok_or_else(|| "IP echo server does not return a shred-version".to_owned())
79}
80
81pub fn get_cluster_shred_version_with_binding(
84 ip_echo_server_addr: &SocketAddr,
85 bind_address: IpAddr,
86) -> anyhow::Result<u16> {
87 let fut = ip_echo_server_request_with_binding(
88 *ip_echo_server_addr,
89 IpEchoServerMessage::default(),
90 bind_address,
91 );
92 let rt = tokio::runtime::Builder::new_current_thread()
93 .enable_all()
94 .build()?;
95 let resp = rt.block_on(fut)?;
96 resp.shred_version
97 .ok_or_else(|| anyhow::anyhow!("IP echo server does not return a shred-version"))
98}
99
100const MAX_PORT_VERIFY_THREADS: usize = 64;
103
104#[deprecated(
109 since = "2.2.0",
110 note = "use `verify_all_reachable_udp` and `verify_all_reachable_tcp` instead"
111)]
112pub fn verify_reachable_ports(
113 ip_echo_server_addr: &SocketAddr,
114 tcp_listeners: Vec<(u16, TcpListener)>,
115 udp_sockets: &[&UdpSocket],
116) -> bool {
117 verify_all_reachable_tcp(
118 ip_echo_server_addr,
119 tcp_listeners.into_iter().map(|(_, l)| l).collect(),
120 ) && verify_all_reachable_udp(ip_echo_server_addr, udp_sockets)
121}
122
123pub fn verify_all_reachable_udp(
128 ip_echo_server_addr: &SocketAddr,
129 udp_sockets: &[&UdpSocket],
130) -> bool {
131 let rt = tokio::runtime::Builder::new_current_thread()
132 .enable_all()
133 .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
134 .build()
135 .expect("Tokio builder should be able to reliably create a current thread runtime");
136 let fut = ip_echo_client::verify_all_reachable_udp(
137 *ip_echo_server_addr,
138 udp_sockets,
139 ip_echo_client::TIMEOUT,
140 ip_echo_client::DEFAULT_RETRY_COUNT,
141 );
142 rt.block_on(fut)
143}
144
145pub fn verify_all_reachable_tcp(
150 ip_echo_server_addr: &SocketAddr,
151 tcp_listeners: Vec<TcpListener>,
152) -> bool {
153 let rt = tokio::runtime::Builder::new_current_thread()
154 .enable_all()
155 .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
156 .build()
157 .expect("Tokio builder should be able to reliably create a current thread runtime");
158 let fut = ip_echo_client::verify_all_reachable_tcp(
159 *ip_echo_server_addr,
160 tcp_listeners,
161 ip_echo_client::TIMEOUT,
162 );
163 rt.block_on(fut)
164}
165
166pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
167 if let Some(addrstr) = optstr {
168 if let Ok(port) = addrstr.parse() {
169 let mut addr = default_addr;
170 addr.set_port(port);
171 addr
172 } else if let Ok(addr) = addrstr.parse() {
173 addr
174 } else {
175 default_addr
176 }
177 } else {
178 default_addr
179 }
180}
181
182pub fn parse_port_range(port_range: &str) -> Option<PortRange> {
183 let ports: Vec<&str> = port_range.split('-').collect();
184 if ports.len() != 2 {
185 return None;
186 }
187
188 let start_port = ports[0].parse();
189 let end_port = ports[1].parse();
190
191 if start_port.is_err() || end_port.is_err() {
192 return None;
193 }
194 let start_port = start_port.unwrap();
195 let end_port = end_port.unwrap();
196 if end_port < start_port {
197 return None;
198 }
199 Some((start_port, end_port))
200}
201
202pub fn parse_host(host: &str) -> Result<IpAddr, String> {
203 let parsed_url = Url::parse(&format!("http://{host}")).map_err(|e| e.to_string())?;
206 if parsed_url.port().is_some() {
207 return Err(format!("Expected port in URL: {host}"));
208 }
209
210 let ips: Vec<_> = (host, 0)
212 .to_socket_addrs()
213 .map_err(|err| err.to_string())?
214 .map(|socket_address| socket_address.ip())
215 .collect();
216 if ips.is_empty() {
217 Err(format!("Unable to resolve host: {host}"))
218 } else {
219 Ok(ips[0])
220 }
221}
222
223pub fn is_host(string: String) -> Result<(), String> {
224 parse_host(&string).map(|_| ())
225}
226
227pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
228 let addrs: Vec<_> = host_port
229 .to_socket_addrs()
230 .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
231 .collect();
232 if addrs.is_empty() {
233 Err(format!("Unable to resolve host: {host_port}"))
234 } else {
235 Ok(addrs[0])
236 }
237}
238
239pub fn is_host_port(string: String) -> Result<(), String> {
240 parse_host_port(&string).map(|_| ())
241}
242
243#[derive(Clone, Copy, Debug, Default)]
244pub struct SocketConfig {
245 reuseport: bool,
246 recv_buffer_size: Option<usize>,
247 send_buffer_size: Option<usize>,
248}
249
250impl SocketConfig {
251 pub fn reuseport(mut self, reuseport: bool) -> Self {
252 self.reuseport = reuseport;
253 self
254 }
255
256 pub fn recv_buffer_size(mut self, size: usize) -> Self {
263 self.recv_buffer_size = Some(size);
264 self
265 }
266
267 pub fn send_buffer_size(mut self, size: usize) -> Self {
274 self.send_buffer_size = Some(size);
275 self
276 }
277}
278
279#[cfg(any(windows, target_os = "ios"))]
280fn udp_socket_with_config(_config: SocketConfig) -> io::Result<Socket> {
281 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
282 Ok(sock)
283}
284
285#[cfg(not(any(windows, target_os = "ios")))]
286fn udp_socket_with_config(config: SocketConfig) -> io::Result<Socket> {
287 use nix::sys::socket::{setsockopt, sockopt::ReusePort};
288 let SocketConfig {
289 reuseport,
290 recv_buffer_size,
291 send_buffer_size,
292 } = config;
293
294 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
295
296 if let Some(recv_buffer_size) = recv_buffer_size {
298 sock.set_recv_buffer_size(recv_buffer_size)?;
299 }
300
301 if let Some(send_buffer_size) = send_buffer_size {
302 sock.set_send_buffer_size(send_buffer_size)?;
303 }
304
305 if reuseport {
306 setsockopt(&sock, ReusePort, &true).ok();
307 }
308
309 Ok(sock)
310}
311
312pub fn bind_common_in_range_with_config(
314 ip_addr: IpAddr,
315 range: PortRange,
316 config: SocketConfig,
317) -> io::Result<(u16, (UdpSocket, TcpListener))> {
318 for port in range.0..range.1 {
319 if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config) {
320 return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
321 }
322 }
323
324 Err(io::Error::new(
325 io::ErrorKind::Other,
326 format!("No available TCP/UDP ports in {range:?}"),
327 ))
328}
329
330#[deprecated(
332 since = "2.2.0",
333 note = "use `bind_common_in_range_with_config` instead"
334)]
335pub fn bind_common_in_range(
336 ip_addr: IpAddr,
337 range: PortRange,
338) -> io::Result<(u16, (UdpSocket, TcpListener))> {
339 bind_common_in_range_with_config(ip_addr, range, SocketConfig::default())
340}
341
342pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
343 let config = SocketConfig::default();
344 bind_in_range_with_config(ip_addr, range, config)
345}
346
347pub fn bind_in_range_with_config(
348 ip_addr: IpAddr,
349 range: PortRange,
350 config: SocketConfig,
351) -> io::Result<(u16, UdpSocket)> {
352 let sock = udp_socket_with_config(config)?;
353
354 for port in range.0..range.1 {
355 let addr = SocketAddr::new(ip_addr, port);
356
357 if sock.bind(&SockAddr::from(addr)).is_ok() {
358 let sock: UdpSocket = sock.into();
359 return Result::Ok((sock.local_addr().unwrap().port(), sock));
360 }
361 }
362
363 Err(io::Error::new(
364 io::ErrorKind::Other,
365 format!("No available UDP ports in {range:?}"),
366 ))
367}
368
369pub fn bind_with_any_port_with_config(
370 ip_addr: IpAddr,
371 config: SocketConfig,
372) -> io::Result<UdpSocket> {
373 let sock = udp_socket_with_config(config)?;
374 let addr = SocketAddr::new(ip_addr, 0);
375 match sock.bind(&SockAddr::from(addr)) {
376 Ok(_) => Result::Ok(sock.into()),
377 Err(err) => Err(io::Error::new(
378 io::ErrorKind::Other,
379 format!("No available UDP port: {err}"),
380 )),
381 }
382}
383
384#[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")]
385pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result<UdpSocket> {
386 bind_with_any_port_with_config(ip_addr, SocketConfig::default())
387}
388
389pub fn multi_bind_in_range_with_config(
391 ip_addr: IpAddr,
392 range: PortRange,
393 config: SocketConfig,
394 mut num: usize,
395) -> io::Result<(u16, Vec<UdpSocket>)> {
396 if !config.reuseport {
397 return Err(io::Error::new(
398 io::ErrorKind::InvalidInput,
399 "SocketConfig.reuseport must be true for multi_bind_in_range_with_config",
400 ));
401 }
402 if cfg!(windows) && num != 1 {
403 warn!(
405 "multi_bind_in_range_with_config() only supports 1 socket in windows ({} requested)",
406 num
407 );
408 num = 1;
409 }
410 let mut sockets = Vec::with_capacity(num);
411
412 const NUM_TRIES: usize = 100;
413 let mut port = 0;
414 let mut error = None;
415 for _ in 0..NUM_TRIES {
416 port = {
417 let (port, _) = bind_in_range(ip_addr, range)?;
418 port
419 }; for _ in 0..num {
422 let sock = bind_to_with_config(ip_addr, port, config);
423 if let Ok(sock) = sock {
424 sockets.push(sock);
425 } else {
426 error = Some(sock);
427 break;
428 }
429 }
430 if sockets.len() == num {
431 break;
432 } else {
433 sockets.clear();
434 }
435 }
436 if sockets.len() != num {
437 error.unwrap()?;
438 }
439 Ok((port, sockets))
440}
441
442#[deprecated(
445 since = "2.2.0",
446 note = "use `multi_bind_in_range_with_config` instead"
447)]
448#[allow(unused_mut)]
449pub fn multi_bind_in_range(
450 ip_addr: IpAddr,
451 range: PortRange,
452 mut num: usize,
453) -> io::Result<(u16, Vec<UdpSocket>)> {
454 let config = SocketConfig::default().reuseport(true);
455 multi_bind_in_range_with_config(ip_addr, range, config, num)
456}
457
458pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result<UdpSocket> {
459 let config = SocketConfig::default().reuseport(reuseport);
460 bind_to_with_config(ip_addr, port, config)
461}
462
463#[cfg(feature = "dev-context-only-utils")]
464pub async fn bind_to_async(
465 ip_addr: IpAddr,
466 port: u16,
467 reuseport: bool,
468) -> io::Result<TokioUdpSocket> {
469 let config = SocketConfig::default().reuseport(reuseport);
470 let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?;
471 TokioUdpSocket::from_std(socket)
472}
473
474pub fn bind_to_localhost() -> io::Result<UdpSocket> {
475 bind_to(
476 IpAddr::V4(Ipv4Addr::LOCALHOST),
477 0,
478 false,
479 )
480}
481
482#[cfg(feature = "dev-context-only-utils")]
483pub async fn bind_to_localhost_async() -> io::Result<TokioUdpSocket> {
484 bind_to_async(
485 IpAddr::V4(Ipv4Addr::LOCALHOST),
486 0,
487 false,
488 )
489 .await
490}
491
492pub fn bind_to_unspecified() -> io::Result<UdpSocket> {
493 bind_to(
494 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
495 0,
496 false,
497 )
498}
499
500#[cfg(feature = "dev-context-only-utils")]
501pub async fn bind_to_unspecified_async() -> io::Result<TokioUdpSocket> {
502 bind_to_async(
503 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
504 0,
505 false,
506 )
507 .await
508}
509
510pub fn bind_to_with_config(
511 ip_addr: IpAddr,
512 port: u16,
513 config: SocketConfig,
514) -> io::Result<UdpSocket> {
515 let sock = udp_socket_with_config(config)?;
516
517 let addr = SocketAddr::new(ip_addr, port);
518
519 sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
520}
521
522pub fn bind_to_with_config_non_blocking(
523 ip_addr: IpAddr,
524 port: u16,
525 config: SocketConfig,
526) -> io::Result<UdpSocket> {
527 let sock = udp_socket_with_config(config)?;
528
529 let addr = SocketAddr::new(ip_addr, port);
530
531 sock.bind(&SockAddr::from(addr))?;
532 sock.set_nonblocking(true)?;
533 Ok(sock.into())
534}
535
536pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> {
538 let config = SocketConfig::default();
539 bind_common_with_config(ip_addr, port, config)
540}
541
542pub fn bind_common_with_config(
544 ip_addr: IpAddr,
545 port: u16,
546 config: SocketConfig,
547) -> io::Result<(UdpSocket, TcpListener)> {
548 let sock = udp_socket_with_config(config)?;
549
550 let addr = SocketAddr::new(ip_addr, port);
551 let sock_addr = SockAddr::from(addr);
552 sock.bind(&sock_addr)
553 .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
554}
555
556pub fn bind_two_in_range_with_offset(
557 ip_addr: IpAddr,
558 range: PortRange,
559 offset: u16,
560) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
561 let sock1_config = SocketConfig::default();
562 let sock2_config = SocketConfig::default();
563 bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config)
564}
565
566pub fn bind_two_in_range_with_offset_and_config(
567 ip_addr: IpAddr,
568 range: PortRange,
569 offset: u16,
570 sock1_config: SocketConfig,
571 sock2_config: SocketConfig,
572) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
573 if range.1.saturating_sub(range.0) < offset {
574 return Err(io::Error::new(
575 io::ErrorKind::Other,
576 "range too small to find two ports with the correct offset".to_string(),
577 ));
578 }
579 for port in range.0..range.1 {
580 if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config) {
581 if range.1.saturating_sub(port) >= offset {
582 if let Ok(second_bind) =
583 bind_to_with_config(ip_addr, port.saturating_add(offset), sock2_config)
584 {
585 return Ok((
586 (first_bind.local_addr().unwrap().port(), first_bind),
587 (second_bind.local_addr().unwrap().port(), second_bind),
588 ));
589 }
590 } else {
591 break;
592 }
593 }
594 }
595 Err(io::Error::new(
596 io::ErrorKind::Other,
597 "couldn't find two ports with the correct offset in range".to_string(),
598 ))
599}
600
601pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<u16> {
606 let range = range.0..range.1;
607 let mut next_port_to_try = range
608 .clone()
609 .cycle() .skip(thread_rng().gen_range(range.clone()) as usize) .take(range.len()) .peekable();
613 loop {
614 let port_to_try = next_port_to_try.next().unwrap(); match bind_common(ip_addr, port_to_try) {
616 Ok(_) => {
617 return Ok(port_to_try);
618 }
619 Err(err) => {
620 if next_port_to_try.peek().is_none() {
621 return Err(err);
622 }
623 }
624 }
625 }
626}
627
628pub fn bind_more_with_config(
629 socket: UdpSocket,
630 num: usize,
631 config: SocketConfig,
632) -> io::Result<Vec<UdpSocket>> {
633 let addr = socket.local_addr().unwrap();
634 let ip = addr.ip();
635 let port = addr.port();
636 std::iter::once(Ok(socket))
637 .chain((1..num).map(|_| bind_to_with_config(ip, port, config)))
638 .collect()
639}
640
641#[cfg(test)]
642mod tests {
643 use {
644 super::*,
645 ip_echo_server::IpEchoServerResponse,
646 itertools::Itertools,
647 std::{net::Ipv4Addr, time::Duration},
648 tokio::runtime::Runtime,
649 };
650
651 fn runtime() -> Runtime {
652 tokio::runtime::Builder::new_current_thread()
653 .enable_all()
654 .build()
655 .expect("Can not create a runtime")
656 }
657 #[test]
658 fn test_response_length() {
659 let resp = IpEchoServerResponse {
660 address: IpAddr::from([u16::MAX; 8]), shred_version: Some(u16::MAX),
662 };
663 let resp_size = bincode::serialized_size(&resp).unwrap();
664 assert_eq!(
665 IP_ECHO_SERVER_RESPONSE_LENGTH,
666 HEADER_LENGTH + resp_size as usize
667 );
668 }
669
670 #[test]
672 fn test_backward_compat() {
673 let address = IpAddr::from([
674 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
675 ]);
676 let response = IpEchoServerResponse {
677 address,
678 shred_version: Some(42),
679 };
680 let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
681 bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
682 data.truncate(HEADER_LENGTH + 20);
683 assert_eq!(
684 bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
685 address
686 );
687 }
688
689 #[test]
691 fn test_forward_compat() {
692 let address = IpAddr::from([
693 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
694 ]);
695 let mut data = [0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
696 bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
697 let response: Result<IpEchoServerResponse, _> =
698 bincode::deserialize(&data[HEADER_LENGTH..]);
699 assert_eq!(
700 response.unwrap(),
701 IpEchoServerResponse {
702 address,
703 shred_version: None,
704 }
705 );
706 }
707
708 #[test]
709 fn test_parse_port_or_addr() {
710 let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
711 assert_eq!(p1.port(), 9000);
712 let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), SocketAddr::from(([1, 2, 3, 4], 1)));
713 assert_eq!(p2.port(), 7000);
714 let p2 = parse_port_or_addr(Some("hi there"), SocketAddr::from(([1, 2, 3, 4], 1)));
715 assert_eq!(p2.port(), 1);
716 let p3 = parse_port_or_addr(None, SocketAddr::from(([1, 2, 3, 4], 1)));
717 assert_eq!(p3.port(), 1);
718 }
719
720 #[test]
721 fn test_parse_port_range() {
722 assert_eq!(parse_port_range("garbage"), None);
723 assert_eq!(parse_port_range("1-"), None);
724 assert_eq!(parse_port_range("1-2"), Some((1, 2)));
725 assert_eq!(parse_port_range("1-2-3"), None);
726 assert_eq!(parse_port_range("2-1"), None);
727 }
728
729 #[test]
730 fn test_parse_host() {
731 parse_host("localhost:1234").unwrap_err();
732 parse_host("localhost").unwrap();
733 parse_host("127.0.0.0:1234").unwrap_err();
734 parse_host("127.0.0.0").unwrap();
735 }
736
737 #[test]
738 fn test_parse_host_port() {
739 parse_host_port("localhost:1234").unwrap();
740 parse_host_port("localhost").unwrap_err();
741 parse_host_port("127.0.0.0:1234").unwrap();
742 parse_host_port("127.0.0.0").unwrap_err();
743 }
744
745 #[test]
746 fn test_is_host_port() {
747 assert!(is_host_port("localhost:1234".to_string()).is_ok());
748 assert!(is_host_port("localhost".to_string()).is_err());
749 }
750
751 #[test]
752 fn test_bind() {
753 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
754 assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000);
755 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
756 let config = SocketConfig::default().reuseport(true);
757 let x = bind_to_with_config(ip_addr, 2002, config).unwrap();
758 let y = bind_to_with_config(ip_addr, 2002, config).unwrap();
759 assert_eq!(
760 x.local_addr().unwrap().port(),
761 y.local_addr().unwrap().port()
762 );
763 bind_to(ip_addr, 2002, false).unwrap_err();
764 bind_in_range(ip_addr, (2002, 2003)).unwrap_err();
765
766 let (port, v) = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 10).unwrap();
767 for sock in &v {
768 assert_eq!(port, sock.local_addr().unwrap().port());
769 }
770 }
771
772 #[test]
773 fn test_bind_with_any_port() {
774 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
775 let config = SocketConfig::default();
776 let x = bind_with_any_port_with_config(ip_addr, config).unwrap();
777 let y = bind_with_any_port_with_config(ip_addr, config).unwrap();
778 assert_ne!(
779 x.local_addr().unwrap().port(),
780 y.local_addr().unwrap().port()
781 );
782 }
783
784 #[test]
785 fn test_bind_in_range_nil() {
786 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
787 bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
788 bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
789 }
790
791 #[test]
792 fn test_find_available_port_in_range() {
793 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
794 assert_eq!(
795 find_available_port_in_range(ip_addr, (3000, 3001)).unwrap(),
796 3000
797 );
798 let port = find_available_port_in_range(ip_addr, (3000, 3050)).unwrap();
799 assert!((3000..3050).contains(&port));
800
801 let _socket = bind_to(ip_addr, port, false).unwrap();
802 find_available_port_in_range(ip_addr, (port, port + 1)).unwrap_err();
803 }
804
805 #[test]
806 fn test_bind_common_in_range() {
807 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
808 let config = SocketConfig::default();
809 let (port, _sockets) =
810 bind_common_in_range_with_config(ip_addr, (3100, 3150), config).unwrap();
811 assert!((3100..3150).contains(&port));
812
813 bind_common_in_range_with_config(ip_addr, (port, port + 1), config).unwrap_err();
814 }
815
816 #[test]
817 fn test_get_public_ip_addr_none() {
818 solana_logger::setup();
819 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
820 let config = SocketConfig::default();
821 let (_server_port, (server_udp_socket, server_tcp_listener)) =
822 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
823
824 let _runtime = ip_echo_server(
825 server_tcp_listener,
826 DEFAULT_IP_ECHO_SERVER_THREADS,
827 Some(42),
828 );
829
830 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
831 assert_eq!(
832 get_public_ip_addr(&server_ip_echo_addr).unwrap(),
833 parse_host("127.0.0.1").unwrap(),
834 );
835 assert_eq!(get_cluster_shred_version(&server_ip_echo_addr).unwrap(), 42);
836 assert!(verify_all_reachable_tcp(&server_ip_echo_addr, vec![],));
837 assert!(verify_all_reachable_udp(&server_ip_echo_addr, &[],));
838 }
839
840 #[test]
841 fn test_get_public_ip_addr_reachable() {
842 solana_logger::setup();
843 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
844 let config = SocketConfig::default();
845 let (_server_port, (server_udp_socket, server_tcp_listener)) =
846 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
847 let (_client_port, (client_udp_socket, client_tcp_listener)) =
848 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
849
850 let _runtime = ip_echo_server(
851 server_tcp_listener,
852 DEFAULT_IP_ECHO_SERVER_THREADS,
853 Some(65535),
854 );
855
856 let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
857 assert_eq!(
858 get_public_ip_addr(&ip_echo_server_addr).unwrap(),
859 parse_host("127.0.0.1").unwrap(),
860 );
861 assert_eq!(
862 get_cluster_shred_version(&ip_echo_server_addr).unwrap(),
863 65535
864 );
865 assert!(verify_all_reachable_tcp(
866 &ip_echo_server_addr,
867 vec![client_tcp_listener],
868 ));
869 assert!(verify_all_reachable_udp(
870 &ip_echo_server_addr,
871 &[&client_udp_socket],
872 ));
873 }
874
875 #[test]
876 fn test_verify_ports_tcp_unreachable() {
877 solana_logger::setup();
878 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
879 let config = SocketConfig::default();
880 let (_server_port, (server_udp_socket, _server_tcp_listener)) =
881 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
882
883 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
885
886 let (_, (_client_udp_socket, client_tcp_listener)) =
887 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
888
889 let rt = runtime();
890 assert!(!rt.block_on(ip_echo_client::verify_all_reachable_tcp(
891 server_ip_echo_addr,
892 vec![client_tcp_listener],
893 Duration::from_secs(2),
894 )));
895 }
896
897 #[test]
898 fn test_verify_ports_udp_unreachable() {
899 solana_logger::setup();
900 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
901 let config = SocketConfig::default();
902 let (_server_port, (server_udp_socket, _server_tcp_listener)) =
903 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
904
905 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
907
908 let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
909 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
910
911 let rt = runtime();
912 assert!(!rt.block_on(ip_echo_client::verify_all_reachable_udp(
913 server_ip_echo_addr,
914 &[&client_udp_socket],
915 Duration::from_secs(2),
916 3,
917 )));
918 }
919
920 #[test]
921 fn test_verify_many_ports_reachable() {
922 solana_logger::setup();
923 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
924 let config = SocketConfig::default();
925 let mut tcp_listeners = vec![];
926 let mut udp_sockets = vec![];
927
928 let (_server_port, (_, server_tcp_listener)) =
929 bind_common_in_range_with_config(ip_addr, (3200, 3300), config).unwrap();
930 for _ in 0..MAX_PORT_VERIFY_THREADS * 2 {
931 let (_client_port, (client_udp_socket, client_tcp_listener)) =
932 bind_common_in_range_with_config(
933 ip_addr,
934 (3300, 3300 + (MAX_PORT_VERIFY_THREADS * 3) as u16),
935 config,
936 )
937 .unwrap();
938 tcp_listeners.push(client_tcp_listener);
939 udp_sockets.push(client_udp_socket);
940 }
941
942 let ip_echo_server_addr = server_tcp_listener.local_addr().unwrap();
943
944 let _runtime = ip_echo_server(
945 server_tcp_listener,
946 DEFAULT_IP_ECHO_SERVER_THREADS,
947 Some(65535),
948 );
949
950 assert_eq!(
951 get_public_ip_addr(&ip_echo_server_addr).unwrap(),
952 parse_host("127.0.0.1").unwrap(),
953 );
954
955 let socket_refs = udp_sockets.iter().collect_vec();
956 assert!(verify_all_reachable_tcp(
957 &ip_echo_server_addr,
958 tcp_listeners,
959 ));
960 assert!(verify_all_reachable_udp(&ip_echo_server_addr, &socket_refs));
961 }
962
963 #[test]
964 fn test_bind_two_in_range_with_offset() {
965 solana_logger::setup();
966 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
967 let offset = 6;
968 if let Ok(((port1, _), (port2, _))) =
969 bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
970 {
971 assert!(port2 == port1 + offset);
972 }
973 let offset = 42;
974 if let Ok(((port1, _), (port2, _))) =
975 bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
976 {
977 assert!(port2 == port1 + offset);
978 }
979 assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
980 }
981
982 #[test]
983 fn test_multi_bind_in_range_with_config_reuseport_disabled() {
984 let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
985 let config = SocketConfig::default(); let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2);
988
989 assert!(
990 result.is_err(),
991 "Expected an error when reuseport is not set to true"
992 );
993 }
994}