solana_streamer/
packet.rs1use {
3 crate::{
4 recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5 socket::SocketAddrSpace,
6 },
7 std::{
8 io::Result,
9 net::UdpSocket,
10 time::{Duration, Instant},
11 },
12};
13pub use {
14 solana_packet::{Meta, Packet, PACKET_DATA_SIZE},
15 solana_perf::packet::{
16 to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
17 },
18};
19
20pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait: Duration) -> Result<usize> {
21 let mut i = 0;
22 socket.set_nonblocking(false)?;
29 trace!("receiving on {}", socket.local_addr().unwrap());
30 let start = Instant::now();
31 loop {
32 batch.resize(
33 std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
34 Packet::default(),
35 );
36 match recv_mmsg(socket, &mut batch[i..]) {
37 Err(_) if i > 0 => {
38 if start.elapsed() > max_wait {
39 break;
40 }
41 }
42 Err(e) => {
43 trace!("recv_from err {:?}", e);
44 return Err(e);
45 }
46 Ok(npkts) => {
47 if i == 0 {
48 socket.set_nonblocking(true)?;
49 }
50 trace!("got {} packets", npkts);
51 i += npkts;
52 if start.elapsed() > max_wait || i >= PACKETS_PER_BATCH {
55 break;
56 }
57 }
58 }
59 }
60 batch.truncate(i);
61 Ok(i)
62}
63
64pub fn send_to(
65 batch: &PacketBatch,
66 socket: &UdpSocket,
67 socket_addr_space: &SocketAddrSpace,
68) -> Result<()> {
69 for p in batch.iter() {
70 let addr = p.meta().socket_addr();
71 if socket_addr_space.check(&addr) {
72 if let Some(data) = p.data(..) {
73 socket.send_to(data, addr)?;
74 }
75 }
76 }
77 Ok(())
78}
79
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_net_utils::bind_to_localhost,
        std::{io, io::Write, net::SocketAddr},
    };

    /// After `set_addr`, the first packet of the batch reports that address.
    #[test]
    fn test_packets_set_addr() {
        let expected_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
        let mut packet_batch = PacketBatch::new(vec![Packet::default()]);
        packet_batch.set_addr(&expected_addr);
        let stamped_addr = packet_batch[0].meta().socket_addr();
        assert_eq!(stamped_addr, expected_addr);
    }

    /// Round-trips a small batch between two localhost sockets and checks the
    /// size and source address recorded on every received packet.
    #[test]
    pub fn packet_send_recv() {
        solana_logger::setup();
        let receiver = bind_to_localhost().expect("bind");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost().expect("bind");
        let sender_addr = sender.local_addr().unwrap();

        const BATCH_LEN: usize = 10;
        let mut batch = PacketBatch::with_capacity(BATCH_LEN);
        batch.resize(BATCH_LEN, Packet::default());

        // Address every packet to the receiving socket, with a full payload.
        batch.iter_mut().for_each(|packet| {
            let meta = packet.meta_mut();
            meta.set_socket_addr(&receiver_addr);
            meta.size = PACKET_DATA_SIZE;
        });
        send_to(&batch, &sender, &SocketAddrSpace::Unspecified).unwrap();

        // Wipe the metadata so the checks below can only pass on received data.
        for packet in batch.iter_mut() {
            *packet.meta_mut() = Meta::default();
        }
        let max_wait = Duration::from_millis(1);
        let recvd = recv_from(&mut batch, &receiver, max_wait).unwrap();
        assert_eq!(recvd, batch.len());

        batch.iter().for_each(|packet| {
            let meta = packet.meta();
            assert_eq!(meta.size, PACKET_DATA_SIZE);
            assert_eq!(meta.socket_addr(), sender_addr);
        });
    }

    /// `Packet` and `PacketBatch` can be formatted with `{:?}`.
    #[test]
    pub fn debug_trait() {
        let mut sink = io::sink();
        write!(sink, "{:?}", Packet::default()).unwrap();
        write!(sink, "{:?}", PacketBatch::default()).unwrap();
    }

    /// Packets with equal size and first byte compare equal; changing the byte
    /// makes them compare unequal.
    #[test]
    fn test_packet_partial_eq() {
        let mut left = Packet::default();
        let mut right = Packet::default();

        for packet in [&mut left, &mut right] {
            packet.meta_mut().size = 1;
            packet.buffer_mut()[0] = 0;
        }
        assert!(left == right);

        right.buffer_mut()[0] = 4;
        assert!(left != right);
    }

    /// Even when twice as many packets were sent, a single `recv_from` call
    /// returns `PACKETS_PER_BATCH` packets and leaves the batch capacity at
    /// `PACKETS_PER_BATCH`.
    #[test]
    fn test_packet_resize() {
        solana_logger::setup();
        let receiver = bind_to_localhost().expect("bind");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost().expect("bind");
        let mut incoming = PacketBatch::with_capacity(PACKETS_PER_BATCH);
        incoming.resize(PACKETS_PER_BATCH, Packet::default());

        // Send twice the batch limit, one single-byte packet per send.
        (0..2 * PACKETS_PER_BATCH).for_each(|_| {
            let outgoing_len = 1;
            let mut outgoing = PacketBatch::with_capacity(outgoing_len);
            outgoing.resize(outgoing_len, Packet::default());
            outgoing.iter_mut().for_each(|packet| {
                packet.meta_mut().set_socket_addr(&receiver_addr);
                packet.meta_mut().size = 1;
            });
            send_to(&outgoing, &sender, &SocketAddrSpace::Unspecified).unwrap();
        });

        let max_wait = Duration::from_millis(100);
        let recvd = recv_from(&mut incoming, &receiver, max_wait).unwrap();
        // Only one batch worth of packets may be returned.
        assert_eq!(recvd, PACKETS_PER_BATCH);
        assert_eq!(incoming.capacity(), PACKETS_PER_BATCH);
    }
}