1pub use solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE};
3use {
4 crate::{cuda_runtime::PinnedVec, recycler::Recycler},
5 bincode::config::Options,
6 rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
7 serde::{de::DeserializeOwned, Deserialize, Serialize},
8 std::{
9 borrow::Borrow,
10 io::Read,
11 net::SocketAddr,
12 ops::{Index, IndexMut},
13 slice::{Iter, IterMut, SliceIndex},
14 },
15};
16
/// Number of packets in one large chunk of work (`1024 * 8` = 8192).
///
/// In this file it is the chunk size used by `to_packet_batches_for_tests`.
pub const NUM_PACKETS: usize = 1024 * 8;

/// Number of packets per batch.
// NOTE(review): not referenced in this file — confirm the intended meaning at call sites.
pub const PACKETS_PER_BATCH: usize = 64;
/// Presumably the number of messages requested per `recvmmsg` call.
// NOTE(review): not referenced in this file — confirm against the receive path.
pub const NUM_RCVMMSGS: usize = 64;
21
22#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
23#[derive(Debug, Default, Clone, Serialize, Deserialize)]
24pub struct PacketBatch {
25 packets: PinnedVec<Packet>,
26}
27
28pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
29
30impl PacketBatch {
31 pub fn new(packets: Vec<Packet>) -> Self {
32 let packets = PinnedVec::from_vec(packets);
33 Self { packets }
34 }
35
36 pub fn with_capacity(capacity: usize) -> Self {
37 let packets = PinnedVec::with_capacity(capacity);
38 Self { packets }
39 }
40
41 pub fn new_pinned_with_capacity(capacity: usize) -> Self {
42 let mut batch = Self::with_capacity(capacity);
43 batch.packets.reserve_and_pin(capacity);
44 batch
45 }
46
47 pub fn new_unpinned_with_recycler(
48 recycler: &PacketBatchRecycler,
49 capacity: usize,
50 name: &'static str,
51 ) -> Self {
52 let mut packets = recycler.allocate(name);
53 packets.reserve(capacity);
54 Self { packets }
55 }
56
57 pub fn new_with_recycler(
58 recycler: &PacketBatchRecycler,
59 capacity: usize,
60 name: &'static str,
61 ) -> Self {
62 let mut packets = recycler.allocate(name);
63 packets.reserve_and_pin(capacity);
64 Self { packets }
65 }
66
67 pub fn new_with_recycler_data(
68 recycler: &PacketBatchRecycler,
69 name: &'static str,
70 mut packets: Vec<Packet>,
71 ) -> Self {
72 let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
73 batch.packets.append(&mut packets);
74 batch
75 }
76
77 pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
78 recycler: &PacketBatchRecycler,
79 name: &'static str,
80 dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
81 ) -> Self
82 where
83 S: Borrow<SocketAddr>,
84 T: solana_packet::Encode,
85 {
86 let dests_and_data = dests_and_data.into_iter();
87 let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
88 batch
89 .packets
90 .resize(dests_and_data.len(), Packet::default());
91
92 for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
93 let addr = addr.borrow();
94 if !addr.ip().is_unspecified() && addr.port() != 0 {
95 if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
96 error!("Couldn't write to packet {:?}. Data skipped.", e);
100 packet.meta_mut().set_discard(true);
101 }
102 } else {
103 trace!("Dropping packet, as destination is unknown");
104 packet.meta_mut().set_discard(true);
105 }
106 }
107 batch
108 }
109
110 pub fn new_unpinned_with_recycler_data(
111 recycler: &PacketBatchRecycler,
112 name: &'static str,
113 mut packets: Vec<Packet>,
114 ) -> Self {
115 let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
116 batch.packets.append(&mut packets);
117 batch
118 }
119
120 pub fn resize(&mut self, new_len: usize, value: Packet) {
121 self.packets.resize(new_len, value)
122 }
123
124 pub fn truncate(&mut self, len: usize) {
125 self.packets.truncate(len);
126 }
127
128 pub fn push(&mut self, packet: Packet) {
129 self.packets.push(packet);
130 }
131
132 pub fn set_addr(&mut self, addr: &SocketAddr) {
133 for p in self.iter_mut() {
134 p.meta_mut().set_socket_addr(addr);
135 }
136 }
137
138 pub fn len(&self) -> usize {
139 self.packets.len()
140 }
141
142 pub fn capacity(&self) -> usize {
143 self.packets.capacity()
144 }
145
146 pub fn is_empty(&self) -> bool {
147 self.packets.is_empty()
148 }
149
150 pub fn as_ptr(&self) -> *const Packet {
151 self.packets.as_ptr()
152 }
153
154 pub fn iter(&self) -> Iter<'_, Packet> {
155 self.packets.iter()
156 }
157
158 pub fn iter_mut(&mut self) -> IterMut<'_, Packet> {
159 self.packets.iter_mut()
160 }
161
162 pub unsafe fn set_len(&mut self, new_len: usize) {
171 self.packets.set_len(new_len);
172 }
173}
174
175impl<I: SliceIndex<[Packet]>> Index<I> for PacketBatch {
176 type Output = I::Output;
177
178 #[inline]
179 fn index(&self, index: I) -> &Self::Output {
180 &self.packets[index]
181 }
182}
183
184impl<I: SliceIndex<[Packet]>> IndexMut<I> for PacketBatch {
185 #[inline]
186 fn index_mut(&mut self, index: I) -> &mut Self::Output {
187 &mut self.packets[index]
188 }
189}
190
191impl<'a> IntoIterator for &'a PacketBatch {
192 type Item = &'a Packet;
193 type IntoIter = Iter<'a, Packet>;
194
195 fn into_iter(self) -> Self::IntoIter {
196 self.packets.iter()
197 }
198}
199
200impl<'a> IntoParallelIterator for &'a PacketBatch {
201 type Iter = rayon::slice::Iter<'a, Packet>;
202 type Item = &'a Packet;
203 fn into_par_iter(self) -> Self::Iter {
204 self.packets.par_iter()
205 }
206}
207
208impl<'a> IntoParallelIterator for &'a mut PacketBatch {
209 type Iter = rayon::slice::IterMut<'a, Packet>;
210 type Item = &'a mut Packet;
211 fn into_par_iter(self) -> Self::Iter {
212 self.packets.par_iter_mut()
213 }
214}
215
216impl From<PacketBatch> for Vec<Packet> {
217 fn from(batch: PacketBatch) -> Self {
218 batch.packets.into()
219 }
220}
221
222pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
223 items
224 .chunks(chunk_size)
225 .map(|batch_items| {
226 let mut batch = PacketBatch::with_capacity(batch_items.len());
227 batch.resize(batch_items.len(), Packet::default());
228 for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
229 Packet::populate_packet(packet, None, item).expect("serialize request");
230 }
231 batch
232 })
233 .collect()
234}
235
/// Test-only shorthand for [`to_packet_batches`] with a chunk size of
/// [`NUM_PACKETS`].
#[cfg(test)]
fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
    to_packet_batches(items, NUM_PACKETS)
}
240
241pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
242where
243 R: Read,
244 T: DeserializeOwned,
245{
246 bincode::options()
249 .with_limit(PACKET_DATA_SIZE as u64)
250 .with_fixint_encoding()
251 .allow_trailing_bytes()
252 .deserialize_from(reader)
253}
254
#[cfg(test)]
mod tests {
    use {
        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
        solana_system_transaction::transfer,
    };

