// surrealdb_core/idx/trees/store/cache.rs

1use crate::err::Error;
2use crate::idx::trees::bkeys::{FstKeys, TrieKeys};
3use crate::idx::trees::btree::{BTreeNode, BTreeStore};
4use crate::idx::trees::mtree::{MTreeNode, MTreeStore};
5use crate::idx::trees::store::lru::{CacheKey, ConcurrentLru};
6use crate::idx::trees::store::{
7	NodeId, StoreGeneration, StoredNode, TreeNode, TreeNodeProvider, TreeStore,
8};
9use crate::kvs::{Key, Transaction, TransactionType};
10use ahash::{HashMap, HashSet};
11use dashmap::mapref::entry::Entry;
12use dashmap::DashMap;
13use std::cmp::Ordering;
14use std::fmt::{Debug, Display};
15use std::sync::Arc;
16
/// Per-index node caches for the three tree-based index kinds.
/// Each field maps an index (identified by the key of its node 0)
/// to the cache held for that index's latest known store generation.
#[derive(Default)]
pub(crate) struct IndexTreeCaches {
	// Caches for BTree indexes backed by FST keys.
	btree_fst_caches: TreeCaches<BTreeNode<FstKeys>>,
	// Caches for BTree indexes backed by Trie keys.
	btree_trie_caches: TreeCaches<BTreeNode<TrieKeys>>,
	// Caches for MTree (vector) indexes.
	mtree_caches: TreeCaches<MTreeNode>,
}
23
24impl IndexTreeCaches {
25	pub(crate) async fn get_store_btree_fst(
26		&self,
27		keys: TreeNodeProvider,
28		generation: StoreGeneration,
29		tt: TransactionType,
30		cache_size: usize,
31	) -> Result<BTreeStore<FstKeys>, Error> {
32		let cache = self.btree_fst_caches.get_cache(generation, &keys, cache_size).await?;
33		Ok(TreeStore::new(keys, cache, tt).await)
34	}
35
36	pub(crate) fn advance_store_btree_fst(&self, new_cache: TreeCache<BTreeNode<FstKeys>>) {
37		self.btree_fst_caches.new_cache(new_cache);
38	}
39
40	pub(crate) async fn get_store_btree_trie(
41		&self,
42		keys: TreeNodeProvider,
43		generation: StoreGeneration,
44		tt: TransactionType,
45		cache_size: usize,
46	) -> Result<BTreeStore<TrieKeys>, Error> {
47		let cache = self.btree_trie_caches.get_cache(generation, &keys, cache_size).await?;
48		Ok(TreeStore::new(keys, cache, tt).await)
49	}
50
51	pub(crate) fn advance_store_btree_trie(&self, new_cache: TreeCache<BTreeNode<TrieKeys>>) {
52		self.btree_trie_caches.new_cache(new_cache);
53	}
54
55	pub async fn get_store_mtree(
56		&self,
57		keys: TreeNodeProvider,
58		generation: StoreGeneration,
59		tt: TransactionType,
60		cache_size: usize,
61	) -> Result<MTreeStore, Error> {
62		let cache = self.mtree_caches.get_cache(generation, &keys, cache_size).await?;
63		Ok(TreeStore::new(keys, cache, tt).await)
64	}
65
66	pub(crate) fn advance_store_mtree(&self, new_cache: TreeCache<MTreeNode>) {
67		self.mtree_caches.new_cache(new_cache);
68	}
69}
70
/// Maps an index identifier (the key of its node 0) to the most recent
/// cache held for that index. Cloneable/shareable via the inner `Arc`.
pub(super) struct TreeCaches<N>(Arc<DashMap<Key, Arc<TreeCache<N>>>>)
where
	N: TreeNode + Debug + Clone + Display;
