1use {
4 crate::packet::PacketBatch,
5 ahash::RandomState,
6 rand::Rng,
7 std::{
8 hash::Hash,
9 iter::repeat_with,
10 marker::PhantomData,
11 sync::atomic::{AtomicU64, Ordering},
12 time::{Duration, Instant},
13 },
14};
15
16pub struct Deduper<const K: usize, T: ?Sized> {
17 num_bits: u64,
18 bits: Vec<AtomicU64>,
19 state: [RandomState; K],
20 clock: Instant,
21 popcount: AtomicU64, _phantom: PhantomData<T>,
23}
24
25impl<const K: usize, T: ?Sized + Hash> Deduper<K, T> {
26 pub fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
27 let size = num_bits.checked_add(63).unwrap() / 64;
28 let size = usize::try_from(size).unwrap();
29 Self {
30 num_bits,
31 state: std::array::from_fn(|_| new_random_state(rng)),
32 clock: Instant::now(),
33 bits: repeat_with(AtomicU64::default).take(size).collect(),
34 popcount: AtomicU64::default(),
35 _phantom: PhantomData::<T>,
36 }
37 }
38
39 fn false_positive_rate(&self) -> f64 {
40 let popcount = self.popcount.load(Ordering::Relaxed);
41 let ones_ratio = popcount.min(self.num_bits) as f64 / self.num_bits as f64;
42 ones_ratio.powi(K as i32)
43 }
44
45 pub fn maybe_reset<R: Rng>(
49 &mut self,
50 rng: &mut R,
51 false_positive_rate: f64,
52 reset_cycle: Duration,
53 ) -> bool {
54 assert!(0.0 < false_positive_rate && false_positive_rate < 1.0);
55 let saturated = self.false_positive_rate() >= false_positive_rate;
56 if saturated || self.clock.elapsed() >= reset_cycle {
57 self.state = std::array::from_fn(|_| new_random_state(rng));
58 self.clock = Instant::now();
59 self.bits.fill_with(AtomicU64::default);
60 self.popcount = AtomicU64::default();
61 }
62 saturated
63 }
64
65 #[must_use]
67 #[allow(clippy::arithmetic_side_effects)]
68 pub fn dedup(&self, data: &T) -> bool {
69 let mut out = true;
70 for random_state in &self.state {
71 let hash: u64 = random_state.hash_one(data) % self.num_bits;
72 let index = (hash >> 6) as usize;
73 let mask: u64 = 1u64 << (hash & 63);
74 let old = self.bits[index].fetch_or(mask, Ordering::Relaxed);
75 if old & mask == 0u64 {
76 self.popcount.fetch_add(1, Ordering::Relaxed);
77 out = false;
78 }
79 }
80 out
81 }
82}
83
84fn new_random_state<R: Rng>(rng: &mut R) -> RandomState {
85 RandomState::with_seeds(rng.gen(), rng.gen(), rng.gen(), rng.gen())
86}
87
88pub fn dedup_packets_and_count_discards<const K: usize>(
89 deduper: &Deduper<K, [u8]>,
90 batches: &mut [PacketBatch],
91) -> u64 {
92 batches
93 .iter_mut()
94 .flat_map(PacketBatch::iter_mut)
95 .map(|packet| {
96 if !packet.meta().discard()
97 && packet
98 .data(..)
99 .map(|data| deduper.dedup(data))
100 .unwrap_or(true)
101 {
102 packet.meta_mut().set_discard(true);
103 }
104 u64::from(packet.meta().discard())
105 })
106 .sum()
107}
108
109#[cfg(test)]
110#[allow(clippy::arithmetic_side_effects)]
111mod tests {
112 use {
113 super::*,
114 crate::{
115 packet::{to_packet_batches, Packet},
116 sigverify,
117 test_tx::test_tx,
118 },
119 rand::SeedableRng,
120 rand_chacha::ChaChaRng,
121 solana_packet::{Meta, PACKET_DATA_SIZE},
122 test_case::test_case,
123 };
124
125 #[test]
126 fn test_dedup_same() {
127 let tx = test_tx();
128
129 let mut batches =
130 to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
131 let packet_count = sigverify::count_packets_in_batches(&batches);
132 let mut rng = rand::thread_rng();
133 let filter = Deduper::<2, [u8]>::new(&mut rng, 63_999_979);
134 let discard = dedup_packets_and_count_discards(&filter, &mut batches) as usize;
135 assert_eq!(packet_count, discard + 1);
136 }
137
138 #[test]
139 fn test_dedup_diff() {
140 let mut rng = rand::thread_rng();
141 let mut filter = Deduper::<2, [u8]>::new(&mut rng, 63_999_979);
142 let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
143 let discard = dedup_packets_and_count_discards(&filter, &mut batches) as usize;
144 assert_eq!(discard, 0);
146 assert!(!filter.maybe_reset(
147 &mut rng,
148 0.001, Duration::from_millis(0), ));
151 for i in filter.bits {
152 assert_eq!(i.load(Ordering::Relaxed), 0);
153 }
154 }
155
156 fn get_capacity<const K: usize>(num_bits: u64, false_positive_rate: f64) -> u64 {
157 (num_bits as f64 * false_positive_rate.powf(1f64 / K as f64)) as u64
158 }
159
160 #[test]
161 #[ignore]
162 fn test_dedup_saturated() {
163 const NUM_BITS: u64 = 63_999_979;
164 const FALSE_POSITIVE_RATE: f64 = 0.001;
165 let mut rng = rand::thread_rng();
166 let mut filter = Deduper::<2, [u8]>::new(&mut rng, NUM_BITS);
167 let capacity = get_capacity::<2>(NUM_BITS, FALSE_POSITIVE_RATE);
168 let mut discard = 0;
169 assert!(filter.popcount.load(Ordering::Relaxed) < capacity);
170 for i in 0..1000 {
171 let mut batches =
172 to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
173 discard += dedup_packets_and_count_discards(&filter, &mut batches) as usize;
174 trace!("{} {}", i, discard);
175 if filter.popcount.load(Ordering::Relaxed) > capacity {
176 break;
177 }
178 }
179 assert!(filter.popcount.load(Ordering::Relaxed) > capacity);
180 assert!(filter.false_positive_rate() >= FALSE_POSITIVE_RATE);
181 assert!(filter.maybe_reset(
182 &mut rng,
183 FALSE_POSITIVE_RATE,
184 Duration::from_millis(0), ));
186 }
187
188 #[test]
189 fn test_dedup_false_positive() {
190 let mut rng = rand::thread_rng();
191 let filter = Deduper::<2, [u8]>::new(&mut rng, 63_999_979);
192 let mut discard = 0;
193 for i in 0..10 {
194 let mut batches =
195 to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
196 discard += dedup_packets_and_count_discards(&filter, &mut batches) as usize;
197 debug!("false positive rate: {}/{}", discard, i * 1024);
198 }
199 assert!(discard < 2);
201 }
202
203 #[test_case(63_999_979, 0.001, 2_023_857)]
204 #[test_case(622_401_961, 0.001, 19_682_078)]
205 #[test_case(622_401_979, 0.001, 19_682_078)]
206 #[test_case(629_145_593, 0.001, 19_895_330)]
207 #[test_case(632_455_543, 0.001, 20_000_000)]
208 #[test_case(637_534_199, 0.001, 20_160_601)]
209 #[test_case(622_401_961, 0.0001, 6_224_019)]
210 #[test_case(622_401_979, 0.0001, 6_224_019)]
211 #[test_case(629_145_593, 0.0001, 6_291_455)]
212 #[test_case(632_455_543, 0.0001, 6_324_555)]
213 #[test_case(637_534_199, 0.0001, 6_375_341)]
214 fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) {
215 let mut rng = rand::thread_rng();
216 assert_eq!(get_capacity::<2>(num_bits, false_positive_rate), capacity);
217 let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
218 assert_eq!(deduper.false_positive_rate(), 0.0);
219 deduper.popcount.store(capacity, Ordering::Relaxed);
220 assert!(deduper.false_positive_rate() < false_positive_rate);
221 deduper.popcount.store(capacity + 1, Ordering::Relaxed);
222 assert!(deduper.false_positive_rate() >= false_positive_rate);
223 assert!(deduper.maybe_reset(
224 &mut rng,
225 false_positive_rate,
226 Duration::from_millis(0), ));
228 }
229
230 #[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 77, 101_083)]
231 #[test_case([0xdc; 32], 3_200_003, 101_192, 51_414, 64, 101_097)]
232 #[test_case([0xa5; 32], 6_399_971, 202_384, 102_828, 117, 202_257)]
233 #[test_case([0xdb; 32], 6_400_013, 202_386, 102_828, 135, 202_254)]
234 #[test_case([0xcd; 32], 12_799_987, 404_771, 205_655, 273, 404_521)]
235 #[test_case([0xc3; 32], 12_800_009, 404_771, 205_656, 283, 404_365)]
236 fn test_dedup_seeded(
237 seed: [u8; 32],
238 num_bits: u64,
239 capacity: u64,
240 num_packets: usize,
241 num_dups: usize,
242 popcount: u64,
243 ) {
244 const FALSE_POSITIVE_RATE: f64 = 0.001;
245 let mut rng = ChaChaRng::from_seed(seed);
246 let mut deduper = Deduper::<2, [u8]>::new(&mut rng, num_bits);
247 assert_eq!(get_capacity::<2>(num_bits, FALSE_POSITIVE_RATE), capacity);
248 let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default());
249 let mut dup_count = 0usize;
250 for _ in 0..num_packets {
251 let size = rng.gen_range(0..PACKET_DATA_SIZE);
252 packet.meta_mut().size = size;
253 rng.fill(&mut packet.buffer_mut()[0..size]);
254 if deduper.dedup(packet.data(..).unwrap()) {
255 dup_count += 1;
256 }
257 assert!(deduper.dedup(packet.data(..).unwrap()));
258 }
259 assert_eq!(dup_count, num_dups);
260 assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount);
261 assert!(deduper.false_positive_rate() < FALSE_POSITIVE_RATE);
262 assert!(!deduper.maybe_reset(
263 &mut rng,
264 FALSE_POSITIVE_RATE,
265 Duration::from_millis(0), ));
267 }
268}