solana_streamer/
packet.rs1use {
3 crate::{
4 recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5 socket::SocketAddrSpace,
6 },
7 solana_metrics::inc_new_counter_debug,
8 std::{io::Result, net::UdpSocket, time::Instant},
9};
10pub use {
11 solana_perf::packet::{
12 to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
13 },
14 solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
15};
16
17pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) -> Result<usize> {
18 let mut i = 0;
19 socket.set_nonblocking(false)?;
26 trace!("receiving on {}", socket.local_addr().unwrap());
27 let start = Instant::now();
28 loop {
29 batch.resize(
30 std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
31 Packet::default(),
32 );
33 match recv_mmsg(socket, &mut batch[i..]) {
34 Err(_) if i > 0 => {
35 if start.elapsed().as_millis() as u64 > max_wait_ms {
36 break;
37 }
38 }
39 Err(e) => {
40 trace!("recv_from err {:?}", e);
41 return Err(e);
42 }
43 Ok(npkts) => {
44 if i == 0 {
45 socket.set_nonblocking(true)?;
46 }
47 trace!("got {} packets", npkts);
48 i += npkts;
49 if start.elapsed().as_millis() as u64 > max_wait_ms || i >= PACKETS_PER_BATCH {
52 break;
53 }
54 }
55 }
56 }
57 batch.truncate(i);
58 inc_new_counter_debug!("packets-recv_count", i);
59 Ok(i)
60}
61
62pub fn send_to(
63 batch: &PacketBatch,
64 socket: &UdpSocket,
65 socket_addr_space: &SocketAddrSpace,
66) -> Result<()> {
67 for p in batch.iter() {
68 let addr = p.meta.socket_addr();
69 if socket_addr_space.check(&addr) {
70 if let Some(data) = p.data(..) {
71 socket.send_to(data, addr)?;
72 }
73 }
74 }
75 Ok(())
76}
77
#[cfg(test)]
mod tests {
    use {
        super::*,
        std::{
            io,
            io::Write,
            net::{SocketAddr, UdpSocket},
        },
    };

    /// `set_addr` on a batch must be visible through each packet's metadata.
    #[test]
    fn test_packets_set_addr() {
        let expected = "127.0.0.1:123".parse::<SocketAddr>().unwrap();
        let mut single_packet_batch = PacketBatch::new(vec![Packet::default()]);
        single_packet_batch.set_addr(&expected);
        assert_eq!(single_packet_batch[0].meta.socket_addr(), expected);
    }

    /// Round-trips a small batch over loopback UDP and checks size and sender address.
    #[test]
    pub fn packet_send_recv() {
        solana_logger::setup();
        let receiver = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let sender_addr = sender.local_addr().unwrap();

        let packet_count = 10;
        let mut batch = PacketBatch::with_capacity(packet_count);
        batch.resize(packet_count, Packet::default());

        // Address every packet to the receiving socket and mark it as full-sized.
        batch.iter_mut().for_each(|packet| {
            packet.meta.set_socket_addr(&receiver_addr);
            packet.meta.size = PACKET_DATA_SIZE;
        });
        send_to(&batch, &sender, &SocketAddrSpace::Unspecified).unwrap();

        // Wipe the metadata so the checks below can only pass on freshly received values.
        for packet in batch.iter_mut() {
            packet.meta = Meta::default();
        }
        let received = recv_from(&mut batch, &receiver, 1).unwrap();

        assert_eq!(received, batch.len());

        batch.iter().for_each(|packet| {
            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta.socket_addr(), sender_addr);
        });
    }

    /// Formatting packets and batches with `{:?}` must succeed.
    #[test]
    pub fn debug_trait() {
        let mut sink = io::sink();
        write!(sink, "{:?}", Packet::default()).unwrap();
        write!(sink, "{:?}", PacketBatch::default()).unwrap();
    }

    /// Packets compare equal when size and payload match, and unequal once the payload differs.
    #[test]
    fn test_packet_partial_eq() {
        // Builds a packet holding a single zero byte.
        let one_zero_byte = || {
            let mut packet = Packet::default();
            packet.meta.size = 1;
            packet.buffer_mut()[0] = 0;
            packet
        };
        let first = one_zero_byte();
        let mut second = one_zero_byte();

        assert!(first == second);

        second.buffer_mut()[0] = 4;
        assert!(first != second);
    }

    /// Receiving more packets than fit in one batch must fill the batch without growing it.
    #[test]
    fn test_packet_resize() {
        solana_logger::setup();
        let receiver = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
        batch.resize(PACKETS_PER_BATCH, Packet::default());

        // Send twice as many one-byte packets as a single batch can hold, one per datagram.
        (0..2 * PACKETS_PER_BATCH).for_each(|_| {
            let single_len = 1;
            let mut single = PacketBatch::with_capacity(single_len);
            single.resize(single_len, Packet::default());
            single.iter_mut().for_each(|packet| {
                packet.meta.set_socket_addr(&receiver_addr);
                packet.meta.size = 1;
            });
            send_to(&single, &sender, &SocketAddrSpace::Unspecified).unwrap();
        });

        let received = recv_from(&mut batch, &receiver, 100).unwrap();

        // Exactly one full batch is returned ...
        assert_eq!(received, PACKETS_PER_BATCH);
        // ... and the batch's capacity was left unchanged.
        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
    }
}