// solana_bucket_map/src/bucket_map.rs

1//! BucketMap is a mostly contention free concurrent map backed by MmapMut
2
3use {
4    crate::{
5        bucket_api::BucketApi, bucket_stats::BucketMapStats, restart::Restart, MaxSearch, RefCount,
6    },
7    solana_pubkey::Pubkey,
8    std::{
9        convert::TryInto,
10        fmt::Debug,
11        fs::{self},
12        path::PathBuf,
13        sync::{Arc, Mutex},
14    },
15    tempfile::TempDir,
16};
17
#[derive(Debug, Default, Clone)]
pub struct BucketMapConfig {
    /// Number of buckets to shard pubkeys across.
    /// Must be a non-zero power of two (asserted in `BucketMap::new`).
    pub max_buckets: usize,
    /// Folders that hold the mmap-backed bucket files.
    /// `None` means a temporary directory is created and used instead.
    pub drives: Option<Vec<PathBuf>>,
    /// Maximum search distance within a bucket; `None` falls back to `MAX_SEARCH_DEFAULT`.
    pub max_search: Option<MaxSearch>,
    /// A file with a known path where the current state of the bucket files on disk is saved as the index is running.
    /// This file can be used to restore the index files as they existed prior to the process being stopped.
    pub restart_config_file: Option<PathBuf>,
}
27
28impl BucketMapConfig {
29    /// Create a new BucketMapConfig
30    /// NOTE: BucketMap requires that max_buckets is a power of two
31    pub fn new(max_buckets: usize) -> BucketMapConfig {
32        BucketMapConfig {
33            max_buckets,
34            ..BucketMapConfig::default()
35        }
36    }
37}
38
pub struct BucketMap<T: Clone + Copy + Debug + PartialEq + 'static> {
    /// One handle per bucket; a pubkey maps to a bucket via `bucket_ix`.
    buckets: Vec<Arc<BucketApi<T>>>,
    /// Folders holding the bucket files; shared with every `BucketApi`.
    drives: Arc<Vec<PathBuf>>,
    /// log2 of the number of buckets; used to turn key bits into a bucket index.
    max_buckets_pow2: u8,
    pub stats: Arc<BucketMapStats>,
    /// Present when no drives were configured and a temp dir was created instead;
    /// keeping it alive keeps the folder alive.
    pub temp_dir: Option<TempDir>,
    /// true if dropping self removes all folders.
    /// This is primarily for test environments.
    pub erase_drives_on_drop: bool,
}
49
50impl<T: Clone + Copy + Debug + PartialEq> Drop for BucketMap<T> {
51    fn drop(&mut self) {
52        if self.temp_dir.is_none() && self.erase_drives_on_drop {
53            BucketMap::<T>::erase_previous_drives(&self.drives);
54        }
55    }
56}
57
58impl<T: Clone + Copy + Debug + PartialEq> Debug for BucketMap<T> {
59    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        Ok(())
61    }
62}
63
// this should be <= 1 << DEFAULT_CAPACITY or we end up searching the same items over and over - probably not a big deal since it is so small anyway
/// Default maximum search distance, used when `BucketMapConfig::max_search` is `None`.
pub(crate) const MAX_SEARCH_DEFAULT: MaxSearch = 32;
66
/// used to communicate resize necessary and current size.
/// A caller can hand the error back to the owning bucket's `grow` and then retry
/// the failed operation (see `bucket_map_test_insert2`).
#[derive(Debug)]
pub enum BucketMapError {
    /// (bucket_index, current_capacity_pow2)
    /// Note that this is specific to data buckets, which grow in powers of 2
    DataNoSpace((u64, u8)),