74
impl<N> TreeCaches<N>
where
	N: TreeNode + Debug + Clone + Display,
{
	/// Returns the cache a store opened at `generation` should use.
	///
	/// Generation handling:
	/// - store generation == held cache generation: the held cache is shared;
	/// - store generation older: a fresh empty cache is returned but NOT
	///   retained, so the newer held cache stays in place;
	/// - store generation newer (or no cache yet): a fresh cache is created
	///   and retained in the map.
	pub(super) async fn get_cache(
		&self,
		generation: StoreGeneration,
		keys: &TreeNodeProvider,
		cache_size: usize,
	) -> Result<Arc<TreeCache<N>>, Error> {
		#[cfg(debug_assertions)]
		debug!("get_cache {generation}");
		// We take the key from the node 0 as the key identifier for the cache
		let cache_key = keys.get_key(0)?;
		// NOTE: the entry guard holds the DashMap shard lock for the whole
		// match below; no await happens while it is held.
		match self.0.entry(cache_key.clone()) {
			Entry::Occupied(mut e) => {
				let c = e.get_mut();
				// The cache and the store are matching, we can send a clone of the cache.
				match generation.cmp(&c.generation()) {
					Ordering::Less => {
						// The store generation is older than the current cache,
						// we return an empty cache, but we don't hold it
						Ok(Arc::new(TreeCache::new(
							generation,
							cache_key,
							keys.clone(),
							cache_size,
						)))
					}
					Ordering::Equal => Ok(c.clone()),
					Ordering::Greater => {
						// The store generation is more recent than the cache,
						// we create a new one and hold it
						let c = Arc::new(TreeCache::new(
							generation,
							cache_key,
							keys.clone(),
							cache_size,
						));
						e.insert(c.clone());
						Ok(c)
					}
				}
			}
			Entry::Vacant(e) => {
				// There is no cache for index, we create one and hold it
				let c = Arc::new(TreeCache::new(generation, cache_key, keys.clone(), cache_size));
				e.insert(c.clone());
				Ok(c)
			}
		}
	}

	/// Installs `new_cache` for its index, but only if it is strictly newer
	/// than the cache currently held (or if none is held yet).
	pub(super) fn new_cache(&self, new_cache: TreeCache<N>) {
		match self.0.entry(new_cache.cache_key().clone()) {
			Entry::Occupied(mut e) => {
				let old_cache = e.get();
				// We only store the cache if it is a newer generation
				if new_cache.generation() > old_cache.generation() {
					e.insert(Arc::new(new_cache));
				}
			}
			Entry::Vacant(e) => {
				e.insert(Arc::new(new_cache));
			}
		}
	}
}
143
144impl<N> Default for TreeCaches<N>
145where
146	N: TreeNode + Debug + Clone + Display,
147{
148	fn default() -> Self {
149		Self(Arc::new(DashMap::new()))
150	}
151}
152
/// A generation-stamped node cache for one index.
/// Each variant carries the index's cache key and the store generation
/// the cached nodes belong to.
#[non_exhaustive]
pub enum TreeCache<N>
where
	N: TreeNode + Debug + Clone + Display,
{
	// Bounded cache: evicts least-recently-used nodes beyond its capacity.
	Lru(Key, StoreGeneration, TreeLruCache<N>),
	// Unbounded cache: keeps every node ever loaded for this generation.
	Full(Key, StoreGeneration, TreeFullCache<N>),
}
161
162impl<N> TreeCache<N>
163where
164	N: TreeNode + Debug + Clone + Display,
165{
166	pub fn new(
167		generation: StoreGeneration,
168		cache_key: Key,
169		keys: TreeNodeProvider,
170		cache_size: usize,
171	) -> Self {
172		if cache_size == 0 {
173			Self::Full(cache_key, generation, TreeFullCache::new(keys))
174		} else {
175			Self::Lru(cache_key, generation, TreeLruCache::with_capacity(keys, cache_size))
176		}
177	}
178
179	#[cfg(test)]
180	pub(in crate::idx) fn len(&self) -> usize {
181		match self {
182			Self::Lru(_, _, c) => c.lru.len(),
183			Self::Full(_, _, c) => c.cache.len(),
184		}
185	}
186
187	pub(super) async fn get_node(
188		&self,
189		tx: &Transaction,
190		node_id: NodeId,
191	) -> Result<Arc<StoredNode<N>>, Error> {
192		match self {
193			Self::Lru(_, _, c) => c.get_node(tx, node_id).await,
194			Self::Full(_, _, c) => c.get_node(tx, node_id).await,
195		}
196	}
197
198	pub(super) async fn set_node(&self, node: StoredNode<N>) {
199		match self {
200			Self::Lru(_, _, c) => c.set_node(node).await,
201			Self::Full(_, _, c) => c.set_node(node),
202		}
203	}
204
205	pub(super) async fn remove_node(&self, node_id: &NodeId) {
206		match self {
207			Self::Lru(_, _, c) => c.remove_node(node_id).await,
208			Self::Full(_, _, c) => c.remove_node(node_id),
209		}
210	}
211
212	pub(super) fn cache_key(&self) -> &Key {
213		match self {
214			Self::Lru(k, _, _) => k,
215			Self::Full(k, _, _) => k,
216		}
217	}
218
219	fn generation(&self) -> StoreGeneration {
220		match self {
221			Self::Lru(_, gen, _) | TreeCache::Full(_, gen, _) => *gen,
222		}
223	}
224
225	/// Creates a copy of the cache, with a generation number incremented by one.
226	/// The new cache does not contain the NodeID contained in `updated` and `removed`.
227	pub(super) async fn next_generation(
228		&self,
229		updated: &HashSet<NodeId>,
230		removed: &HashMap<NodeId, Key>,
231	) -> Self {
232		match self {
233			Self::Lru(k, g, c) => {
234				Self::Lru(k.clone(), *g + 1, c.next_generation(updated, removed).await)
235			}
236			Self::Full(k, g, c) => {
237				Self::Full(k.clone(), *g + 1, c.next_generation(updated, removed))
238			}
239		}
240	}
241}
242
/// Bounded node cache: least-recently-used nodes are evicted once the
/// configured capacity is reached.
#[non_exhaustive]
pub struct TreeLruCache<N>
where
	N: TreeNode + Debug + Clone + Display,
{
	// Loads nodes from the KV store on cache misses.
	keys: TreeNodeProvider,
	// Concurrent LRU keyed by NodeId (as CacheKey).
	lru: ConcurrentLru<Arc<StoredNode<N>>>,
}
251
252impl<N> TreeLruCache<N>
253where
254	N: TreeNode + Debug + Clone,
255{
256	fn with_capacity(keys: TreeNodeProvider, size: usize) -> Self {
257		let lru = ConcurrentLru::with_capacity(size);
258		Self {
259			keys,
260			lru,
261		}
262	}
263
264	async fn get_node(
265		&self,
266		tx: &Transaction,
267		node_id: NodeId,
268	) -> Result<Arc<StoredNode<N>>, Error> {
269		if let Some(n) = self.lru.get(node_id).await {
270			return Ok(n);
271		}
272		let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
273		self.lru.insert(node_id as CacheKey, n.clone()).await;
274		Ok(n)
275	}
276
277	async fn set_node(&self, node: StoredNode<N>) {
278		self.lru.insert(node.id as CacheKey, node.into()).await;
279	}
280	async fn remove_node(&self, node_id: &NodeId) {
281		self.lru.remove(*node_id as CacheKey).await;
282	}
283
284	async fn next_generation(
285		&self,
286		updated: &HashSet<NodeId>,
287		removed: &HashMap<NodeId, Key>,
288	) -> Self {
289		Self {
290			keys: self.keys.clone(),
291			lru: self.lru.duplicate(|id| !removed.contains_key(id) || !updated.contains(id)).await,
292		}
293	}
294}
295
/// Unbounded node cache: keeps every node loaded for a generation.
#[non_exhaustive]
pub struct TreeFullCache<N>
where
	N: TreeNode + Debug + Clone,
{
	// Loads nodes from the KV store on cache misses.
	keys: TreeNodeProvider,
	// All nodes cached so far, keyed by their NodeId.
	cache: DashMap<NodeId, Arc<StoredNode<N>>>,
}
304
305impl<N> TreeFullCache<N>
306where
307	N: TreeNode + Debug + Clone,
308{
309	pub fn new(keys: TreeNodeProvider) -> Self {
310		Self {
311			keys,
312			cache: DashMap::new(),
313		}
314	}
315
316	pub(super) async fn get_node(
317		&self,
318		tx: &Transaction,
319		node_id: NodeId,
320	) -> Result<Arc<StoredNode<N>>, Error> {
321		match self.cache.entry(node_id) {
322			Entry::Occupied(e) => Ok(e.get().clone()),
323			Entry::Vacant(e) => {
324				let n = Arc::new(self.keys.load::<N>(tx, node_id).await?);
325				e.insert(n.clone());
326				Ok(n)
327			}
328		}
329	}
330
331	pub(super) fn set_node(&self, node: StoredNode<N>) {
332		self.cache.insert(node.id, node.into());
333	}
334
335	pub(super) fn remove_node(&self, node_id: &NodeId) {
336		self.cache.remove(node_id);
337	}
338
339	fn next_generation(&self, updated: &HashSet<NodeId>, removed: &HashMap<NodeId, Key>) -> Self {
340		let new_cache = Self::new(self.keys.clone());
341		self.cache
342			.iter()
343			.filter(|r| !removed.contains_key(r.key()))
344			.filter(|r| !updated.contains(r.key()))
345			.for_each(|r| {
346				new_cache.cache.insert(r.id, r.value().clone());
347			});
348		new_cache
349	}
350}