surrealdb_core/idx/trees/store/
mod.rs

1pub mod cache;
2pub(crate) mod hnsw;
3mod lru;
4mod mapper;
5pub(crate) mod tree;
6
7use crate::ctx::Context;
8use crate::dbs::Options;
9use crate::err::Error;
10use crate::idx::trees::store::cache::TreeCache;
11use crate::idx::trees::store::hnsw::{HnswIndexes, SharedHnswIndex};
12use crate::idx::trees::store::mapper::Mappers;
13use crate::idx::trees::store::tree::{TreeRead, TreeWrite};
14use crate::idx::IndexKeyBase;
15#[cfg(not(target_family = "wasm"))]
16use crate::kvs::IndexBuilder;
17use crate::kvs::{Key, Transaction, TransactionType, Val};
18use crate::sql::index::HnswParams;
19use crate::sql::statements::DefineIndexStatement;
20use crate::sql::Index;
21use std::fmt::{Debug, Display, Formatter};
22use std::sync::Arc;
23
24pub type NodeId = u64;
25pub type StoreGeneration = u64;
26
27#[non_exhaustive]
28#[allow(clippy::large_enum_variant)]
29pub enum TreeStore<N>
30where
31	N: TreeNode + Debug + Clone,
32{
33	/// caches every read nodes, and keeps track of updated and created nodes
34	Write(TreeWrite<N>),
35	/// caches read nodes in an LRU cache
36	Read(TreeRead<N>),
37}
38
39impl<N> TreeStore<N>
40where
41	N: TreeNode + Debug + Display + Clone,
42{
43	pub async fn new(np: TreeNodeProvider, cache: Arc<TreeCache<N>>, tt: TransactionType) -> Self {
44		match tt {
45			TransactionType::Read => Self::Read(TreeRead::new(cache)),
46			TransactionType::Write => Self::Write(TreeWrite::new(np, cache)),
47		}
48	}
49
50	pub(in crate::idx) async fn get_node_mut(
51		&mut self,
52		tx: &Transaction,
53		node_id: NodeId,
54	) -> Result<StoredNode<N>, Error> {
55		match self {
56			Self::Write(w) => w.get_node_mut(tx, node_id).await,
57			_ => Err(fail!("TreeStore::get_node_mut")),
58		}
59	}
60
61	pub(in crate::idx) async fn get_node(
62		&self,
63		tx: &Transaction,
64		node_id: NodeId,
65	) -> Result<Arc<StoredNode<N>>, Error> {
66		match self {
67			Self::Read(r) => r.get_node(tx, node_id).await,
68			_ => Err(fail!("TreeStore::get_node")),
69		}
70	}
71
72	pub(in crate::idx) async fn get_node_txn(
73		&self,
74		ctx: &Context,
75		node_id: NodeId,
76	) -> Result<Arc<StoredNode<N>>, Error> {
77		match self {
78			Self::Read(r) => {
79				let tx = ctx.tx();
80				r.get_node(&tx, node_id).await
81			}
82			_ => Err(fail!("TreeStore::get_node_txn")),
83		}
84	}
85
86	pub(in crate::idx) async fn set_node(
87		&mut self,
88		node: StoredNode<N>,
89		updated: bool,
90	) -> Result<(), Error> {
91		match self {
92			Self::Write(w) => w.set_node(node, updated),
93			_ => Err(fail!("TreeStore::set_node")),
94		}
95	}
96
97	pub(in crate::idx) fn new_node(&mut self, id: NodeId, node: N) -> Result<StoredNode<N>, Error> {
98		match self {
99			Self::Write(w) => Ok(w.new_node(id, node)?),
100			_ => Err(fail!("TreeStore::new_node")),
101		}
102	}
103
104	pub(in crate::idx) async fn remove_node(
105		&mut self,
106		node_id: NodeId,
107		node_key: Key,
108	) -> Result<(), Error> {
109		match self {
110			Self::Write(w) => w.remove_node(node_id, node_key),
111			_ => Err(fail!("TreeStore::remove_node")),
112		}
113	}
114
115	pub async fn finish(&mut self, tx: &Transaction) -> Result<Option<TreeCache<N>>, Error> {
116		match self {
117			Self::Write(w) => w.finish(tx).await,
118			_ => Ok(None),
119		}
120	}
121}
122
123#[derive(Clone)]
124#[non_exhaustive]
125pub enum TreeNodeProvider {
126	DocIds(IndexKeyBase),
127	DocLengths(IndexKeyBase),
128	Postings(IndexKeyBase),
129	Terms(IndexKeyBase),
130	Vector(IndexKeyBase),
131	Debug,
132}
133
134impl TreeNodeProvider {
135	pub fn get_key(&self, node_id: NodeId) -> Result<Key, Error> {
136		match self {
137			TreeNodeProvider::DocIds(ikb) => ikb.new_bd_key(Some(node_id)),
138			TreeNodeProvider::DocLengths(ikb) => ikb.new_bl_key(Some(node_id)),
139			TreeNodeProvider::Postings(ikb) => ikb.new_bp_key(Some(node_id)),
140			TreeNodeProvider::Terms(ikb) => ikb.new_bt_key(Some(node_id)),
141			TreeNodeProvider::Vector(ikb) => ikb.new_vm_key(Some(node_id)),
142			TreeNodeProvider::Debug => Ok(node_id.to_be_bytes().to_vec()),
143		}
144	}
145
146	async fn load<N>(&self, tx: &Transaction, id: NodeId) -> Result<StoredNode<N>, Error>
147	where
148		N: TreeNode + Clone,
149	{
150		let key = self.get_key(id)?;
151		if let Some(val) = tx.get(key.clone(), None).await? {
152			let size = val.len() as u32;
153			let node = N::try_from_val(val)?;
154			Ok(StoredNode::new(node, id, key, size))
155		} else {
156			Err(Error::CorruptedIndex("TreeStore::load"))
157		}
158	}
159
160	async fn save<N>(&self, tx: &Transaction, node: &mut StoredNode<N>) -> Result<(), Error>
161	where
162		N: TreeNode + Clone + Display,
163	{
164		let val = node.n.try_into_val()?;
165		node.size = val.len() as u32;
166		tx.set(node.key.clone(), val, None).await?;
167		Ok(())
168	}
169}
170
171#[non_exhaustive]
172#[derive(Debug)]
173pub struct StoredNode<N>
174where
175	N: Clone + Display,
176{
177	pub(super) n: N,
178	pub(super) id: NodeId,
179	pub(super) key: Key,
180	pub(super) size: u32,
181}
182
183impl<N> StoredNode<N>
184where
185	N: Clone + Display,
186{
187	pub(super) fn new(n: N, id: NodeId, key: Key, size: u32) -> Self {
188		Self {
189			n,
190			id,
191			key,
192			size,
193		}
194	}
195}
196
197impl<N> Display for StoredNode<N>
198where
199	N: Clone + Display,
200{
201	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202		write!(f, "node_id: {} - {}", self.id, self.n)
203	}
204}
205
206pub trait TreeNode: Debug + Clone + Display {
207	fn prepare_save(&mut self) {}
208	fn try_from_val(val: Val) -> Result<Self, Error>
209	where
210		Self: Sized;
211	fn try_into_val(&self) -> Result<Val, Error>;
212}
213
214#[derive(Clone)]
215#[non_exhaustive]
216pub struct IndexStores(Arc<Inner>);
217
218struct Inner {
219	hnsw_indexes: HnswIndexes,
220	mappers: Mappers,
221}
222
223impl Default for IndexStores {
224	fn default() -> Self {
225		Self(Arc::new(Inner {
226			hnsw_indexes: HnswIndexes::default(),
227			mappers: Mappers::default(),
228		}))
229	}
230}
231
232impl IndexStores {
233	pub(crate) async fn get_index_hnsw(
234		&self,
235		ctx: &Context,
236		opt: &Options,
237		ix: &DefineIndexStatement,
238		p: &HnswParams,
239	) -> Result<SharedHnswIndex, Error> {
240		let (ns, db) = opt.ns_db()?;
241		let ikb = IndexKeyBase::new(ns, db, ix)?;
242		self.0.hnsw_indexes.get(ctx, &ix.what, &ikb, p).await
243	}
244
245	pub(crate) async fn index_removed(
246		&self,
247		#[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
248		tx: &Transaction,
249		ns: &str,
250		db: &str,
251		tb: &str,
252		ix: &str,
253	) -> Result<(), Error> {
254		#[cfg(not(target_family = "wasm"))]
255		if let Some(ib) = ib {
256			ib.remove_index(ns, db, tb, ix)?;
257		}
258		self.remove_index(ns, db, tx.get_tb_index(ns, db, tb, ix).await?.as_ref()).await
259	}
260
261	pub(crate) async fn namespace_removed(
262		&self,
263		#[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
264		tx: &Transaction,
265		ns: &str,
266	) -> Result<(), Error> {
267		for db in tx.all_db(ns).await?.iter() {
268			#[cfg(not(target_family = "wasm"))]
269			self.database_removed(ib, tx, ns, &db.name).await?;
270			#[cfg(target_family = "wasm")]
271			self.database_removed(tx, ns, &db.name).await?;
272		}
273		Ok(())
274	}
275
276	pub(crate) async fn database_removed(
277		&self,
278		#[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
279		tx: &Transaction,
280		ns: &str,
281		db: &str,
282	) -> Result<(), Error> {
283		for tb in tx.all_tb(ns, db, None).await?.iter() {
284			#[cfg(not(target_family = "wasm"))]
285			self.table_removed(ib, tx, ns, db, &tb.name).await?;
286			#[cfg(target_family = "wasm")]
287			self.table_removed(tx, ns, db, &tb.name).await?;
288		}
289		Ok(())
290	}
291
292	pub(crate) async fn table_removed(
293		&self,
294		#[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
295		tx: &Transaction,
296		ns: &str,
297		db: &str,
298		tb: &str,
299	) -> Result<(), Error> {
300		for ix in tx.all_tb_indexes(ns, db, tb).await?.iter() {
301			#[cfg(not(target_family = "wasm"))]
302			if let Some(ib) = ib {
303				ib.remove_index(ns, db, tb, &ix.name)?;
304			}
305			self.remove_index(ns, db, ix).await?;
306		}
307		Ok(())
308	}
309
310	async fn remove_index(
311		&self,
312		ns: &str,
313		db: &str,
314		ix: &DefineIndexStatement,
315	) -> Result<(), Error> {
316		if matches!(ix.index, Index::Hnsw(_)) {
317			let ikb = IndexKeyBase::new(ns, db, ix)?;
318			self.remove_hnsw_index(ikb).await?;
319		}
320		Ok(())
321	}
322
323	async fn remove_hnsw_index(&self, ikb: IndexKeyBase) -> Result<(), Error> {
324		self.0.hnsw_indexes.remove(&ikb).await?;
325		Ok(())
326	}
327
328	pub(crate) fn mappers(&self) -> &Mappers {
329		&self.0.mappers
330	}
331}