1use {
4 crate::{
5 bucket_api::BucketApi, bucket_stats::BucketMapStats, restart::Restart, MaxSearch, RefCount,
6 },
7 solana_pubkey::Pubkey,
8 std::{
9 convert::TryInto,
10 fmt::Debug,
11 fs::{self},
12 path::PathBuf,
13 sync::{Arc, Mutex},
14 },
15 tempfile::TempDir,
16};
17
18#[derive(Debug, Default, Clone)]
19pub struct BucketMapConfig {
20 pub max_buckets: usize,
21 pub drives: Option<Vec<PathBuf>>,
22 pub max_search: Option<MaxSearch>,
23 pub restart_config_file: Option<PathBuf>,
26}
27
28impl BucketMapConfig {
29 pub fn new(max_buckets: usize) -> BucketMapConfig {
32 BucketMapConfig {
33 max_buckets,
34 ..BucketMapConfig::default()
35 }
36 }
37}
38
39pub struct BucketMap<T: Clone + Copy + Debug + PartialEq + 'static> {
40 buckets: Vec<Arc<BucketApi<T>>>,
41 drives: Arc<Vec<PathBuf>>,
42 max_buckets_pow2: u8,
43 pub stats: Arc<BucketMapStats>,
44 pub temp_dir: Option<TempDir>,
45 pub erase_drives_on_drop: bool,
48}
49
50impl<T: Clone + Copy + Debug + PartialEq> Drop for BucketMap<T> {
51 fn drop(&mut self) {
52 if self.temp_dir.is_none() && self.erase_drives_on_drop {
53 BucketMap::<T>::erase_previous_drives(&self.drives);
54 }
55 }
56}
57
58impl<T: Clone + Copy + Debug + PartialEq> Debug for BucketMap<T> {
59 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 Ok(())
61 }
62}
63
/// Search limit that `BucketMap::new` uses when `BucketMapConfig::max_search`
/// is `None`.
pub(crate) const MAX_SEARCH_DEFAULT: MaxSearch = 32;
66
/// Error type returned by `BucketMap::try_insert`.
#[derive(Debug)]
pub enum BucketMapError {
    /// NOTE(review): presumably reports that a data bucket has no room left;
    /// the meaning of the `(u64, u8)` payload is not visible in this file —
    /// confirm against the bucket implementation.
    DataNoSpace((u64, u8)),

