1pub use solana_sdk::packet::{Meta, Packet, PacketFlags, PACKET_DATA_SIZE};
3use {
4 crate::{cuda_runtime::PinnedVec, recycler::Recycler},
5 bincode::config::Options,
6 rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
7 serde::{de::DeserializeOwned, Deserialize, Serialize},
8 std::{
9 io::Read,
10 net::SocketAddr,
11 ops::{Index, IndexMut},
12 slice::{Iter, IterMut, SliceIndex},
13 },
14};
15
16pub const NUM_PACKETS: usize = 1024 * 8;
17
18pub const PACKETS_PER_BATCH: usize = 64;
19pub const NUM_RCVMMSGS: usize = 64;
20
21#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
22#[derive(Debug, Default, Clone, Serialize, Deserialize)]
23pub struct PacketBatch {
24 packets: PinnedVec<Packet>,
25}
26
27pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
28
29impl PacketBatch {
30 pub fn new(packets: Vec<Packet>) -> Self {
31 let packets = PinnedVec::from_vec(packets);
32 Self { packets }
33 }
34
35 pub fn with_capacity(capacity: usize) -> Self {
36 let packets = PinnedVec::with_capacity(capacity);
37 Self { packets }
38 }
39
40 pub fn new_pinned_with_capacity(capacity: usize) -> Self {
41 let mut batch = Self::with_capacity(capacity);
42 batch.packets.reserve_and_pin(capacity);
43 batch
44 }
45
46 pub fn new_unpinned_with_recycler(
47 recycler: &PacketBatchRecycler,
48 capacity: usize,
49 name: &'static str,
50 ) -> Self {
51 let mut packets = recycler.allocate(name);
52 packets.reserve(capacity);
53 Self { packets }
54 }
55
56 pub fn new_with_recycler(
57 recycler: &PacketBatchRecycler,
58 capacity: usize,
59 name: &'static str,
60 ) -> Self {
61 let mut packets = recycler.allocate(name);
62 packets.reserve_and_pin(capacity);
63 Self { packets }
64 }
65
66 pub fn new_with_recycler_data(
67 recycler: &PacketBatchRecycler,
68 name: &'static str,
69 mut packets: Vec<Packet>,
70 ) -> Self {
71 let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
72 batch.packets.append(&mut packets);
73 batch
74 }
75
76 pub fn new_unpinned_with_recycler_data_and_dests<T: Serialize>(
77 recycler: &PacketBatchRecycler,
78 name: &'static str,
79 dests_and_data: &[(SocketAddr, T)],
80 ) -> Self {
81 let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
82 batch
83 .packets
84 .resize(dests_and_data.len(), Packet::default());
85
86 for ((addr, data), packet) in dests_and_data.iter().zip(batch.packets.iter_mut()) {
87 if !addr.ip().is_unspecified() && addr.port() != 0 {
88 if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
89 error!("Couldn't write to packet {:?}. Data skipped.", e);
93 }
94 } else {
95 trace!("Dropping packet, as destination is unknown");
96 }
97 }
98 batch
99 }
100
101 pub fn new_unpinned_with_recycler_data(
102 recycler: &PacketBatchRecycler,
103 name: &'static str,
104 mut packets: Vec<Packet>,
105 ) -> Self {
106 let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
107 batch.packets.append(&mut packets);
108 batch
109 }
110
111 pub fn resize(&mut self, new_len: usize, value: Packet) {
112 self.packets.resize(new_len, value)
113 }
114
115 pub fn truncate(&mut self, len: usize) {
116 self.packets.truncate(len);
117 }
118
119 pub fn push(&mut self, packet: Packet) {
120 self.packets.push(packet);
121 }
122
123 pub fn set_addr(&mut self, addr: &SocketAddr) {
124 for p in self.iter_mut() {
125 p.meta_mut().set_socket_addr(addr);
126 }
127 }
128
129 pub fn len(&self) -> usize {
130 self.packets.len()
131 }
132
133 pub fn capacity(&self) -> usize {
134 self.packets.capacity()
135 }
136
137 pub fn is_empty(&self) -> bool {
138 self.packets.is_empty()
139 }
140
141 pub fn as_ptr(&self) -> *const Packet {
142 self.packets.as_ptr()
143 }
144
145 pub fn iter(&self) -> Iter<'_, Packet> {
146 self.packets.iter()
147 }
148
149 pub fn iter_mut(&mut self) -> IterMut<'_, Packet> {
150 self.packets.iter_mut()
151 }
152
153 pub unsafe fn set_len(&mut self, new_len: usize) {
162 self.packets.set_len(new_len);
163 }
164}
165
166impl<I: SliceIndex<[Packet]>> Index<I> for PacketBatch {
167 type Output = I::Output;
168
169 #[inline]
170 fn index(&self, index: I) -> &Self::Output {
171 &self.packets[index]
172 }
173}
174
175impl<I: SliceIndex<[Packet]>> IndexMut<I> for PacketBatch {
176 #[inline]
177 fn index_mut(&mut self, index: I) -> &mut Self::Output {
178 &mut self.packets[index]
179 }
180}
181
182impl<'a> IntoIterator for &'a PacketBatch {
183 type Item = &'a Packet;
184 type IntoIter = Iter<'a, Packet>;
185
186 fn into_iter(self) -> Self::IntoIter {
187 self.packets.iter()
188 }
189}
190
191impl<'a> IntoParallelIterator for &'a PacketBatch {
192 type Iter = rayon::slice::Iter<'a, Packet>;
193 type Item = &'a Packet;
194 fn into_par_iter(self) -> Self::Iter {
195 self.packets.par_iter()
196 }
197}
198
199impl<'a> IntoParallelIterator for &'a mut PacketBatch {
200 type Iter = rayon::slice::IterMut<'a, Packet>;
201 type Item = &'a mut Packet;
202 fn into_par_iter(self) -> Self::Iter {
203 self.packets.par_iter_mut()
204 }
205}
206
207impl From<PacketBatch> for Vec<Packet> {
208 fn from(batch: PacketBatch) -> Self {
209 batch.packets.into()
210 }
211}
212
213pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
214 items
215 .chunks(chunk_size)
216 .map(|batch_items| {
217 let mut batch = PacketBatch::with_capacity(batch_items.len());
218 batch.resize(batch_items.len(), Packet::default());
219 for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
220 Packet::populate_packet(packet, None, item).expect("serialize request");
221 }
222 batch
223 })
224 .collect()
225}
226
227#[cfg(test)]
228fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
229 to_packet_batches(items, NUM_PACKETS)
230}
231
232pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
233where
234 R: Read,
235 T: DeserializeOwned,
236{
237 bincode::options()
240 .with_limit(PACKET_DATA_SIZE as u64)
241 .with_fixint_encoding()
242 .allow_trailing_bytes()
243 .deserialize_from(reader)
244}
245
246#[cfg(test)]
247mod tests {
248 use {
249 super::*,
250 solana_sdk::{
251 hash::Hash,
252 signature::{Keypair, Signer},
253 system_transaction,
254 },
255 };
256
257 #[test]
258 fn test_to_packet_batches() {
259 let keypair = Keypair::new();
260 let hash = Hash::new(&[1; 32]);
261 let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash);
262 let rv = to_packet_batches_for_tests(&[tx.clone(); 1]);
263 assert_eq!(rv.len(), 1);
264 assert_eq!(rv[0].len(), 1);
265
266 #[allow(clippy::useless_vec)]
267 let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
268 assert_eq!(rv.len(), 1);
269 assert_eq!(rv[0].len(), NUM_PACKETS);
270
271 #[allow(clippy::useless_vec)]
272 let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
273 assert_eq!(rv.len(), 2);
274 assert_eq!(rv[0].len(), NUM_PACKETS);
275 assert_eq!(rv[1].len(), 1);
276 }
277
278 #[test]
279 fn test_to_packets_pinning() {
280 let recycler = PacketBatchRecycler::default();
281 for i in 0..2 {
282 let _first_packets = PacketBatch::new_with_recycler(&recycler, i + 1, "first one");
283 }
284 }
285}