solana_bucket_map/
bucket_api.rs1use {
2 crate::{
3 bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError,
4 bucket_stats::BucketMapStats, restart::RestartableBucket, MaxSearch, RefCount,
5 },
6 solana_pubkey::Pubkey,
7 std::{
8 ops::RangeBounds,
9 path::PathBuf,
10 sync::{
11 atomic::{AtomicU64, Ordering},
12 Arc, RwLock, RwLockWriteGuard,
13 },
14 },
15};
16
17type LockedBucket<T> = RwLock<Option<Bucket<T>>>;
18
19pub struct BucketApi<T: Clone + Copy + PartialEq + 'static> {
20 drives: Arc<Vec<PathBuf>>,
21 max_search: MaxSearch,
22 pub stats: Arc<BucketMapStats>,
23
24 bucket: LockedBucket<T>,
25 count: Arc<AtomicU64>,
26
27 restartable_bucket: RestartableBucket,
30}
31
32impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
33 pub(crate) fn new(
34 drives: Arc<Vec<PathBuf>>,
35 max_search: MaxSearch,
36 stats: Arc<BucketMapStats>,
37 restartable_bucket: RestartableBucket,
38 ) -> Self {
39 Self {
40 drives,
41 max_search,
42 stats,
43 bucket: RwLock::default(),
44 count: Arc::default(),
45 restartable_bucket,
46 }
47 }
48
49 pub fn items_in_range<R>(&self, range: &Option<&R>) -> Vec<BucketItem<T>>
51 where
52 R: RangeBounds<Pubkey>,
53 {
54 self.bucket
55 .read()
56 .unwrap()
57 .as_ref()
58 .map(|bucket| bucket.items_in_range(range))
59 .unwrap_or_default()
60 }
61
62 pub fn keys(&self) -> Vec<Pubkey> {
64 self.bucket
65 .read()
66 .unwrap()
67 .as_ref()
68 .map_or_else(Vec::default, |bucket| bucket.keys())
69 }
70
71 pub fn read_value(&self, key: &Pubkey) -> Option<(Vec<T>, RefCount)> {
73 self.bucket.read().unwrap().as_ref().and_then(|bucket| {
74 bucket
75 .read_value(key)
76 .map(|(value, ref_count)| (value.to_vec(), ref_count))
77 })
78 }
79
80 pub fn bucket_len(&self) -> u64 {
81 self.count.load(Ordering::Relaxed)
82 }
83
84 pub fn delete_key(&self, key: &Pubkey) {
85 let mut bucket = self.get_write_bucket();
86 if let Some(bucket) = bucket.as_mut() {
87 bucket.delete_key(key)
88 }
89 }
90
91 fn allocate_bucket(&self, bucket: &mut RwLockWriteGuard<Option<Bucket<T>>>) {
93 if bucket.is_none() {
94 **bucket = Some(Bucket::new(
95 Arc::clone(&self.drives),
96 self.max_search,
97 Arc::clone(&self.stats),
98 Arc::clone(&self.count),
99 self.restartable_bucket.clone(),
100 ));
101 }
102 }
103
104 fn get_write_bucket(&self) -> RwLockWriteGuard<Option<Bucket<T>>> {
105 let mut bucket = self.bucket.write().unwrap();
106 if let Some(bucket) = bucket.as_mut() {
107 bucket.handle_delayed_grows();
108 } else {
109 self.allocate_bucket(&mut bucket);
110 }
111 bucket
112 }
113
114 pub fn insert(&self, pubkey: &Pubkey, value: (&[T], RefCount)) {
115 let mut bucket = self.get_write_bucket();
116 bucket.as_mut().unwrap().insert(pubkey, value)
117 }
118
119 pub fn grow(&self, err: BucketMapError) {
120 if let Some(bucket) = self.bucket.read().unwrap().as_ref() {
123 bucket.grow(err)
124 }
125 }
126
127 pub fn set_anticipated_count(&self, count: u64) {
130 let mut bucket = self.get_write_bucket();
131 bucket.as_mut().unwrap().set_anticipated_count(count);
132 }
133
134 pub fn batch_insert_non_duplicates(&self, items: &[(Pubkey, T)]) -> Vec<(usize, T)> {
137 let mut bucket = self.get_write_bucket();
138 bucket.as_mut().unwrap().batch_insert_non_duplicates(items)
139 }
140
141 pub fn update<F>(&self, key: &Pubkey, updatefn: F)
142 where
143 F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
144 {
145 let mut bucket = self.get_write_bucket();
146 bucket.as_mut().unwrap().update(key, updatefn)
147 }
148
149 pub fn try_write(
150 &self,
151 pubkey: &Pubkey,
152 value: (&[T], RefCount),
153 ) -> Result<(), BucketMapError> {
154 let mut bucket = self.get_write_bucket();
155 bucket
156 .as_mut()
157 .unwrap()
158 .try_write(pubkey, value.0.iter(), value.0.len(), value.1)
159 }
160}