    /// NOTE(review): presumably reports that the index bucket has no room
    /// left; the meaning of the `u64` payload is not visible in this file —
    /// confirm against the bucket implementation.
    IndexNoSpace(u64),
}
78
79impl<T: Clone + Copy + Debug + PartialEq> BucketMap<T> {
80 pub fn new(config: BucketMapConfig) -> Self {
81 assert_ne!(
82 config.max_buckets, 0,
83 "Max number of buckets must be non-zero"
84 );
85 assert!(
86 config.max_buckets.is_power_of_two(),
87 "Max number of buckets must be a power of two"
88 );
89 let max_search = config.max_search.unwrap_or(MAX_SEARCH_DEFAULT);
90
91 let mut restart = Restart::get_restart_file(&config);
92
93 if restart.is_none() {
94 if let Some(drives) = config.drives.as_ref() {
97 Self::erase_previous_drives(drives);
98 }
99 }
100
101 let stats = Arc::default();
102
103 if restart.is_none() {
104 restart = Restart::new(&config);
105 }
106
107 let mut temp_dir = None;
108 let drives = config.drives.unwrap_or_else(|| {
109 temp_dir = Some(TempDir::new().unwrap());
110 vec![temp_dir.as_ref().unwrap().path().to_path_buf()]
111 });
112 let drives = Arc::new(drives);
113
114 let restart = restart.map(|restart| Arc::new(Mutex::new(restart)));
115
116 let restartable_buckets =
117 Restart::get_restartable_buckets(restart.as_ref(), &drives, config.max_buckets);
118
119 let buckets = restartable_buckets
120 .into_iter()
121 .map(|restartable_bucket| {
122 Arc::new(BucketApi::new(
123 Arc::clone(&drives),
124 max_search,
125 Arc::clone(&stats),
126 restartable_bucket,
127 ))
128 })
129 .collect();
130
131 let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1;
133
134 Self {
135 buckets,
136 drives,
137 max_buckets_pow2: log2(config.max_buckets) as u8,
138 stats,
139 temp_dir,
140 erase_drives_on_drop: restart.is_none(),
142 }
143 }
144
145 fn erase_previous_drives(drives: &[PathBuf]) {
146 drives.iter().for_each(|folder| {
147 let _ = fs::remove_dir_all(folder);
148 let _ = fs::create_dir_all(folder);
149 })
150 }
151
152 pub fn num_buckets(&self) -> usize {
153 self.buckets.len()
154 }
155
156 pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
158 self.get_bucket(key).read_value(key)
159 }
160
161 pub fn delete_key(&self, key: &Pubkey) {
163 self.get_bucket(key).delete_key(key);
164 }
165
166 pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) {
168 self.get_bucket(key).insert(key, value)
169 }
170
171 pub fn try_insert(&self, key: &Pubkey, value: (&[T], RefCount)) -> Result<(), BucketMapError> {
173 self.get_bucket(key).try_write(key, value)
174 }
175
176 pub fn update<F>(&self, key: &Pubkey, updatefn: F)
178 where
179 F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
180 {
181 self.get_bucket(key).update(key, updatefn)
182 }
183
184 pub fn get_bucket(&self, key: &Pubkey) -> &Arc<BucketApi<T>> {
185 self.get_bucket_from_index(self.bucket_ix(key))
186 }
187
188 pub fn get_bucket_from_index(&self, ix: usize) -> &Arc<BucketApi<T>> {
189 &self.buckets[ix]
190 }
191
192 pub fn bucket_ix(&self, key: &Pubkey) -> usize {
194 if self.max_buckets_pow2 > 0 {
195 let location = read_be_u64(key.as_ref());
196 (location >> (u64::BITS - self.max_buckets_pow2 as u32)) as usize
197 } else {
198 0
199 }
200 }
201}
202
/// Interprets the first eight bytes of `input` as a big-endian `u64`.
///
/// Any bytes after the first eight are ignored.
///
/// # Panics
///
/// Panics if `input` holds fewer than eight bytes.
fn read_be_u64(input: &[u8]) -> u64 {
    const WIDTH: usize = std::mem::size_of::<u64>();
    assert!(input.len() >= std::mem::size_of::<u64>());
    // The length was checked above, so this conversion cannot fail.
    let prefix: [u8; WIDTH] = input[..WIDTH].try_into().unwrap();
    u64::from_be_bytes(prefix)
}
208
209#[cfg(test)]
210mod tests {
211 use {
212 super::*,
213 crate::index_entry::MAX_LEGAL_REFCOUNT,
214 rand::{thread_rng, Rng},
215 std::{collections::HashMap, sync::RwLock},
216 };
217
/// A value written through `update` can be read back unchanged.
#[test]
fn bucket_map_test_insert() {
    let pubkey = Pubkey::new_unique();
    let map = BucketMap::new(BucketMapConfig::new(1 << 1));
    map.update(&pubkey, |_| Some((vec![0], 1)));
    let stored = map.read_value(&pubkey);
    assert_eq!(stored, Some((vec![0], 1)));
}
226
/// Compares `insert` with `try_insert` on a freshly created map.
///
/// Pass 0 uses `insert` directly. Passes 1 and 2 expect `try_insert` to fail
/// (pass 2 checks that a second attempt fails as well), grow the bucket with
/// the first reported error, and then expect `try_insert` to succeed.
#[test]
fn bucket_map_test_insert2() {
    for pass in 0..3 {
        let key = Pubkey::new_unique();
        let index = BucketMap::new(BucketMapConfig::new(1 << 1));
        let bucket = index.get_bucket(&key);
        match pass {
            0 => index.insert(&key, (&[0], 0)),
            _ => {
                let first_attempt = index.try_insert(&key, (&[0], 0));
                assert!(first_attempt.is_err());
                assert_eq!(index.read_value(&key), None);
                if pass == 2 {
                    // Trying again without growing must fail as well.
                    let second_attempt = index.try_insert(&key, (&[0], 0));
                    assert!(second_attempt.is_err());
                    assert_eq!(index.read_value(&key), None);
                }
                // Grow using the error from the first failed attempt.
                bucket.grow(first_attempt.unwrap_err());
                let after_grow = index.try_insert(&key, (&[0], 0));
                assert!(after_grow.is_ok());
            }
        }
        assert_eq!(index.read_value(&key), Some((vec![0], 0)));
    }
}
253
/// A second `insert` for the same key replaces the stored value.
#[test]
fn bucket_map_test_update2() {
    let pubkey = Pubkey::new_unique();
    let map = BucketMap::new(BucketMapConfig::new(1 << 1));

    map.insert(&pubkey, (&[0], 1));
    assert_eq!(map.read_value(&pubkey), Some((vec![0], 1)));

    map.insert(&pubkey, (&[1], 1));
    assert_eq!(map.read_value(&pubkey), Some((vec![1], 1)));
}
264
/// A second `update` for the same key replaces the stored value.
#[test]
fn bucket_map_test_update() {
    let pubkey = Pubkey::new_unique();
    let map = BucketMap::new(BucketMapConfig::new(1 << 1));

    map.update(&pubkey, |_| Some((vec![0], 1)));
    assert_eq!(map.read_value(&pubkey), Some((vec![0], 1)));

    map.update(&pubkey, |_| Some((vec![1], 1)));
    assert_eq!(map.read_value(&pubkey), Some((vec![1], 1)));
}
275
/// Walks one key through a one-element list, an empty list, an empty list
/// with a different ref count, and back to a one-element list, checking the
/// stored entry after every step.
#[test]
fn bucket_map_test_update_to_0_len() {
    solana_logger::setup();
    let pubkey = Pubkey::new_unique();
    let map = BucketMap::new(BucketMapConfig::new(1 << 1));

    // Start with a single element.
    map.update(&pubkey, |_| Some((vec![0], 1)));
    assert_eq!(map.read_value(&pubkey), Some((vec![0], 1)));

    // Shrink the list to zero elements, same ref count.
    map.update(&pubkey, |_| Some((vec![], 1)));
    assert_eq!(map.read_value(&pubkey), Some((vec![], 1)));

    // Keep the list empty but change the ref count.
    map.update(&pubkey, |_| Some((vec![], 2)));
    assert_eq!(map.read_value(&pubkey), Some((vec![], 2)));

    // Grow the list back to one element.
    map.update(&pubkey, |_| Some((vec![1], 2)));
    assert_eq!(map.read_value(&pubkey), Some((vec![1], 2)));
}
294
295 #[test]
296 fn bucket_map_test_delete() {
297 let config = BucketMapConfig::new(1 << 1);
298 let index = BucketMap::new(config);
299 for i in 0..10 {
300 let key = Pubkey::new_unique();
301 assert_eq!(index.read_value(&key), None);
302
303 index.update(&key, |_| Some((vec![i], 1)));
304 assert_eq!(index.read_value(&key), Some((vec![i], 1)));
305
306 index.delete_key(&key);
307 assert_eq!(index.read_value(&key), None);
308
309 index.update(&key, |_| Some((vec![i], 1)));
310 assert_eq!(index.read_value(&key), Some((vec![i], 1)));
311 index.delete_key(&key);
312 }
313 }
314
315 #[test]
316 fn bucket_map_test_delete_2() {
317 let config = BucketMapConfig::new(1 << 2);
318 let index = BucketMap::new(config);
319 for i in 0..100 {
320 let key = Pubkey::new_unique();
321 assert_eq!(index.read_value(&key), None);
322
323 index.update(&key, |_| Some((vec![i], 1)));
324 assert_eq!(index.read_value(&key), Some((vec![i], 1)));
325
326 index.delete_key(&key);
327 assert_eq!(index.read_value(&key), None);
328
329 index.update(&key, |_| Some((vec![i], 1)));
330 assert_eq!(index.read_value(&key), Some((vec![i], 1)));
331 index.delete_key(&key);
332 }
333 }
334
/// Inserts 100 fresh keys into a four-bucket map and reads each one back
/// right after it was written.
#[test]
fn bucket_map_test_n_drives() {
    let map = BucketMap::new(BucketMapConfig::new(1 << 2));
    (0..100).for_each(|i| {
        let pubkey = Pubkey::new_unique();
        map.update(&pubkey, |_| Some((vec![i], 1)));
        assert_eq!(map.read_value(&pubkey), Some((vec![i], 1)));
    });
}
/// Inserts 100 keys one at a time and, after every insertion, checks all
/// 100 keys: those inserted so far must be readable with their value, the
/// rest must still be absent.
///
/// Each key's value is derived from the key itself via `read_be_u64`.
#[test]
fn bucket_map_test_grow_read() {
    let config = BucketMapConfig::new(1 << 2);
    let index = BucketMap::new(config);
    let keys: Vec<Pubkey> = (0..100).map(|_| Pubkey::new_unique()).collect();
    for (inserted, key) in keys.iter().enumerate() {
        let value = read_be_u64(key.as_ref());
        index.update(key, |_| Some((vec![value], 1)));
        assert_eq!(index.read_value(key), Some((vec![value], 1)));

        // Everything up to and including `inserted` is present; later keys
        // have not been written yet.
        for (ix, other) in keys.iter().enumerate() {
            let other_value = read_be_u64(other.as_ref());
            let expected = if ix <= inserted {
                Some((vec![other_value], 1))
            } else {
                None
            };
            assert_eq!(index.read_value(other), expected);
        }
    }
}
363
/// Inserts 20 keys, re-reads them all, then deletes them one at a time and
/// checks after every deletion that the deleted key is gone while every
/// not-yet-deleted key is still readable.
///
/// Each key's value is derived from the key itself via `read_be_u64`.
#[test]
fn bucket_map_test_n_delete() {
    let config = BucketMapConfig::new(1 << 2);
    let index = BucketMap::new(config);
    let keys: Vec<Pubkey> = (0..20).map(|_| Pubkey::new_unique()).collect();

    for key in &keys {
        let value = read_be_u64(key.as_ref());
        index.update(key, |_| Some((vec![value], 1)));
        assert_eq!(index.read_value(key), Some((vec![value], 1)));
    }

    for key in &keys {
        let value = read_be_u64(key.as_ref());
        assert_eq!(index.read_value(key), Some((vec![value], 1)));
    }

    for (deleted, key) in keys.iter().enumerate() {
        index.delete_key(key);
        assert_eq!(index.read_value(key), None);

        // Keys after the deleted one have not been touched yet.
        for survivor in &keys[deleted + 1..] {
            let value = read_be_u64(survivor.as_ref());
            assert_eq!(index.read_value(survivor), Some((vec![value], 1)));
        }
    }
}
389
/// Randomized differential test.
///
/// The same stream of random insertions, value updates, deletions and
/// ref-count changes is applied to a std `HashMap` and to two `BucketMap`s
/// (one with 1 bucket, one with 2). `verify` checks, every 1000 iterations
/// and once at the end, that every `BucketMap` holds exactly the entries of
/// the `HashMap`.
///
/// The whole scenario runs twice: once seeding the initial entries partly
/// through `batch_insert_non_duplicates`, once without batch insertion.
#[test]
fn hashmap_compare() {
    use std::sync::Mutex;
    solana_logger::setup();
    for mut use_batch_insert in [true, false] {
        // Index 0 -> 1 bucket, index 1 -> 2 buckets.
        let maps = (0..2)
            .map(|max_buckets_pow2| {
                let config = BucketMapConfig::new(1 << max_buckets_pow2);
                BucketMap::new(config)
            })
            .collect::<Vec<_>>();
        // Reference model that the bucket maps are compared against.
        let hash_map = RwLock::new(HashMap::<Pubkey, (Vec<(usize, usize)>, RefCount)>::new());
        let max_slot_list_len = 5;
        // Keys currently stored in the maps and available to later operations.
        let all_keys = Mutex::new(vec![]);

        // Produces a random (slot list, ref count) pair. The list has fewer
        // than `max_slot_list_len` entries (possibly none).
        let gen_rand_value = || {
            let count = thread_rng().gen_range(0..max_slot_list_len);
            let v = (0..count)
                .map(|x| (x as usize, x as usize ))
                .collect::<Vec<_>>();
            let range = thread_rng().gen_range(0..100);
            // Ref count distribution: 1 half of the time, 0 and 2 a tenth of
            // the time each, otherwise a random value below MAX_LEGAL_REFCOUNT.
            let rc = if range < 50 {
                1
            } else if range < 60 {
                0
            } else if range < 70 {
                2
            } else {
                thread_rng().gen_range(0..MAX_LEGAL_REFCOUNT)
            };

            (v, rc)
        };

        // Takes a random key out of `all_keys`, or returns `None` when no key
        // is available.
        let get_key = || {
            let mut keys = all_keys.lock().unwrap();
            if keys.is_empty() {
                return None;
            }
            let len = keys.len();
            Some(keys.remove(thread_rng().gen_range(0..len)))
        };
        // Puts a key (back) into `all_keys`.
        let return_key = |key| {
            let mut keys = all_keys.lock().unwrap();
            keys.push(key);
        };

        // Asserts that every bucket map holds exactly the entries of `hash_map`.
        let verify = || {
            let expected_count = hash_map.read().unwrap().len();
            // For each bucket map: check the entry count, then collect the
            // items of all of its buckets into one list.
            let mut maps = maps
                .iter()
                .map(|map| {
                    let total_entries = (0..map.num_buckets())
                        .map(|bucket| map.get_bucket_from_index(bucket).bucket_len() as usize)
                        .sum::<usize>();
                    assert_eq!(total_entries, expected_count);
                    let mut r = vec![];
                    for bin in 0..map.num_buckets() {
                        r.append(
                            &mut map.buckets[bin]
                                .items_in_range(&None::<&std::ops::RangeInclusive<Pubkey>>),
                        );
                    }
                    r
                })
                .collect::<Vec<_>>();
            let hm = hash_map.read().unwrap();
            // Tick off every reference entry in each collected list ...
            for (k, v) in hm.iter() {
                for map in maps.iter_mut() {
                    for i in 0..map.len() {
                        if k == &map[i].pubkey {
                            assert_eq!(map[i].slot_list, v.0);
                            assert_eq!(map[i].ref_count, v.1);
                            map.remove(i);
                            break;
                        }
                    }
                }
            }
            // ... after which nothing may be left over.
            for map in maps.iter() {
                assert!(map.is_empty());
            }
        };
        // Countdown for the initial fill phase; tripled when batch insertion
        // is in use.
        let mut initial: usize = 100; if use_batch_insert {
            initial *= 3;
        }

        for i in 0..10000 {
            initial = initial.saturating_sub(1);
            // Always insert while the initial countdown runs; afterwards
            // insert in one out of five iterations.
            if initial > 0 || thread_rng().gen_range(0..5) == 0 {
                let mut to_add = 1;
                // In batch mode add a random number of entries at once and
                // charge them against the countdown.
                if initial > 1 && use_batch_insert {
                    to_add = thread_rng().gen_range(1..(initial / 4).max(2));
                    initial -= to_add;
                }

                let additions = (0..to_add)
                    .map(|_| {
                        let k = solana_pubkey::new_rand();
                        let mut v = gen_rand_value();
                        if use_batch_insert {
                            // Batch-mode entries are normalized to ref count 1
                            // and a slot list of exactly one element.
                            v.1 = 1;
                            if v.0.len() > 1 {
                                v.0.truncate(1);
                            } else if v.0.is_empty() {
                                // Re-roll until a non-empty list comes up and
                                // take one element from it.
                                loop {
                                    let mut new_v = gen_rand_value();
                                    if !new_v.0.is_empty() {
                                        v.0 = vec![new_v.0.pop().unwrap()];
                                        break;
                                    }
                                }
                            }
                        }
                        (k, v)
                    })
                    .collect::<Vec<_>>();

                // Record the additions in the reference model and make the
                // keys available to later operations.
                additions.clone().into_iter().for_each(|(k, v)| {
                    hash_map.write().unwrap().insert(k, v);
                    return_key(k);
                });
                // Randomly choose between `insert` and `update` for the
                // non-batch path.
                let insert = thread_rng().gen_range(0..2) == 0;
                maps.iter().for_each(|map| {
                    // The batch path is only taken for the single-bucket map,
                    // and then only half of the time.
                    let batch_insert_now = map.buckets.len() == 1
                        && use_batch_insert
                        && thread_rng().gen_range(0..2) == 0;
                    if batch_insert_now {
                        // Batch items carry the single slot-list element
                        // instead of the whole (list, ref count) pair.
                        let mut batch_additions = additions
                            .clone()
                            .into_iter()
                            .map(|(k, mut v)| (k, v.0.pop().unwrap()))
                            .collect::<Vec<_>>();
                        let mut duplicates = 0;
                        // Half of the time, copy one item to a random
                        // position so the batch contains a duplicate.
                        if batch_additions.len() > 1 && thread_rng().gen_range(0..2) == 0 {
                            let item_to_duplicate =
                                thread_rng().gen_range(0..batch_additions.len());
                            let where_to_insert_duplicate =
                                thread_rng().gen_range(0..batch_additions.len());
                            batch_additions.insert(
                                where_to_insert_duplicate,
                                batch_additions[item_to_duplicate],
                            );
                            duplicates += 1;
                        }
                        // The length of the returned collection must equal
                        // the number of duplicates injected above.
                        assert_eq!(
                            map.get_bucket_from_index(0)
                                .batch_insert_non_duplicates(&batch_additions,)
                                .len(),
                            duplicates
                        );
                    } else {
                        additions.clone().into_iter().for_each(|(k, v)| {
                            if insert {
                                map.insert(&k, (&v.0, v.1))
                            } else {
                                map.update(&k, |current| {
                                    // The key is brand new, so nothing may be
                                    // stored for it yet.
                                    assert!(current.is_none());
                                    Some(v.clone())
                                })
                            }
                        });
                    }
                });

                // Stop batch insertion once the initial countdown reaches 1.
                if use_batch_insert && initial == 1 {
                    use_batch_insert = false;
                }
            }
            // While batch mode is still filling the maps, skip the update,
            // delete and ref-count operations below (and the periodic verify).
            if use_batch_insert && initial > 0 {
                continue;
            }
            // One in ten iterations: replace the value of a random existing key.
            if thread_rng().gen_range(0..10) == 0 {
                if let Some(k) = get_key() {
                    let hm = hash_map.read().unwrap();
                    let (v, rc) = gen_rand_value();
                    let v_old = hm.get(&k);
                    let insert = thread_rng().gen_range(0..2) == 0;
                    maps.iter().for_each(|map| {
                        if insert {
                            map.insert(&k, (&v, rc))
                        } else {
                            map.update(&k, |current| {
                                // The bucket map must still hold what the
                                // reference model holds for this key.
                                assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{k}");
                                Some((v.clone(), rc))
                            })
                        }
                    });
                    // Release the read guard before taking the write lock.
                    drop(hm);
                    hash_map.write().unwrap().insert(k, (v, rc));
                    return_key(k);
                }
            }
            // One in twenty iterations: delete a random existing key. The key
            // is not returned to `all_keys`.
            if thread_rng().gen_range(0..20) == 0 {
                if let Some(k) = get_key() {
                    let mut hm = hash_map.write().unwrap();
                    hm.remove(&k);
                    maps.iter().for_each(|map| {
                        map.delete_key(&k);
                    });
                }
            }
            // One in ten iterations: bump a random key's ref count up or down
            // by one while keeping its slot list.
            if thread_rng().gen_range(0..10) == 0 {
                if let Some(k) = get_key() {
                    let mut inc = thread_rng().gen_range(0..2) == 0;
                    let mut hm = hash_map.write().unwrap();
                    let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap();
                    // A ref count of 0 cannot be decremented; increment instead.
                    if !inc && rc == 0 {
                        inc = true;
                    }
                    rc = if inc { rc + 1 } else { rc - 1 };
                    hm.insert(k, (v.to_vec(), rc));
                    maps.iter().for_each(|map| {
                        map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc)))
                    });

                    return_key(k);
                }
            }
            if i % 1000 == 0 {
                verify();
            }
        }
        verify();
    }
}
634}