    /// current_capacity_entries
    /// Note that this is specific to index buckets, which can be 'Actual' sizes
    IndexNoSpace(u64),
}
78
impl<T: Clone + Copy + Debug + PartialEq> BucketMap<T> {
    /// Create a `BucketMap` with `config.max_buckets` buckets.
    ///
    /// Panics if `config.max_buckets` is zero or not a power of two.
    /// The ordering below matters: restart state is consulted BEFORE any drive
    /// is wiped, so a successful restart load preserves last run's files.
    pub fn new(config: BucketMapConfig) -> Self {
        assert_ne!(
            config.max_buckets, 0,
            "Max number of buckets must be non-zero"
        );
        assert!(
            config.max_buckets.is_power_of_two(),
            "Max number of buckets must be a power of two"
        );
        let max_search = config.max_search.unwrap_or(MAX_SEARCH_DEFAULT);

        let mut restart = Restart::get_restart_file(&config);

        if restart.is_none() {
            // If we were able to load a restart file from the previous run, then don't wipe the accounts index drives from last time.
            // Unused files will be wiped by `get_restartable_buckets`
            if let Some(drives) = config.drives.as_ref() {
                Self::erase_previous_drives(drives);
            }
        }

        let stats = Arc::default();

        if restart.is_none() {
            // no usable restart state from a previous run; start fresh restart tracking
            restart = Restart::new(&config);
        }

        // fall back to a temp dir when the caller supplied no drives; storing the
        // `TempDir` in `self.temp_dir` keeps the folder alive for our lifetime
        let mut temp_dir = None;
        let drives = config.drives.unwrap_or_else(|| {
            temp_dir = Some(TempDir::new().unwrap());
            vec![temp_dir.as_ref().unwrap().path().to_path_buf()]
        });
        let drives = Arc::new(drives);

        let restart = restart.map(|restart| Arc::new(Mutex::new(restart)));

        let restartable_buckets =
            Restart::get_restartable_buckets(restart.as_ref(), &drives, config.max_buckets);

        let buckets = restartable_buckets
            .into_iter()
            .map(|restartable_bucket| {
                Arc::new(BucketApi::new(
                    Arc::clone(&drives),
                    max_search,
                    Arc::clone(&stats),
                    restartable_bucket,
                ))
            })
            .collect();

        // A simple log2 function that is correct if x is a power of two
        let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1;

        Self {
            buckets,
            drives,
            max_buckets_pow2: log2(config.max_buckets) as u8,
            stats,
            temp_dir,
            // if we are keeping track of restart, then don't wipe the drives on drop
            erase_drives_on_drop: restart.is_none(),
        }
    }

    /// Remove and recreate every folder in `drives`, ignoring I/O errors (best effort).
    fn erase_previous_drives(drives: &[PathBuf]) {
        drives.iter().for_each(|folder| {
            let _ = fs::remove_dir_all(folder);
            let _ = fs::create_dir_all(folder);
        })
    }

    /// Number of buckets pubkeys are sharded across.
    pub fn num_buckets(&self) -> usize {
        self.buckets.len()
    }

