solana_bucket_map/
bucket_api.rs

1use {
2    crate::{
3        bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError,
4        bucket_stats::BucketMapStats, MaxSearch, RefCount,
5    },
6    solana_sdk::pubkey::Pubkey,
7    std::{
8        ops::RangeBounds,
9        path::PathBuf,
10        sync::{
11            atomic::{AtomicU64, Ordering},
12            Arc, RwLock, RwLockWriteGuard,
13        },
14    },
15};
16
17type LockedBucket<T> = RwLock<Option<Bucket<T>>>;
18
19pub struct BucketApi<T: Clone + Copy> {
20    drives: Arc<Vec<PathBuf>>,
21    max_search: MaxSearch,
22    pub stats: Arc<BucketMapStats>,
23
24    bucket: LockedBucket<T>,
25    count: Arc<AtomicU64>,
26}
27
28impl<T: Clone + Copy> BucketApi<T> {
29    pub fn new(
30        drives: Arc<Vec<PathBuf>>,
31        max_search: MaxSearch,
32        stats: Arc<BucketMapStats>,
33    ) -> Self {
34        Self {
35            drives,
36            max_search,
37            stats,
38            bucket: RwLock::default(),
39            count: Arc::default(),
40        }
41    }
42
43    /// Get the items for bucket
44    pub fn items_in_range<R>(&self, range: &Option<&R>) -> Vec<BucketItem<T>>
45    where
46        R: RangeBounds<Pubkey>,
47    {
48        self.bucket
49            .read()
50            .unwrap()
51            .as_ref()
52            .map(|bucket| bucket.items_in_range(range))
53            .unwrap_or_default()
54    }
55
56    /// Get the Pubkeys
57    pub fn keys(&self) -> Vec<Pubkey> {
58        self.bucket
59            .read()
60            .unwrap()
61            .as_ref()
62            .map_or_else(Vec::default, |bucket| bucket.keys())
63    }
64
65    /// Get the values for Pubkey `key`
66    pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
67        self.bucket.read().unwrap().as_ref().and_then(|bucket| {
68            bucket
69                .read_value(key)
70                .map(|(value, ref_count)| (value.to_vec(), ref_count))
71        })
72    }
73
74    pub fn bucket_len(&self) -> u64 {
75        self.count.load(Ordering::Relaxed)
76    }
77
78    pub fn delete_key(&self, key: &Pubkey) {
79        let mut bucket = self.get_write_bucket();
80        if let Some(bucket) = bucket.as_mut() {
81            bucket.delete_key(key)
82        }
83    }
84
85    fn get_write_bucket(&self) -> RwLockWriteGuard<Option<Bucket<T>>> {
86        let mut bucket = self.bucket.write().unwrap();
87        if bucket.is_none() {
88            *bucket = Some(Bucket::new(
89                Arc::clone(&self.drives),
90                self.max_search,
91                Arc::clone(&self.stats),
92                Arc::clone(&self.count),
93            ));
94        } else {
95            let write = bucket.as_mut().unwrap();
96            write.handle_delayed_grows();
97        }
98        bucket
99    }
100
101    pub fn addref(&self, key: &Pubkey) -> Option<RefCount> {
102        self.get_write_bucket()
103            .as_mut()
104            .and_then(|bucket| bucket.addref(key))
105    }
106
107    pub fn unref(&self, key: &Pubkey) -> Option<RefCount> {
108        self.get_write_bucket()
109            .as_mut()
110            .and_then(|bucket| bucket.unref(key))
111    }
112
113    pub fn insert(&self, pubkey: &Pubkey, value: (&[T], RefCount)) {
114        let mut bucket = self.get_write_bucket();
115        bucket.as_mut().unwrap().insert(pubkey, value)
116    }
117
118    pub fn grow(&self, err: BucketMapError) {
119        // grows are special - they get a read lock and modify 'reallocated'
120        // the grown changes are applied the next time there is a write lock taken
121        if let Some(bucket) = self.bucket.read().unwrap().as_ref() {
122            bucket.grow(err)
123        }
124    }
125
126    pub fn update<F>(&self, key: &Pubkey, updatefn: F)
127    where
128        F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
129    {
130        let mut bucket = self.get_write_bucket();
131        bucket.as_mut().unwrap().update(key, updatefn)
132    }
133
134    pub fn try_write(
135        &self,
136        pubkey: &Pubkey,
137        value: (&[T], RefCount),
138    ) -> Result<(), BucketMapError> {
139        let mut bucket = self.get_write_bucket();
140        bucket.as_mut().unwrap().try_write(pubkey, value.0, value.1)
141    }
142}