    // Checks the number of batches, and the number of packets in each batch,
    // for inputs of 1, NUM_PACKETS and NUM_PACKETS + 1 items.
    #[test]
    fn test_to_packet_batches() {
        let payer = Keypair::new();
        let blockhash = Hash::new_from_array([1; 32]);
        let tx = transfer(&payer, &payer.pubkey(), 1, blockhash);

        // (number of input items, expected packet count of each batch)
        let cases: [(usize, &[usize]); 3] = [
            (1, &[1]),
            (NUM_PACKETS, &[NUM_PACKETS]),
            (NUM_PACKETS + 1, &[NUM_PACKETS, 1]),
        ];

        for (item_count, expected_lens) in cases {
            let items = vec![tx.clone(); item_count];
            let batches = to_packet_batches_for_tests(&items);
            let batch_lens: Vec<usize> = batches.iter().map(PacketBatch::len).collect();
            assert_eq!(batch_lens, expected_lens);
        }
    }

    // Builds two pinned batches (capacities 1 and 2) from the same recycler,
    // dropping each one before the next is requested.
    #[test]
    fn test_to_packets_pinning() {
        let recycler = PacketBatchRecycler::default();
        for capacity in 1..=2 {
            let _first_packets =
                PacketBatch::new_with_recycler(&recycler, capacity, "first one");
        }
    }
}