    /// Get the values for Pubkey `key`
    pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
        self.get_bucket(key).read_value(key)
    }

    /// Delete the Pubkey `key`
    pub fn delete_key(&self, key: &Pubkey) {
        self.get_bucket(key).delete_key(key);
    }

    /// Update Pubkey `key`'s value with 'value'
    pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) {
        self.get_bucket(key).insert(key, value)
    }

    /// Update Pubkey `key`'s value with 'value', returning a `BucketMapError`
    /// instead of growing when the bucket is out of space.
    pub fn try_insert(&self, key: &Pubkey, value: (&[T], RefCount)) -> Result<(), BucketMapError> {
        self.get_bucket(key).try_write(key, value)
    }

    /// Update Pubkey `key`'s value with function `updatefn`
    pub fn update<F>(&self, key: &Pubkey, updatefn: F)
    where
        F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
    {
        self.get_bucket(key).update(key, updatefn)
    }

    /// The bucket that owns `key`.
    pub fn get_bucket(&self, key: &Pubkey) -> &Arc<BucketApi<T>> {
        self.get_bucket_from_index(self.bucket_ix(key))
    }

    /// The bucket at index `ix`. Panics if `ix >= num_buckets()`.
    pub fn get_bucket_from_index(&self, ix: usize) -> &Arc<BucketApi<T>> {
        &self.buckets[ix]
    }

    /// Get the bucket index for Pubkey `key`
    pub fn bucket_ix(&self, key: &Pubkey) -> usize {
        if self.max_buckets_pow2 > 0 {
            // use the top `max_buckets_pow2` bits of the key as the bucket index
            let location = read_be_u64(key.as_ref());
            (location >> (u64::BITS - self.max_buckets_pow2 as u32)) as usize
        } else {
            0
        }
    }
}
202
/// Interpret the first 8 bytes of `input` as a big-endian u64.
///
/// Panics if `input` holds fewer than 8 bytes.
fn read_be_u64(input: &[u8]) -> u64 {
    const LEN: usize = std::mem::size_of::<u64>();
    assert!(input.len() >= LEN);
    let prefix: [u8; LEN] = input[..LEN].try_into().unwrap();
    u64::from_be_bytes(prefix)
}
208
209#[cfg(test)]
210mod tests {
211    use {
212        super::*,
213        crate::index_entry::MAX_LEGAL_REFCOUNT,
214        rand::{thread_rng, Rng},
215        std::{collections::HashMap, sync::RwLock},
216    };
217
218    #[test]
219    fn bucket_map_test_insert() {
220        let key = Pubkey::new_unique();
221        let config = BucketMapConfig::new(1 << 1);
222        let index = BucketMap::new(config);
223        index.update(&key, |_| Some((vec![0], 1)));
224        assert_eq!(index.read_value(&key), Some((vec![0], 1)));
225    }
226
    #[test]
    fn bucket_map_test_insert2() {
        // pass 0 exercises `insert` (which grows on demand); passes 1 and 2
        // exercise `try_insert`, whose out-of-space error the caller must
        // resolve explicitly via `grow` before retrying.
        for pass in 0..3 {
            let key = Pubkey::new_unique();
            let config = BucketMapConfig::new(1 << 1);
            let index = BucketMap::new(config);
            let bucket = index.get_bucket(&key);
            if pass == 0 {
                index.insert(&key, (&[0], 0));
            } else {
                // a fresh bucket rejects try_insert and stores nothing
                let result = index.try_insert(&key, (&[0], 0));
                assert!(result.is_err());
                assert_eq!(index.read_value(&key), None);
                if pass == 2 {
                    // another call to try insert again - should still return an error
                    let result = index.try_insert(&key, (&[0], 0));
                    assert!(result.is_err());
                    assert_eq!(index.read_value(&key), None);
                }
                // grow using the size info carried in the error, then retry succeeds
                bucket.grow(result.unwrap_err());
                let result = index.try_insert(&key, (&[0], 0));
                assert!(result.is_ok());
            }
            assert_eq!(index.read_value(&key), Some((vec![0], 0)));
        }
    }
253
254    #[test]
255    fn bucket_map_test_update2() {
256        let key = Pubkey::new_unique();
257        let config = BucketMapConfig::new(1 << 1);
258        let index = BucketMap::new(config);
259        index.insert(&key, (&[0], 1));
260        assert_eq!(index.read_value(&key), Some((vec![0], 1)));
261        index.insert(&key, (&[1], 1));
262        assert_eq!(index.read_value(&key), Some((vec![1], 1)));
263    }
264
265    #[test]
266    fn bucket_map_test_update() {
267        let key = Pubkey::new_unique();
268        let config = BucketMapConfig::new(1 << 1);
269        let index = BucketMap::new(config);
270        index.update(&key, |_| Some((vec![0], 1)));
271        assert_eq!(index.read_value(&key), Some((vec![0], 1)));
272        index.update(&key, |_| Some((vec![1], 1)));
273        assert_eq!(index.read_value(&key), Some((vec![1], 1)));
274    }
275
    #[test]
    fn bucket_map_test_update_to_0_len() {
        // exercises transitions between zero-length and non-zero-length slot lists
        solana_logger::setup();
        let key = Pubkey::new_unique();
        let config = BucketMapConfig::new(1 << 1);
        let index = BucketMap::new(config);
        index.update(&key, |_| Some((vec![0], 1)));
        assert_eq!(index.read_value(&key), Some((vec![0], 1)));
        // sets len to 0, updates in place
        index.update(&key, |_| Some((vec![], 1)));
        assert_eq!(index.read_value(&key), Some((vec![], 1)));
        // sets len to 0, doesn't update in place - finds a new place, which causes us to no longer have an allocation in data
        index.update(&key, |_| Some((vec![], 2)));
        assert_eq!(index.read_value(&key), Some((vec![], 2)));
        // sets len to 1, doesn't update in place - finds a new place
        index.update(&key, |_| Some((vec![1], 2)));
        assert_eq!(index.read_value(&key), Some((vec![1], 2)));
    }
294
295    #[test]
296    fn bucket_map_test_delete() {
297        let config = BucketMapConfig::new(1 << 1);
298        let index = BucketMap::new(config);
299        for i in 0..10 {
300            let key = Pubkey::new_unique();
301            assert_eq!(index.read_value(&key), None);
302
303            index.update(&key, |_| Some((vec![i], 1)));
304            assert_eq!(index.read_value(&key), Some((vec![i], 1)));
305
306            index.delete_key(&key);
307            assert_eq!(index.read_value(&key), None);
308
309            index.update(&key, |_| Some((vec![i], 1)));
310            assert_eq!(index.read_value(&key), Some((vec![i], 1)));
311            index.delete_key(&key);
312        }
313    }
314
315    #[test]
316    fn bucket_map_test_delete_2() {
317        let config = BucketMapConfig::new(1 << 2);
318        let index = BucketMap::new(config);
319        for i in 0..100 {
320            let key = Pubkey::new_unique();
321            assert_eq!(index.read_value(&key), None);
322
323            index.update(&key, |_| Some((vec![i], 1)));
324            assert_eq!(index.read_value(&key), Some((vec![i], 1)));
325
326            index.delete_key(&key);
327            assert_eq!(index.read_value(&key), None);
328
329            index.update(&key, |_| Some((vec![i], 1)));
330            assert_eq!(index.read_value(&key), Some((vec![i], 1)));
331            index.delete_key(&key);
332        }
333    }
334
335    #[test]
336    fn bucket_map_test_n_drives() {
337        let config = BucketMapConfig::new(1 << 2);
338        let index = BucketMap::new(config);
339        for i in 0..100 {
340            let key = Pubkey::new_unique();
341            index.update(&key, |_| Some((vec![i], 1)));
342            assert_eq!(index.read_value(&key), Some((vec![i], 1)));
343        }
344    }
345    #[test]
346    fn bucket_map_test_grow_read() {
347        let config = BucketMapConfig::new(1 << 2);
348        let index = BucketMap::new(config);
349        let keys: Vec<Pubkey> = (0..100).map(|_| Pubkey::new_unique()).collect();
350        for k in 0..keys.len() {
351            let key = &keys[k];
352            let i = read_be_u64(key.as_ref());
353            index.update(key, |_| Some((vec![i], 1)));
354            assert_eq!(index.read_value(key), Some((vec![i], 1)));
355            for (ix, key) in keys.iter().enumerate() {
356                let i = read_be_u64(key.as_ref());
357                //debug!("READ: {:?} {}", key, i);
358                let expected = if ix <= k { Some((vec![i], 1)) } else { None };
359                assert_eq!(index.read_value(key), expected);
360            }
361        }
362    }
363
364    #[test]
365    fn bucket_map_test_n_delete() {
366        let config = BucketMapConfig::new(1 << 2);
367        let index = BucketMap::new(config);
368        let keys: Vec<Pubkey> = (0..20).map(|_| Pubkey::new_unique()).collect();
369        for key in keys.iter() {
370            let i = read_be_u64(key.as_ref());
371            index.update(key, |_| Some((vec![i], 1)));
372            assert_eq!(index.read_value(key), Some((vec![i], 1)));
373        }
374        for key in keys.iter() {
375            let i = read_be_u64(key.as_ref());
376            //debug!("READ: {:?} {}", key, i);
377            assert_eq!(index.read_value(key), Some((vec![i], 1)));
378        }
379        for k in 0..keys.len() {
380            let key = &keys[k];
381            index.delete_key(key);
382            assert_eq!(index.read_value(key), None);
383            for key in keys.iter().skip(k + 1) {
384                let i = read_be_u64(key.as_ref());
385                assert_eq!(index.read_value(key), Some((vec![i], 1)));
386            }
387        }
388    }
389
    #[test]
    fn hashmap_compare() {
        // Randomized differential test: every operation is mirrored into a plain
        // HashMap, and `verify` periodically asserts that each BucketMap holds
        // exactly the same entries as the HashMap.
        use std::sync::Mutex;
        solana_logger::setup();
        for mut use_batch_insert in [true, false] {
            let maps = (0..2)
                .map(|max_buckets_pow2| {
                    let config = BucketMapConfig::new(1 << max_buckets_pow2);
                    BucketMap::new(config)
                })
                .collect::<Vec<_>>();
            let hash_map = RwLock::new(HashMap::<Pubkey, (Vec<(usize, usize)>, RefCount)>::new());
            let max_slot_list_len = 5;
            let all_keys = Mutex::new(vec![]);

            // random (slot_list, ref_count) pair; ref counts are biased toward
            // the common values 0, 1, and 2
            let gen_rand_value = || {
                let count = thread_rng().gen_range(0..max_slot_list_len);
                let v = (0..count)
                    .map(|x| (x as usize, x as usize /*thread_rng().gen::<usize>()*/))
                    .collect::<Vec<_>>();
                let range = thread_rng().gen_range(0..100);
                // pick ref counts that are useful and common
                let rc = if range < 50 {
                    1
                } else if range < 60 {
                    0
                } else if range < 70 {
                    2
                } else {
                    thread_rng().gen_range(0..MAX_LEGAL_REFCOUNT)
                };

                (v, rc)
            };

            // take a random key out of the shared pool; None when the pool is empty
            let get_key = || {
                let mut keys = all_keys.lock().unwrap();
                if keys.is_empty() {
                    return None;
                }
                let len = keys.len();
                Some(keys.remove(thread_rng().gen_range(0..len)))
            };
            // put a key back in the pool so later iterations can pick it again
            let return_key = |key| {
                let mut keys = all_keys.lock().unwrap();
                keys.push(key);
            };

            // assert every BucketMap agrees with `hash_map`, entry for entry
            let verify = || {
                let expected_count = hash_map.read().unwrap().len();
                let mut maps = maps
                    .iter()
                    .map(|map| {
                        let total_entries = (0..map.num_buckets())
                            .map(|bucket| map.get_bucket_from_index(bucket).bucket_len() as usize)
                            .sum::<usize>();
                        assert_eq!(total_entries, expected_count);
                        let mut r = vec![];
                        for bin in 0..map.num_buckets() {
                            r.append(
                                &mut map.buckets[bin]
                                    .items_in_range(&None::<&std::ops::RangeInclusive<Pubkey>>),
                            );
                        }
                        r
                    })
                    .collect::<Vec<_>>();
                let hm = hash_map.read().unwrap();
                for (k, v) in hm.iter() {
                    for map in maps.iter_mut() {
                        for i in 0..map.len() {
                            if k == &map[i].pubkey {
                                assert_eq!(map[i].slot_list, v.0);
                                assert_eq!(map[i].ref_count, v.1);
                                map.remove(i);
                                break;
                            }
                        }
                    }
                }
                // every dumped entry should have been matched and removed above
                for map in maps.iter() {
                    assert!(map.is_empty());
                }
            };
            let mut initial: usize = 100; // put this many items in to start
            if use_batch_insert {
                // insert a lot more when inserting with batch to make sure we hit resizing during batch
                initial *= 3;
            }

            // do random operations: insert, update, delete, add/unref in random order
            // verify consistency between hashmap and all bucket maps
            for i in 0..10000 {
                initial = initial.saturating_sub(1);
                if initial > 0 || thread_rng().gen_range(0..5) == 0 {
                    // insert
                    let mut to_add = 1;
                    if initial > 1 && use_batch_insert {
                        to_add = thread_rng().gen_range(1..(initial / 4).max(2));
                        initial -= to_add;
                    }

                    let additions = (0..to_add)
                        .map(|_| {
                            let k = solana_pubkey::new_rand();
                            let mut v = gen_rand_value();
                            if use_batch_insert {
                                // refcount has to be 1 to use batch insert
                                v.1 = 1;
                                // len has to be 1 to use batch insert
                                if v.0.len() > 1 {
                                    v.0.truncate(1);
                                } else if v.0.is_empty() {
                                    loop {
                                        let mut new_v = gen_rand_value();
                                        if !new_v.0.is_empty() {
                                            v.0 = vec![new_v.0.pop().unwrap()];
                                            break;
                                        }
                                    }
                                }
                            }
                            (k, v)
                        })
                        .collect::<Vec<_>>();

                    additions.clone().into_iter().for_each(|(k, v)| {
                        hash_map.write().unwrap().insert(k, v);
                        return_key(k);
                    });
                    let insert = thread_rng().gen_range(0..2) == 0;
                    maps.iter().for_each(|map| {
                        // batch insert can only work for the map with only 1 bucket so that we can batch add to a single bucket
                        let batch_insert_now = map.buckets.len() == 1
                            && use_batch_insert
                            && thread_rng().gen_range(0..2) == 0;
                        if batch_insert_now {
                            // batch insert into the map with 1 bucket 50% of the time
                            let mut batch_additions = additions
                                .clone()
                                .into_iter()
                                .map(|(k, mut v)| (k, v.0.pop().unwrap()))
                                .collect::<Vec<_>>();
                            let mut duplicates = 0;
                            if batch_additions.len() > 1 && thread_rng().gen_range(0..2) == 0 {
                                // insert a duplicate sometimes
                                let item_to_duplicate =
                                    thread_rng().gen_range(0..batch_additions.len());
                                let where_to_insert_duplicate =
                                    thread_rng().gen_range(0..batch_additions.len());
                                batch_additions.insert(
                                    where_to_insert_duplicate,
                                    batch_additions[item_to_duplicate],
                                );
                                duplicates += 1;
                            }
                            // the return value lists the duplicate entries that were skipped
                            assert_eq!(
                                map.get_bucket_from_index(0)
                                    .batch_insert_non_duplicates(&batch_additions,)
                                    .len(),
                                duplicates
                            );
                        } else {
                            additions.clone().into_iter().for_each(|(k, v)| {
                                if insert {
                                    map.insert(&k, (&v.0, v.1))
                                } else {
                                    map.update(&k, |current| {
                                        // key is new, so the current value must be absent
                                        assert!(current.is_none());
                                        Some(v.clone())
                                    })
                                }
                            });
                        }
                    });

                    if use_batch_insert && initial == 1 {
                        // done using batch insert once we have added the initial entries
                        // now, the test can remove, update, addref, etc.
                        use_batch_insert = false;
                    }
                }
                if use_batch_insert && initial > 0 {
                    // if we are using batch insert, it is illegal to update, delete, or addref/unref an account until all batch inserts are complete
                    continue;
                }
                if thread_rng().gen_range(0..10) == 0 {
                    // update
                    if let Some(k) = get_key() {
                        let hm = hash_map.read().unwrap();
                        let (v, rc) = gen_rand_value();
                        let v_old = hm.get(&k);
                        let insert = thread_rng().gen_range(0..2) == 0;
                        maps.iter().for_each(|map| {
                            if insert {
                                map.insert(&k, (&v, rc))
                            } else {
                                map.update(&k, |current| {
                                    // the stored value must match the HashMap's before the update
                                    assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{k}");
                                    Some((v.clone(), rc))
                                })
                            }
                        });
                        // release the read lock before taking the write lock below
                        drop(hm);
                        hash_map.write().unwrap().insert(k, (v, rc));
                        return_key(k);
                    }
                }
                if thread_rng().gen_range(0..20) == 0 {
                    // delete
                    if let Some(k) = get_key() {
                        let mut hm = hash_map.write().unwrap();
                        hm.remove(&k);
                        maps.iter().for_each(|map| {
                            map.delete_key(&k);
                        });
                        // the key is deliberately not returned to the pool: it no longer exists
                    }
                }
                if thread_rng().gen_range(0..10) == 0 {
                    // add/unref
                    if let Some(k) = get_key() {
                        let mut inc = thread_rng().gen_range(0..2) == 0;
                        let mut hm = hash_map.write().unwrap();
                        let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap();
                        if !inc && rc == 0 {
                            // can't decrement rc=0
                            inc = true;
                        }
                        rc = if inc { rc + 1 } else { rc - 1 };
                        hm.insert(k, (v.to_vec(), rc));
                        maps.iter().for_each(|map| {
                            map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc)))
                        });

                        return_key(k);
                    }
                }
                if i % 1000 == 0 {
                    verify();
                }
            }
            verify();
        }
    }
634}