solana_perf/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2pub use solana_packet::{self, Meta, Packet, PacketFlags, PACKET_DATA_SIZE};
3use {
4    crate::{cuda_runtime::PinnedVec, recycler::Recycler},
5    bincode::config::Options,
6    rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator},
7    serde::{de::DeserializeOwned, Deserialize, Serialize},
8    std::{
9        borrow::Borrow,
10        io::Read,
11        net::SocketAddr,
12        ops::{Index, IndexMut},
13        slice::{Iter, IterMut, SliceIndex},
14    },
15};
16
17pub const NUM_PACKETS: usize = 1024 * 8;
18
19pub const PACKETS_PER_BATCH: usize = 64;
20pub const NUM_RCVMMSGS: usize = 64;
21
22#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
23#[derive(Debug, Default, Clone, Serialize, Deserialize)]
24pub struct PacketBatch {
25    packets: PinnedVec<Packet>,
26}
27
28pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>;
29
30impl PacketBatch {
31    pub fn new(packets: Vec<Packet>) -> Self {
32        let packets = PinnedVec::from_vec(packets);
33        Self { packets }
34    }
35
36    pub fn with_capacity(capacity: usize) -> Self {
37        let packets = PinnedVec::with_capacity(capacity);
38        Self { packets }
39    }
40
41    pub fn new_pinned_with_capacity(capacity: usize) -> Self {
42        let mut batch = Self::with_capacity(capacity);
43        batch.packets.reserve_and_pin(capacity);
44        batch
45    }
46
47    pub fn new_unpinned_with_recycler(
48        recycler: &PacketBatchRecycler,
49        capacity: usize,
50        name: &'static str,
51    ) -> Self {
52        let mut packets = recycler.allocate(name);
53        packets.reserve(capacity);
54        Self { packets }
55    }
56
57    pub fn new_with_recycler(
58        recycler: &PacketBatchRecycler,
59        capacity: usize,
60        name: &'static str,
61    ) -> Self {
62        let mut packets = recycler.allocate(name);
63        packets.reserve_and_pin(capacity);
64        Self { packets }
65    }
66
67    pub fn new_with_recycler_data(
68        recycler: &PacketBatchRecycler,
69        name: &'static str,
70        mut packets: Vec<Packet>,
71    ) -> Self {
72        let mut batch = Self::new_with_recycler(recycler, packets.len(), name);
73        batch.packets.append(&mut packets);
74        batch
75    }
76
77    pub fn new_unpinned_with_recycler_data_and_dests<S, T>(
78        recycler: &PacketBatchRecycler,
79        name: &'static str,
80        dests_and_data: impl IntoIterator<Item = (S, T), IntoIter: ExactSizeIterator>,
81    ) -> Self
82    where
83        S: Borrow<SocketAddr>,
84        T: solana_packet::Encode,
85    {
86        let dests_and_data = dests_and_data.into_iter();
87        let mut batch = Self::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
88        batch
89            .packets
90            .resize(dests_and_data.len(), Packet::default());
91
92        for ((addr, data), packet) in dests_and_data.zip(batch.packets.iter_mut()) {
93            let addr = addr.borrow();
94            if !addr.ip().is_unspecified() && addr.port() != 0 {
95                if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
96                    // TODO: This should never happen. Instead the caller should
97                    // break the payload into smaller messages, and here any errors
98                    // should be propagated.
99                    error!("Couldn't write to packet {:?}. Data skipped.", e);
100                    packet.meta_mut().set_discard(true);
101                }
102            } else {
103                trace!("Dropping packet, as destination is unknown");
104                packet.meta_mut().set_discard(true);
105            }
106        }
107        batch
108    }
109
110    pub fn new_unpinned_with_recycler_data(
111        recycler: &PacketBatchRecycler,
112        name: &'static str,
113        mut packets: Vec<Packet>,
114    ) -> Self {
115        let mut batch = Self::new_unpinned_with_recycler(recycler, packets.len(), name);
116        batch.packets.append(&mut packets);
117        batch
118    }
119
120    pub fn resize(&mut self, new_len: usize, value: Packet) {
121        self.packets.resize(new_len, value)
122    }
123
124    pub fn truncate(&mut self, len: usize) {
125        self.packets.truncate(len);
126    }
127
128    pub fn push(&mut self, packet: Packet) {
129        self.packets.push(packet);
130    }
131
132    pub fn set_addr(&mut self, addr: &SocketAddr) {
133        for p in self.iter_mut() {
134            p.meta_mut().set_socket_addr(addr);
135        }
136    }
137
138    pub fn len(&self) -> usize {
139        self.packets.len()
140    }
141
142    pub fn capacity(&self) -> usize {
143        self.packets.capacity()
144    }
145
146    pub fn is_empty(&self) -> bool {
147        self.packets.is_empty()
148    }
149
150    pub fn as_ptr(&self) -> *const Packet {
151        self.packets.as_ptr()
152    }
153
154    pub fn iter(&self) -> Iter<'_, Packet> {
155        self.packets.iter()
156    }
157
158    pub fn iter_mut(&mut self) -> IterMut<'_, Packet> {
159        self.packets.iter_mut()
160    }
161
162    /// See Vector::set_len() for more details
163    ///
164    /// # Safety
165    ///
166    /// - `new_len` must be less than or equal to [`self.capacity`].
167    /// - The elements at `old_len..new_len` must be initialized. Packet data
168    ///   will likely be overwritten when populating the packet, but the meta
169    ///   should specifically be initialized to known values.
170    pub unsafe fn set_len(&mut self, new_len: usize) {
171        self.packets.set_len(new_len);
172    }
173}
174
175impl<I: SliceIndex<[Packet]>> Index<I> for PacketBatch {
176    type Output = I::Output;
177
178    #[inline]
179    fn index(&self, index: I) -> &Self::Output {
180        &self.packets[index]
181    }
182}
183
184impl<I: SliceIndex<[Packet]>> IndexMut<I> for PacketBatch {
185    #[inline]
186    fn index_mut(&mut self, index: I) -> &mut Self::Output {
187        &mut self.packets[index]
188    }
189}
190
191impl<'a> IntoIterator for &'a PacketBatch {
192    type Item = &'a Packet;
193    type IntoIter = Iter<'a, Packet>;
194
195    fn into_iter(self) -> Self::IntoIter {
196        self.packets.iter()
197    }
198}
199
200impl<'a> IntoParallelIterator for &'a PacketBatch {
201    type Iter = rayon::slice::Iter<'a, Packet>;
202    type Item = &'a Packet;
203    fn into_par_iter(self) -> Self::Iter {
204        self.packets.par_iter()
205    }
206}
207
208impl<'a> IntoParallelIterator for &'a mut PacketBatch {
209    type Iter = rayon::slice::IterMut<'a, Packet>;
210    type Item = &'a mut Packet;
211    fn into_par_iter(self) -> Self::Iter {
212        self.packets.par_iter_mut()
213    }
214}
215
216impl From<PacketBatch> for Vec<Packet> {
217    fn from(batch: PacketBatch) -> Self {
218        batch.packets.into()
219    }
220}
221
222pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> {
223    items
224        .chunks(chunk_size)
225        .map(|batch_items| {
226            let mut batch = PacketBatch::with_capacity(batch_items.len());
227            batch.resize(batch_items.len(), Packet::default());
228            for (item, packet) in batch_items.iter().zip(batch.packets.iter_mut()) {
229                Packet::populate_packet(packet, None, item).expect("serialize request");
230            }
231            batch
232        })
233        .collect()
234}
235
236#[cfg(test)]
237fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
238    to_packet_batches(items, NUM_PACKETS)
239}
240
241pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
242where
243    R: Read,
244    T: DeserializeOwned,
245{
246    // with_limit causes pre-allocation size to be limited
247    // to prevent against memory exhaustion attacks.
248    bincode::options()
249        .with_limit(PACKET_DATA_SIZE as u64)
250        .with_fixint_encoding()
251        .allow_trailing_bytes()
252        .deserialize_from(reader)
253}
254
255#[cfg(test)]
256mod tests {
257    use {
258        super::*, solana_hash::Hash, solana_keypair::Keypair, solana_signer::Signer,
259        solana_system_transaction::transfer,
260    };
261
262    #[test]
263    fn test_to_packet_batches() {
264        let keypair = Keypair::new();
265        let hash = Hash::new_from_array([1; 32]);
266        let tx = transfer(&keypair, &keypair.pubkey(), 1, hash);
267        let rv = to_packet_batches_for_tests(&[tx.clone(); 1]);
268        assert_eq!(rv.len(), 1);
269        assert_eq!(rv[0].len(), 1);
270
271        #[allow(clippy::useless_vec)]
272        let rv = to_packet_batches_for_tests(&vec![tx.clone(); NUM_PACKETS]);
273        assert_eq!(rv.len(), 1);
274        assert_eq!(rv[0].len(), NUM_PACKETS);
275
276        #[allow(clippy::useless_vec)]
277        let rv = to_packet_batches_for_tests(&vec![tx; NUM_PACKETS + 1]);
278        assert_eq!(rv.len(), 2);
279        assert_eq!(rv[0].len(), NUM_PACKETS);
280        assert_eq!(rv[1].len(), 1);
281    }
282
283    #[test]
284    fn test_to_packets_pinning() {
285        let recycler = PacketBatchRecycler::default();
286        for i in 0..2 {
287            let _first_packets = PacketBatch::new_with_recycler(&recycler, i + 1, "first one");
288        }
289    }
290}