1use crate::err::Error;
2use crate::idx::trees::bkeys::{FstKeys, TrieKeys};
3use crate::idx::trees::btree::{BTreeNode, BTreeStore};
4use crate::idx::trees::mtree::{MTreeNode, MTreeStore};
5use crate::idx::trees::store::lru::{CacheKey, ConcurrentLru};
6use crate::idx::trees::store::{
7 NodeId, StoreGeneration, StoredNode, TreeNode, TreeNodeProvider, TreeStore,
8};
9use crate::kvs::{Key, Transaction, TransactionType};
10use ahash::{HashMap, HashSet};
11use dashmap::mapref::entry::Entry;
12use dashmap::DashMap;
13use std::cmp::Ordering;
14use std::fmt::{Debug, Display};
15use std::sync::Arc;
16
17#[derive(Default)]
18pub(crate) struct IndexTreeCaches {
19 btree_fst_caches: TreeCaches<BTreeNode<FstKeys>>,
20 btree_trie_caches: TreeCaches<BTreeNode<TrieKeys>>,
21 mtree_caches: TreeCaches<MTreeNode>,
22}
23
24impl IndexTreeCaches {
25 pub(crate) async fn get_store_btree_fst(
26 &self,
27 keys: TreeNodeProvider,
28 generation: StoreGeneration,
29 tt: TransactionType,
30 cache_size: usize,
31 ) -> Result<BTreeStore<FstKeys>, Error> {
32 let cache = self.btree_fst_caches.get_cache(generation, &keys, cache_size).await?;
33 Ok(TreeStore::new(keys, cache, tt).await)
34 }
35
36 pub(crate) fn advance_store_btree_fst(&self, new_cache: TreeCache<BTreeNode<FstKeys>>) {
37 self.btree_fst_caches.new_cache(new_cache);
38 }
39
40 pub(crate) async fn get_store_btree_trie(
41 &self,
42 keys: TreeNodeProvider,
43 generation: StoreGeneration,
44 tt: TransactionType,
45 cache_size: usize,
46 ) -> Result<BTreeStore<TrieKeys>, Error> {
47 let cache = self.btree_trie_caches.get_cache(generation, &keys, cache_size).await?;
48 Ok(TreeStore::new(keys, cache, tt).await)
49 }
50
51 pub(crate) fn advance_store_btree_trie(&self, new_cache: TreeCache<BTreeNode<TrieKeys>>) {
52 self.btree_trie_caches.new_cache(new_cache);
53 }
54
55 pub async fn get_store_mtree(
56 &self,
57 keys: TreeNodeProvider,
58 generation: StoreGeneration,
59 tt: TransactionType,
60 cache_size: usize,
61 ) -> Result<MTreeStore, Error> {
62 let cache = self.mtree_caches.get_cache(generation, &keys, cache_size).await?;
63 Ok(TreeStore::new(keys, cache, tt).await)
64 }
65
66 pub(crate) fn advance_store_mtree(&self, new_cache: TreeCache<MTreeNode>) {
67 self.mtree_caches.new_cache(new_cache);
68 }
69}
70
71pub(super) struct TreeCaches<N>(Arc<DashMap<Key, Arc<TreeCache<N>>>>)
72where
73 N: TreeNode + Debug + Clone + Display;
74
75impl<N> TreeCaches<N>
76where
77 N: TreeNode + Debug + Clone + Display,
78{
79 pub(super) async fn get_cache(
80 &self,
81 generation: StoreGeneration,
82 keys: &TreeNodeProvider,
83 cache_size: usize,
84 ) -> Result<Arc<TreeCache<N>>, Error> {
85 #[cfg(debug_assertions)]
86 debug!("get_cache {generation}");
87 let cache_key = keys.get_key(0)?;
89 match self.0.entry(cache_key.clone()) {
90 Entry::Occupied(mut e) => {
91 let c = e.get_mut();
92 match generation.cmp(&c.generation()) {
94 Ordering::Less => {
95 Ok(Arc::new(TreeCache::new(
98 generation,
99 cache_key,
100 keys.clone(),
101 cache_size,
102 )))
103 }
104 Ordering::Equal => Ok(c.clone()),
105 Ordering::Greater => {
106 let c = Arc::new(TreeCache::new(
109 generation,
110 cache_key,
111 keys.clone(),
112 cache_size,
113 ));
114 e.insert(c.clone());
115 Ok(c)
116 }
117 }
118 }
119 Entry::Vacant(e) => {
120 let c = Arc::new(TreeCache::new(generation, cache_key, keys.clone(), cache_size));
122 e.insert(c.clone());
123 Ok(c)
124 }
125 }
126 }
127
128 pub(super) fn new_cache(&self, new_cache: TreeCache<N>) {
129 match self.0.entry(new_cache.cache_key().clone()) {
130 Entry::Occupied(mut e) => {
131 let old_cache = e.get();
132 if new_cache.generation() > old_cache.generation() {
134 e.insert(Arc::new(new_cache));
135 }
136 }
137 Entry::Vacant(e) => {
138 e.insert(Arc::new(new_cache));
139 }
140 }
141 }
142}
143
144impl<N> Default for TreeCaches<N>
145where
146 N: TreeNode + Debug + Clone + Display,
147{
148 fn default() -> Self {
149 Self(Arc::new(DashMap::new()))
150 }
151}
152
153#[non_exhaustive]
154pub enum TreeCache<N>
155where
156 N: TreeNode + Debug + Clone + Display,
157{
158 Lru(Key, StoreGeneration, TreeLruCache<N>),
159 Full(Key, StoreGeneration, TreeFullCache<N>),
160}
161
162impl<N> TreeCache<N>
163where
164 N: TreeNode + Debug + Clone + Display,
165{
166 pub fn new(
167 generation: StoreGeneration,
168 cache_key: Key,
169 keys: TreeNodeProvider,
170 cache_size: usize,
171 ) -> Self {
172 if cache_size == 0 {
173 Self::Full(cache_key, generation, TreeFullCache::new(keys))
174 } else {
175 Self::Lru(cache_key, generation, TreeLruCache::with_capacity(keys, cache_size))
176 }
177 }
178
179 #[cfg(test)]
180 pub(in crate::idx) fn len(&self) -> usize {
181 match self {
182 Self::Lru(_, _, c) => c.lru.len(),
183 Self::Full(_, _, c) => c.cache.len(),
184 }
185 }
186
187 pub(super) async fn get_node(
188 &self,
189 tx: &Transaction,
190 node_id: NodeId,
191 ) -> Result<Arc<StoredNode<N>>, Error> {
192 match self {
193 Self::Lru(_, _, c) => c.get_node(tx, node_id).await,
194 Self::Full(_, _, c) => c.get_node(tx, node_id).await,
195 }
196 }
197
198 pub(super) async fn set_node(&self, node: StoredNode<N>) {
199 match self {
200 Self::Lru(_, _, c) => c.set_node(node).await,
201 Self::Full(_, _, c) => c.set_node(node),
202 }
203 }
204
205 pub(super) async fn remove_node(&self, node_id: &NodeId) {
206 match self {
207 Self::Lru(_, _, c) => c.remove_node(node_id).await,
208 Self::Full(_, _, c) => c.remove_node(node_id),
209 }
210 }
211
212 pub(super) fn cache_key(&self) -> &Key {
213 match self {
214 Self::Lru(k, _, _) => k,
215 Self::Full(k, _, _) => k,
216 }
217 }
218
219 fn generation(&self) -> StoreGeneration {
220 match self {
221 Self::Lru(_, gen, _) | TreeCache::Full(_, gen, _) => *gen,
222 }
223 }
224
225 pub(super) async fn next_generation(
228 &self,
229 updated: &HashSet<NodeId>,
230 removed: &HashMap<NodeId, Key>,
231 ) -> Self {
232 match self {
233 Self::Lru(k, g, c) => {
234 Self::Lru(k.clone(), *g + 1, c.next_generation(updated, removed).await)
235 }
236 Self::Full(k, g, c) => {
237 Self::Full(k.clone(), *g + 1, c.next_generation(updated, removed))
238 }
239 }
240 }
241}
242
243#[non_exhaustive]
244pub struct TreeLruCache<N>
245where
246 N: TreeNode + Debug + Clone + Display,
247{
248 keys: TreeNodeProvider,
249 lru: ConcurrentLru<Arc<StoredNode<N>>>,
250}
251
252impl<N> TreeLruCache<N>
253where
254 N: TreeNode + Debug + Clone,
255{
256 fn with_capacity(keys: TreeNodeProvider, size: usize) -> Self {
257 let lru = ConcurrentLru::with_capacity(size);
258 Self {
259 keys,
260 lru,
261 }
262 }
263
264 async fn get_node(
265 &self,
266 tx: &Transaction,
267 node_id: NodeId,
268 ) -> Result<Arc<StoredNode<N>>, Error> {
269 if let Some(n) = self.lru.get(node_id).await {
270 return Ok(n);
271 }
272 let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
273 self.lru.insert(node_id as CacheKey, n.clone()).await;
274 Ok(n)
275 }
276
277 async fn set_node(&self, node: StoredNode<N>) {
278 self.lru.insert(node.id as CacheKey, node.into()).await;
279 }
280 async fn remove_node(&self, node_id: &NodeId) {
281 self.lru.remove(*node_id as CacheKey).await;
282 }
283
284 async fn next_generation(
285 &self,
286 updated: &HashSet<NodeId>,
287 removed: &HashMap<NodeId, Key>,
288 ) -> Self {
289 Self {
290 keys: self.keys.clone(),
291 lru: self.lru.duplicate(|id| !removed.contains_key(id) || !updated.contains(id)).await,
292 }
293 }
294}
295
296#[non_exhaustive]
297pub struct TreeFullCache<N>
298where
299 N: TreeNode + Debug + Clone,
300{
301 keys: TreeNodeProvider,
302 cache: DashMap<NodeId, Arc<StoredNode<N>>>,
303}
304
305impl<N> TreeFullCache<N>
306where
307 N: TreeNode + Debug + Clone,
308{
309 pub fn new(keys: TreeNodeProvider) -> Self {
310 Self {
311 keys,
312 cache: DashMap::new(),
313 }
314 }
315
316 pub(super) async fn get_node(
317 &self,
318 tx: &Transaction,
319 node_id: NodeId,
320 ) -> Result<Arc<StoredNode<N>>, Error> {
321 match self.cache.entry(node_id) {
322 Entry::Occupied(e) => Ok(e.get().clone()),
323 Entry::Vacant(e) => {
324 let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
325 e.insert(n.clone());
326 Ok(n)
327 }
328 }
329 }
330
331 pub(super) fn set_node(&self, node: StoredNode<N>) {
332 self.cache.insert(node.id, node.into());
333 }
334
335 pub(super) fn remove_node(&self, node_id: &NodeId) {
336 self.cache.remove(node_id);
337 }
338
339 fn next_generation(&self, updated: &HashSet<NodeId>, removed: &HashMap<NodeId, Key>) -> Self {
340 let new_cache = Self::new(self.keys.clone());
341 self.cache
342 .iter()
343 .filter(|r| !removed.contains_key(r.key()))
344 .filter(|r| !updated.contains(r.key()))
345 .for_each(|r| {
346 new_cache.cache.insert(r.id, r.value().clone());
347 });
348 new_cache
349 }
350}