solana_streamer/
packet.rs1use {
3 crate::{
4 recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
5 socket::SocketAddrSpace,
6 },
7 std::{
8 io::Result,
9 net::UdpSocket,
10 time::{Duration, Instant},
11 },
12};
13pub use {
14 solana_perf::packet::{
15 to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
16 },
17 solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
18};
19
20pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait: Duration) -> Result<usize> {
21 let mut i = 0;
22 socket.set_nonblocking(false)?;
29 trace!("receiving on {}", socket.local_addr().unwrap());
30 let start = Instant::now();
31 loop {
32 batch.resize(
33 std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
34 Packet::default(),
35 );
36 match recv_mmsg(socket, &mut batch[i..]) {
37 Err(_) if i > 0 => {
38 if start.elapsed() > max_wait {
39 break;
40 }
41 }
42 Err(e) => {
43 trace!("recv_from err {:?}", e);
44 return Err(e);
45 }
46 Ok(npkts) => {
47 if i == 0 {
48 socket.set_nonblocking(true)?;
49 }
50 trace!("got {} packets", npkts);
51 i += npkts;
52 if start.elapsed() > max_wait || i >= PACKETS_PER_BATCH {
55 break;
56 }
57 }
58 }
59 }
60 batch.truncate(i);
61 Ok(i)
62}
63
64pub fn send_to(
65 batch: &PacketBatch,
66 socket: &UdpSocket,
67 socket_addr_space: &SocketAddrSpace,
68) -> Result<()> {
69 for p in batch.iter() {
70 let addr = p.meta().socket_addr();
71 if socket_addr_space.check(&addr) {
72 if let Some(data) = p.data(..) {
73 socket.send_to(data, addr)?;
74 }
75 }
76 }
77 Ok(())
78}
79
80#[cfg(test)]
81mod tests {
82 use {
83 super::*,
84 std::{
85 io,
86 io::Write,
87 net::{SocketAddr, UdpSocket},
88 },
89 };
90
91 #[test]
92 fn test_packets_set_addr() {
93 let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
95 let packets = vec![Packet::default()];
96 let mut packet_batch = PacketBatch::new(packets);
97 packet_batch.set_addr(&send_addr);
98 assert_eq!(packet_batch[0].meta().socket_addr(), send_addr);
99 }
100
101 #[test]
102 pub fn packet_send_recv() {
103 solana_logger::setup();
104 let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
105 let addr = recv_socket.local_addr().unwrap();
106 let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
107 let saddr = send_socket.local_addr().unwrap();
108
109 let packet_batch_size = 10;
110 let mut batch = PacketBatch::with_capacity(packet_batch_size);
111 batch.resize(packet_batch_size, Packet::default());
112
113 for m in batch.iter_mut() {
114 m.meta_mut().set_socket_addr(&addr);
115 m.meta_mut().size = PACKET_DATA_SIZE;
116 }
117 send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
118
119 batch
120 .iter_mut()
121 .for_each(|pkt| *pkt.meta_mut() = Meta::default());
122 let recvd = recv_from(
123 &mut batch,
124 &recv_socket,
125 Duration::from_millis(1), )
127 .unwrap();
128 assert_eq!(recvd, batch.len());
129
130 for m in batch.iter() {
131 assert_eq!(m.meta().size, PACKET_DATA_SIZE);
132 assert_eq!(m.meta().socket_addr(), saddr);
133 }
134 }
135
136 #[test]
137 pub fn debug_trait() {
138 write!(io::sink(), "{:?}", Packet::default()).unwrap();
139 write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
140 }
141
142 #[test]
143 fn test_packet_partial_eq() {
144 let mut p1 = Packet::default();
145 let mut p2 = Packet::default();
146
147 p1.meta_mut().size = 1;
148 p1.buffer_mut()[0] = 0;
149
150 p2.meta_mut().size = 1;
151 p2.buffer_mut()[0] = 0;
152
153 assert!(p1 == p2);
154
155 p2.buffer_mut()[0] = 4;
156 assert!(p1 != p2);
157 }
158
159 #[test]
160 fn test_packet_resize() {
161 solana_logger::setup();
162 let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
163 let addr = recv_socket.local_addr().unwrap();
164 let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
165 let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
166 batch.resize(PACKETS_PER_BATCH, Packet::default());
167
168 for _ in 0..2 * PACKETS_PER_BATCH {
171 let batch_size = 1;
172 let mut batch = PacketBatch::with_capacity(batch_size);
173 batch.resize(batch_size, Packet::default());
174 for p in batch.iter_mut() {
175 p.meta_mut().set_socket_addr(&addr);
176 p.meta_mut().size = 1;
177 }
178 send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
179 }
180 let recvd = recv_from(
181 &mut batch,
182 &recv_socket,
183 Duration::from_millis(100), )
185 .unwrap();
186 assert_eq!(recvd, PACKETS_PER_BATCH);
188 assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
189 }
190}