solana_bucket_map/
bucket_api.rs

1use {
2    crate::{
3        bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError,
4        bucket_stats::BucketMapStats, restart::RestartableBucket, MaxSearch, RefCount,
5    },
6    solana_pubkey::Pubkey,
7    std::{
8        ops::RangeBounds,
9        path::PathBuf,
10        sync::{
11            atomic::{AtomicU64, Ordering},
12            Arc, RwLock, RwLockWriteGuard,
13        },
14    },
15};
16
17type LockedBucket<T> = RwLock<Option<Bucket<T>>>;
18
19pub struct BucketApi<T: Clone + Copy + PartialEq + 'static> {
20    drives: Arc<Vec<PathBuf>>,
21    max_search: MaxSearch,
22    pub stats: Arc<BucketMapStats>,
23
24    bucket: LockedBucket<T>,
25    count: Arc<AtomicU64>,
26
27    /// keeps track of which index file this bucket is currently using
28    /// or at startup, which bucket file this bucket should initially use
29    restartable_bucket: RestartableBucket,
30}
31
32impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
33    pub(crate) fn new(
34        drives: Arc<Vec<PathBuf>>,
35        max_search: MaxSearch,
36        stats: Arc<BucketMapStats>,
37        restartable_bucket: RestartableBucket,
38    ) -> Self {
39        Self {
40            drives,
41            max_search,
42            stats,
43            bucket: RwLock::default(),
44            count: Arc::default(),
45            restartable_bucket,
46        }
47    }
48
49    /// Get the items for bucket
50    pub fn items_in_range<R>(&self, range: &Option<&R>) -> Vec<BucketItem<T>>
51    where
52        R: RangeBounds<Pubkey>,
53    {
54        self.bucket
55            .read()
56            .unwrap()
57            .as_ref()
58            .map(|bucket| bucket.items_in_range(range))
59            .unwrap_or_default()
60    }
61
62    /// Get the Pubkeys
63    pub fn keys(&self) -> Vec<Pubkey> {
64        self.bucket
65            .read()
66            .unwrap()
67            .as_ref()
68            .map_or_else(Vec::default, |bucket| bucket.keys())
69    }
70
71    /// Get the values for Pubkey `key`
72    pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
73        self.bucket.read().unwrap().as_ref().and_then(|bucket| {
74            bucket
75                .read_value(key)
76                .map(|(value, ref_count)| (value.to_vec(), ref_count))
77        })
78    }
79
80    pub fn bucket_len(&self) -> u64 {
81        self.count.load(Ordering::Relaxed)
82    }
83
84    pub fn delete_key(&self, key: &Pubkey) {
85        let mut bucket = self.get_write_bucket();
86        if let Some(bucket) = bucket.as_mut() {
87            bucket.delete_key(key)
88        }
89    }
90
91    /// allocate new bucket if not allocated yet
92    fn allocate_bucket(&self, bucket: &mut RwLockWriteGuard<Option<Bucket<T>>>) {
93        if bucket.is_none() {
94            **bucket = Some(Bucket::new(
95                Arc::clone(&self.drives),
96                self.max_search,
97                Arc::clone(&self.stats),
98                Arc::clone(&self.count),
99                self.restartable_bucket.clone(),
100            ));
101        }
102    }
103
104    fn get_write_bucket(&self) -> RwLockWriteGuard<Option<Bucket<T>>> {
105        let mut bucket = self.bucket.write().unwrap();
106        if let Some(bucket) = bucket.as_mut() {
107            bucket.handle_delayed_grows();
108        } else {
109            self.allocate_bucket(&mut bucket);
110        }
111        bucket
112    }
113
114    pub fn insert(&self, pubkey: &Pubkey, value: (&[T], RefCount)) {
115        let mut bucket = self.get_write_bucket();
116        bucket.as_mut().unwrap().insert(pubkey, value)
117    }
118
119    pub fn grow(&self, err: BucketMapError) {
120        // grows are special - they get a read lock and modify 'reallocated'
121        // the grown changes are applied the next time there is a write lock taken
122        if let Some(bucket) = self.bucket.read().unwrap().as_ref() {
123            bucket.grow(err)
124        }
125    }
126
127    /// caller can specify that the index needs to hold approximately `count` entries soon.
128    /// This gives a hint to the resizing algorithm and prevents repeated incremental resizes.
129    pub fn set_anticipated_count(&self, count: u64) {
130        let mut bucket = self.get_write_bucket();
131        bucket.as_mut().unwrap().set_anticipated_count(count);
132    }
133
134    /// batch insert of `items`. Assumption is a single slot list element and ref_count == 1.
135    /// For any pubkeys that already exist, the index in `items` of the failed insertion and the existing data (previously put in the index) are returned.
136    pub fn batch_insert_non_duplicates(&self, items: &[(Pubkey, T)]) -> Vec<(usize, T)> {
137        let mut bucket = self.get_write_bucket();
138        bucket.as_mut().unwrap().batch_insert_non_duplicates(items)
139    }
140
141    pub fn update<F>(&self, key: &Pubkey, updatefn: F)
142    where
143        F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
144    {
145        let mut bucket = self.get_write_bucket();
146        bucket.as_mut().unwrap().update(key, updatefn)
147    }
148
149    pub fn try_write(
150        &self,
151        pubkey: &Pubkey,
152        value: (&[T], RefCount),
153    ) -> Result<(), BucketMapError> {
154        let mut bucket = self.get_write_bucket();
155        bucket
156            .as_mut()
157            .unwrap()
158            .try_write(pubkey, value.0.iter(), value.0.len(), value.1)
159    }
160}