surrealdb_core/idx/trees/
btree.rs

1use crate::err::Error;
2use crate::idx::trees::bkeys::BKeys;
3use crate::idx::trees::store::{NodeId, StoreGeneration, StoredNode, TreeNode, TreeStore};
4use crate::idx::VersionedStore;
5use crate::kvs::{Key, Transaction, Val};
6use crate::sql::{Object, Value};
7#[cfg(debug_assertions)]
8use ahash::HashSet;
9use revision::{revisioned, Revisioned};
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12use std::fmt::{Debug, Display, Formatter};
13use std::io::Cursor;
14use std::marker::PhantomData;
15
/// Value stored next to each key in the tree.
pub type Payload = u64;

/// A B-tree node wrapped in the store's `StoredNode` envelope.
type BStoredNode<BK> = StoredNode<BTreeNode<BK>>;
/// The generic tree store specialised for B-tree nodes.
pub(in crate::idx) type BTreeStore<BK> = TreeStore<BTreeNode<BK>>;
20
/// A B-tree whose nodes are read from and written to a [`BTreeStore`].
///
/// The struct only holds the tree state; every operation receives the store
/// (and, where nodes must be loaded, the transaction) as arguments.
#[non_exhaustive]
pub struct BTree<BK>
where
	BK: BKeys,
{
	/// Persistent state: minimum degree, root id, next node id and generation.
	state: BState,
	/// Key count at which a node is treated as full (`2 * minimum_degree - 1`).
	full_size: u32,
	/// Marker binding the tree to its key-container type without storing one.
	bk: PhantomData<BK>,
}
30
/// Serializable state of a [`BTree`] (current layout, revision 2).
#[revisioned(revision = 2)]
#[derive(Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct BState {
	// Minimum degree of the tree; `BState::new` requires it to be >= 2.
	minimum_degree: u32,
	// Id of the root node, `None` while the tree is empty.
	root: Option<NodeId>,
	// Id that will be handed out to the next created node.
	next_node_id: NodeId,
	// Counter bumped by `BTree::inc_generation`; present since revision 2.
	#[revision(start = 2)]
	generation: StoreGeneration,
}
41
42impl VersionedStore for BState {
43	fn try_from(val: Val) -> Result<Self, Error> {
44		match Self::deserialize_revisioned(&mut val.as_slice()) {
45			Ok(r) => Ok(r),
46			// If it fails here, there is the chance it was an old version of BState
47			// that included the #[serde[skip]] updated parameter
48			Err(e) => match BState1skip::deserialize_revisioned(&mut val.as_slice()) {
49				Ok(b_old) => Ok(b_old.into()),
50				Err(_) => match BState1::deserialize_revisioned(&mut val.as_slice()) {
51					Ok(b_old) => Ok(b_old.into()),
52					// Otherwise we return the initial error
53					Err(_) => Err(Error::Revision(e)),
54				},
55			},
56		}
57	}
58}
59
/// Revision-1 layout of [`BState`], without the `generation` field.
///
/// Used as a deserialization fallback and converted into [`BState`] via `From`.
#[revisioned(revision = 1)]
#[derive(Clone, Serialize, Deserialize)]
pub(in crate::idx) struct BState1 {
	// Same meaning as the fields of the same name in `BState`.
	minimum_degree: u32,
	root: Option<NodeId>,
	next_node_id: NodeId,
}
67
/// Revision-1 layout of [`BState`] that additionally declared an `updated`
/// flag marked `#[serde(skip)]`.
///
/// Used as a deserialization fallback and converted into [`BState`] via `From`;
/// the `updated` flag is not carried over.
#[revisioned(revision = 1)]
#[derive(Clone, Serialize, Deserialize)]
pub(in crate::idx) struct BState1skip {
	// Same meaning as the fields of the same name in `BState`.
	minimum_degree: u32,
	root: Option<NodeId>,
	next_node_id: NodeId,
	// Legacy flag; ignored by the conversion into `BState`.
	#[serde(skip)]
	updated: bool,
}
77
78impl From<BState1> for BState {
79	fn from(o: BState1) -> Self {
80		Self {
81			minimum_degree: o.minimum_degree,
82			root: o.root,
83			next_node_id: o.next_node_id,
84			generation: 0,
85		}
86	}
87}
88
89impl From<BState1skip> for BState {
90	fn from(o: BState1skip) -> Self {
91		Self {
92			minimum_degree: o.minimum_degree,
93			root: o.root,
94			next_node_id: o.next_node_id,
95			generation: 0,
96		}
97	}
98}
99
100impl BState {
101	pub fn new(minimum_degree: u32) -> Self {
102		assert!(minimum_degree >= 2, "Minimum degree should be >= 2");
103		Self {
104			minimum_degree,
105			root: None,
106			next_node_id: 0,
107			generation: 0,
108		}
109	}
110
111	fn set_root(&mut self, node_id: Option<NodeId>) {
112		if node_id.ne(&self.root) {
113			self.root = node_id;
114		}
115	}
116
117	fn new_node_id(&mut self) -> NodeId {
118		let new_node_id = self.next_node_id;
119		self.next_node_id += 1;
120		new_node_id
121	}
122
123	pub(in crate::idx) fn generation(&self) -> u64 {
124		self.generation
125	}
126}
127
/// Figures gathered by `BTree::statistics` while walking the whole tree.
#[derive(Debug, Default, PartialEq)]
pub(in crate::idx) struct BStatistics {
	/// Sum of the key counts of all visited nodes.
	pub(in crate::idx) keys_count: u64,
	/// Deepest level reached; the root counts as depth 1.
	pub(in crate::idx) max_depth: u32,
	/// Number of visited nodes.
	pub(in crate::idx) nodes_count: u32,
	/// Sum of the `size` reported for each stored node.
	pub(in crate::idx) total_size: u64,
}
135
136impl From<BStatistics> for Value {
137	fn from(stats: BStatistics) -> Self {
138		let mut res = Object::default();
139		res.insert("keys_count".to_owned(), Value::from(stats.keys_count));
140		res.insert("max_depth".to_owned(), Value::from(stats.max_depth));
141		res.insert("nodes_count".to_owned(), Value::from(stats.nodes_count));
142		res.insert("total_size".to_owned(), Value::from(stats.total_size));
143		Value::from(res)
144	}
145}
146
/// A node of the B-tree.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum BTreeNode<BK>
where
	BK: BKeys + Clone,
{
	/// Node holding keys plus the ids of its children; the debug-only `check`
	/// expects exactly one more child than keys.
	Internal(BK, Vec<NodeId>),
	/// Node holding keys only.
	Leaf(BK),
}
156
157impl<BK> TreeNode for BTreeNode<BK>
158where
159	BK: BKeys + Clone,
160{
161	fn prepare_save(&mut self) {
162		self.keys_mut().compile();
163	}
164
165	fn try_from_val(val: Val) -> Result<Self, Error> {
166		let mut c: Cursor<Vec<u8>> = Cursor::new(val);
167		let node_type: u8 = bincode::deserialize_from(&mut c)?;
168		let keys = BK::read_from(&mut c)?;
169		match node_type {
170			1u8 => {
171				let child: Vec<NodeId> = bincode::deserialize_from(c)?;
172				Ok(BTreeNode::Internal(keys, child))
173			}
174			2u8 => Ok(BTreeNode::Leaf(keys)),
175			_ => Err(Error::CorruptedIndex("BTreeNode::try_from_val")),
176		}
177	}
178
179	fn try_into_val(&self) -> Result<Val, Error> {
180		let mut c: Cursor<Vec<u8>> = Cursor::new(Vec::new());
181		match self {
182			BTreeNode::Internal(keys, child) => {
183				bincode::serialize_into(&mut c, &1u8)?;
184				keys.write_to(&mut c)?;
185				bincode::serialize_into(&mut c, &child)?;
186			}
187			BTreeNode::Leaf(keys) => {
188				bincode::serialize_into(&mut c, &2u8)?;
189				keys.write_to(&mut c)?;
190			}
191		};
192		Ok(c.into_inner())
193	}
194}
195
196impl<BK> BTreeNode<BK>
197where
198	BK: BKeys + Clone,
199{
200	fn keys(&self) -> &BK {
201		match self {
202			BTreeNode::Internal(keys, _) => keys,
203			BTreeNode::Leaf(keys) => keys,
204		}
205	}
206
207	fn keys_mut(&mut self) -> &mut BK {
208		match self {
209			BTreeNode::Internal(keys, _) => keys,
210			BTreeNode::Leaf(keys) => keys,
211		}
212	}
213
214	fn append(
215		&mut self,
216		key: Key,
217		payload: Payload,
218		node: BTreeNode<BK>,
219	) -> Result<Option<Payload>, Error> {
220		match self {
221			BTreeNode::Internal(keys, children) => {
222				if let BTreeNode::Internal(append_keys, mut append_children) = node {
223					keys.append(append_keys);
224					children.append(&mut append_children);
225					Ok(keys.insert(key, payload))
226				} else {
227					Err(Error::CorruptedIndex("BTree::append(1)"))
228				}
229			}
230			BTreeNode::Leaf(keys) => {
231				if let BTreeNode::Leaf(append_keys) = node {
232					keys.append(append_keys);
233					Ok(keys.insert(key, payload))
234				} else {
235					Err(Error::CorruptedIndex("BTree::append(2)"))
236				}
237			}
238		}
239	}
240	#[cfg(debug_assertions)]
241	fn check(&self) {
242		match self {
243			BTreeNode::Internal(k, c) => {
244				if (k.len() + 1) as usize != c.len() {
245					panic!("k: {} - c: {} - {}", k.len(), c.len(), self);
246				}
247			}
248			BTreeNode::Leaf(_) => {}
249		}
250	}
251}
252
253impl<BK> Display for BTreeNode<BK>
254where
255	BK: BKeys + Clone,
256{
257	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
258		match self {
259			BTreeNode::Internal(k, c) => write!(f, "(I) - k: {} - c: {:?}", k, c),
260			BTreeNode::Leaf(k) => write!(f, "(L) - k: {}", k),
261		}
262	}
263}
264
/// Outcome of `BTree::split_child`.
struct SplitResult {
	/// Id of the node keeping the lower half of the keys (the split child).
	left_node_id: NodeId,
	/// Id of the newly created node holding the upper half of the keys.
	right_node_id: NodeId,
	/// Key that moved up into the parent.
	median_key: Key,
}
270
271impl<BK> BTree<BK>
272where
273	BK: BKeys + Debug + Clone,
274{
275	pub fn new(state: BState) -> Self {
276		Self {
277			full_size: state.minimum_degree * 2 - 1,
278			state,
279			bk: PhantomData,
280		}
281	}
282
283	pub(in crate::idx) fn inc_generation(&mut self) -> &BState {
284		self.state.generation += 1;
285		&self.state
286	}
287
288	pub async fn search(
289		&self,
290		tx: &Transaction,
291		store: &BTreeStore<BK>,
292		searched_key: &Key,
293	) -> Result<Option<Payload>, Error> {
294		let mut next_node = self.state.root;
295		while let Some(node_id) = next_node.take() {
296			let current = store.get_node(tx, node_id).await?;
297			if let Some(payload) = current.n.keys().get(searched_key) {
298				return Ok(Some(payload));
299			}
300			if let BTreeNode::Internal(keys, children) = &current.n {
301				let child_idx = keys.get_child_idx(searched_key);
302				next_node.replace(children[child_idx]);
303			}
304		}
305		Ok(None)
306	}
307
308	pub async fn search_mut(
309		&self,
310		tx: &Transaction,
311		store: &mut BTreeStore<BK>,
312		searched_key: &Key,
313	) -> Result<Option<Payload>, Error> {
314		let mut next_node = self.state.root;
315		while let Some(node_id) = next_node.take() {
316			let current = store.get_node_mut(tx, node_id).await?;
317			if let Some(payload) = current.n.keys().get(searched_key) {
318				store.set_node(current, false).await?;
319				return Ok(Some(payload));
320			}
321			if let BTreeNode::Internal(keys, children) = &current.n {
322				let child_idx = keys.get_child_idx(searched_key);
323				next_node.replace(children[child_idx]);
324			}
325			store.set_node(current, false).await?;
326		}
327		Ok(None)
328	}
329
330	pub async fn insert(
331		&mut self,
332		tx: &Transaction,
333		store: &mut BTreeStore<BK>,
334		key: Key,
335		payload: Payload,
336	) -> Result<(), Error> {
337		if let Some(root_id) = self.state.root {
338			// We already have a root node
339			let root = store.get_node_mut(tx, root_id).await?;
340			if root.n.keys().len() == self.full_size {
341				// The root node is full, let's split it
342				let new_root_id = self.state.new_node_id();
343				let new_root = store
344					.new_node(new_root_id, BTreeNode::Internal(BK::default(), vec![root_id]))?;
345				self.state.set_root(Some(new_root.id));
346				self.split_child(store, new_root, 0, root).await?;
347				self.insert_non_full(tx, store, new_root_id, key, payload).await?;
348			} else {
349				// The root node has place, let's insert the value
350				let root_id = root.id;
351				store.set_node(root, false).await?;
352				self.insert_non_full(tx, store, root_id, key, payload).await?;
353			}
354		} else {
355			// We don't have a root node, let's create id
356			let new_root_id = self.state.new_node_id();
357			let new_root_node =
358				store.new_node(new_root_id, BTreeNode::Leaf(BK::with_key_val(key, payload)?))?;
359			#[cfg(debug_assertions)]
360			new_root_node.n.check();
361			store.set_node(new_root_node, true).await?;
362			self.state.set_root(Some(new_root_id));
363		}
364		Ok(())
365	}
366
	/// Inserts `key` / `payload` below `node_id`, a node the caller has made
	/// sure is not full.
	///
	/// Descends iteratively. A key already present in an internal node is
	/// updated in place. A full child on the path is split before descending,
	/// and the descent continues on the side of the median the key belongs to.
	async fn insert_non_full(
		&mut self,
		tx: &Transaction,
		store: &mut BTreeStore<BK>,
		node_id: NodeId,
		key: Key,
		payload: Payload,
	) -> Result<(), Error> {
		let mut next_node_id = Some(node_id);
		while let Some(node_id) = next_node_id.take() {
			let mut node = store.get_node_mut(tx, node_id).await?;
			// Each iteration consumes its own copy of the key.
			let key: Key = key.clone();
			match &mut node.n {
				BTreeNode::Leaf(keys) => {
					// Reached a leaf: insert and stop (no next node is set).
					keys.insert(key, payload);
					store.set_node(node, true).await?;
				}
				BTreeNode::Internal(keys, children) => {
					if keys.get(&key).is_some() {
						// The key already lives in this internal node: overwrite its payload.
						keys.insert(key, payload);
						#[cfg(debug_assertions)]
						node.n.check();
						store.set_node(node, true).await?;
						return Ok(());
					}
					let child_idx = keys.get_child_idx(&key);
					let child = store.get_node_mut(tx, children[child_idx]).await?;
					let next_id = if child.n.keys().len() == self.full_size {
						// Full child: split it (this saves parent and both halves),
						// then continue in the half that should hold the key.
						let split_result = self.split_child(store, node, child_idx, child).await?;
						if key.gt(&split_result.median_key) {
							split_result.right_node_id
						} else {
							split_result.left_node_id
						}
					} else {
						// Child has room: give both nodes back unchanged and descend.
						let child_id = child.id;
						store.set_node(node, false).await?;
						store.set_node(child, false).await?;
						child_id
					};
					next_node_id.replace(next_id);
				}
			}
		}
		Ok(())
	}
413
	/// Splits `child_node` (the child at position `idx` of `parent_node`) in
	/// two around its median key.
	///
	/// The lower half keeps the child's id, the upper half gets a new id that
	/// is inserted at `idx + 1` in the parent, and the median key moves up
	/// into the parent. All three nodes are saved to the store.
	///
	/// In debug builds this panics if the median key already exists in the
	/// parent.
	async fn split_child(
		&mut self,
		store: &mut BTreeStore<BK>,
		mut parent_node: StoredNode<BTreeNode<BK>>,
		idx: usize,
		child_node: BStoredNode<BK>,
	) -> Result<SplitResult, Error> {
		let (left_node, right_node, median_key, median_payload) = match child_node.n {
			BTreeNode::Internal(keys, children) => self.split_internal_node(keys, children)?,
			BTreeNode::Leaf(keys) => self.split_leaf_node(keys)?,
		};
		let right_node_id = self.state.new_node_id();
		match parent_node.n {
			BTreeNode::Internal(ref mut keys, ref mut children) => {
				if keys.insert(median_key.clone(), median_payload).is_some() {
					#[cfg(debug_assertions)]
					panic!("Existing key: {} - {}", String::from_utf8(median_key)?, parent_node.n)
				}
				children.insert(idx + 1, right_node_id);
			}
			// NOTE(review): a leaf parent only receives the median key; no child
			// link is recorded. Presumably unreachable from the visible callers,
			// which always pass an internal parent - confirm.
			BTreeNode::Leaf(ref mut keys) => {
				keys.insert(median_key.clone(), median_payload);
			}
		};
		// Save the mutated split child with half the (lower) keys
		let left_node_id = child_node.id;
		let left_node = store.new_node(left_node_id, left_node)?;
		#[cfg(debug_assertions)]
		left_node.n.check();
		store.set_node(left_node, true).await?;
		// Save the new child with half the (upper) keys
		let right_node = store.new_node(right_node_id, right_node)?;
		#[cfg(debug_assertions)]
		right_node.n.check();
		store.set_node(right_node, true).await?;
		// Save the parent node
		#[cfg(debug_assertions)]
		parent_node.n.check();
		store.set_node(parent_node, true).await?;
		Ok(SplitResult {
			left_node_id,
			right_node_id,
			median_key,
		})
	}
459
460	fn split_internal_node(
461		&mut self,
462		keys: BK,
463		mut left_children: Vec<NodeId>,
464	) -> Result<(BTreeNode<BK>, BTreeNode<BK>, Key, Payload), Error> {
465		let r = keys.split_keys()?;
466		let right_children = left_children.split_off(r.median_idx + 1);
467		let left_node = BTreeNode::Internal(r.left, left_children);
468		let right_node = BTreeNode::Internal(r.right, right_children);
469		Ok((left_node, right_node, r.median_key, r.median_payload))
470	}
471
472	fn split_leaf_node(
473		&mut self,
474		keys: BK,
475	) -> Result<(BTreeNode<BK>, BTreeNode<BK>, Key, Payload), Error> {
476		let r = keys.split_keys()?;
477		let left_node = BTreeNode::Leaf(r.left);
478		let right_node = BTreeNode::Leaf(r.right);
479		Ok((left_node, right_node, r.median_key, r.median_payload))
480	}
481
	/// Deletes `key_to_delete` from the tree and returns its payload, or
	/// `None` when the key is not present (or the tree is empty).
	///
	/// The walk is iterative and follows the CLRS B-tree deletion cases named
	/// in the comments below. Each step yields the next
	/// `(is_main_key, key_to_delete, node_id)` to process: after case 2 the key
	/// still to be removed may be a replacement key rather than the one the
	/// caller asked for, and `is_main_key` is then `false` so that its payload
	/// is not reported.
	pub(in crate::idx) async fn delete(
		&mut self,
		tx: &Transaction,
		store: &mut BTreeStore<BK>,
		key_to_delete: Key,
	) -> Result<Option<Payload>, Error> {
		let mut deleted_payload = None;

		if let Some(root_id) = self.state.root {
			let mut next_node = Some((true, key_to_delete, root_id));

			while let Some((is_main_key, key_to_delete, node_id)) = next_node.take() {
				let mut node = store.get_node_mut(tx, node_id).await?;
				#[cfg(debug_assertions)]
				debug!(
					"Delete loop - key_to_delete: {} - {node}",
					String::from_utf8_lossy(&key_to_delete)
				);
				match &mut node.n {
					BTreeNode::Leaf(keys) => {
						// CLRS: 1
						#[cfg(debug_assertions)]
						debug!(
							"CLRS: 1 - node: {node_id} - key_to_delete: {} - keys: {keys}",
							String::from_utf8_lossy(&key_to_delete)
						);
						if let Some(payload) = keys.get(&key_to_delete) {
							if is_main_key {
								deleted_payload = Some(payload);
							}
							keys.remove(&key_to_delete);
							if keys.len() == 0 {
								// The node is empty, we can delete it
								store.remove_node(node.id, node.key).await?;
								// Check if this was the root node
								if Some(node_id) == self.state.root {
									self.state.set_root(None);
								}
							} else {
								#[cfg(debug_assertions)]
								node.n.check();
								store.set_node(node, true).await?;
							}
						} else {
							// Key not found in the leaf: give the node back unchanged.
							store.set_node(node, false).await?;
						}
					}
					BTreeNode::Internal(keys, children) => {
						if let Some(payload) = keys.get(&key_to_delete) {
							// CLRS: 2
							#[cfg(debug_assertions)]
							debug!(
								"CLRS: 2 - node: {node_id} - key_to_delete: {} - k: {keys} - c: {children:?}",
								String::from_utf8_lossy(&key_to_delete)
							);
							if is_main_key {
								deleted_payload = Some(payload);
							}
							// The helper rewrites this node's keys/children and tells us
							// which key must still be removed from which child.
							next_node.replace(
								self.deleted_from_internal(
									tx,
									store,
									keys,
									children,
									key_to_delete,
								)
								.await?,
							);
							#[cfg(debug_assertions)]
							node.n.check();
							store.set_node(node, true).await?;
						} else {
							// CLRS: 3
							#[cfg(debug_assertions)]
							debug!(
								"CLRS: 3 - node: {node_id} - key_to_delete: {} - keys: {keys}",
								String::from_utf8_lossy(&key_to_delete)
							);
							let (node_update, is_main_key, key_to_delete, next_stored_node) = self
								.deleted_traversal(
									tx,
									store,
									keys,
									children,
									key_to_delete,
									is_main_key,
								)
								.await?;
							if keys.len() == 0 {
								// The node lost its last key (its children were merged): it is
								// dropped and the node to continue with becomes the root.
								if let Some(root_id) = self.state.root {
									// Delete the old root node
									if root_id != node.id {
										return Err(fail!("BTree::delete"));
									}
								}
								store.remove_node(node.id, node.key).await?;
								self.state.set_root(Some(next_stored_node));
							} else {
								#[cfg(debug_assertions)]
								node.n.check();
								store.set_node(node, node_update).await?;
							}
							next_node.replace((is_main_key, key_to_delete, next_stored_node));
						}
					}
				}
			}
		}
		Ok(deleted_payload)
	}
592
593	async fn deleted_from_internal(
594		&mut self,
595		tx: &Transaction,
596		store: &mut BTreeStore<BK>,
597		keys: &mut BK,
598		children: &mut Vec<NodeId>,
599		key_to_delete: Key,
600	) -> Result<(bool, Key, NodeId), Error> {
601		#[cfg(debug_assertions)]
602		debug!(
603			"Delete from internal - key_to_delete: {} - keys: {keys}",
604			String::from_utf8_lossy(&key_to_delete)
605		);
606		let left_idx = keys.get_child_idx(&key_to_delete);
607		let left_id = children[left_idx];
608		let mut left_node = store.get_node_mut(tx, left_id).await?;
609		// if the child y that precedes k in nodexx has at least t keys
610		if left_node.n.keys().len() >= self.state.minimum_degree {
611			// CLRS: 2a -> left_node is named `y` in the book
612			#[cfg(debug_assertions)]
613			debug!(
614				"CLRS: 2a - key_to_delete: {} - left: {left_node} - keys: {keys}",
615				String::from_utf8_lossy(&key_to_delete)
616			);
617			let (key_prim, payload_prim) = self.find_highest(tx, store, left_node).await?;
618			if keys.remove(&key_to_delete).is_none() {
619				#[cfg(debug_assertions)]
620				panic!("Remove key {} {} ", String::from_utf8(key_to_delete)?, keys);
621			}
622			if keys.insert(key_prim.clone(), payload_prim).is_some() {
623				#[cfg(debug_assertions)]
624				panic!("Insert key {} {} ", String::from_utf8(key_prim)?, keys);
625			}
626			return Ok((false, key_prim, left_id));
627		}
628
629		let right_idx = left_idx + 1;
630		let right_id = children[right_idx];
631		let right_node = store.get_node_mut(tx, right_id).await?;
632		if right_node.n.keys().len() >= self.state.minimum_degree {
633			// Cleanup 2a evaluation
634			store.set_node(left_node, false).await?;
635			// CLRS: 2b -> right_node is name `z` in the book
636			#[cfg(debug_assertions)]
637			debug!(
638				"CLRS: 2b - key_to_delete: {} - right: {right_node} - keys: {keys}",
639				String::from_utf8_lossy(&key_to_delete)
640			);
641			let (key_prim, payload_prim) = self.find_lowest(tx, store, right_node).await?;
642			if keys.remove(&key_to_delete).is_none() {
643				#[cfg(debug_assertions)]
644				panic!("Remove key {} {} ", String::from_utf8(key_to_delete)?, keys);
645			}
646			if keys.insert(key_prim.clone(), payload_prim).is_some() {
647				panic!("Insert key {} {} ", String::from_utf8(key_prim)?, keys);
648			}
649			return Ok((false, key_prim, right_id));
650		}
651
652		// CLRS: 2c
653		// Merge children
654		// The payload is set to 0. The payload does not matter, as the key will be deleted after anyway.
655		#[cfg(debug_assertions)]
656		{
657			left_node.n.check();
658			debug!("CLRS: 2c");
659		}
660		left_node.n.append(key_to_delete.clone(), 0, right_node.n)?;
661		#[cfg(debug_assertions)]
662		left_node.n.check();
663		store.set_node(left_node, true).await?;
664		store.remove_node(right_id, right_node.key).await?;
665		keys.remove(&key_to_delete);
666		children.remove(right_idx);
667		Ok((false, key_to_delete, left_id))
668	}
669
670	async fn find_highest(
671		&mut self,
672		tx: &Transaction,
673		store: &mut BTreeStore<BK>,
674		node: StoredNode<BTreeNode<BK>>,
675	) -> Result<(Key, Payload), Error> {
676		let mut next_node = Some(node);
677		while let Some(node) = next_node.take() {
678			match &node.n {
679				BTreeNode::Internal(_, c) => {
680					let id = c[c.len() - 1];
681					store.set_node(node, false).await?;
682					let node = store.get_node_mut(tx, id).await?;
683					next_node.replace(node);
684				}
685				BTreeNode::Leaf(k) => {
686					let (key, payload) =
687						k.get_last_key().ok_or_else(|| fail!("BTree::find_highest(1)"))?;
688					#[cfg(debug_assertions)]
689					debug!("Find highest: {} - node: {}", String::from_utf8_lossy(&key), node);
690					store.set_node(node, false).await?;
691					return Ok((key, payload));
692				}
693			}
694		}
695		Err(fail!("BTree::find_highest(2)"))
696	}
697
698	async fn find_lowest(
699		&mut self,
700		tx: &Transaction,
701		store: &mut BTreeStore<BK>,
702		node: StoredNode<BTreeNode<BK>>,
703	) -> Result<(Key, Payload), Error> {
704		let mut next_node = Some(node);
705		while let Some(node) = next_node.take() {
706			match &node.n {
707				BTreeNode::Internal(_, c) => {
708					let id = c[0];
709					store.set_node(node, false).await?;
710					let node = store.get_node_mut(tx, id).await?;
711					next_node.replace(node);
712				}
713				BTreeNode::Leaf(k) => {
714					let (key, payload) =
715						k.get_first_key().ok_or_else(|| fail!("BTree::find_lowest(1)"))?;
716					#[cfg(debug_assertions)]
717					debug!("Find lowest: {} - node: {}", String::from_utf8_lossy(&key), node.id);
718					store.set_node(node, false).await?;
719					return Ok((key, payload));
720				}
721			}
722		}
723		Err(fail!("BTree::find_lowest(2)"))
724	}
725
726	async fn deleted_traversal(
727		&mut self,
728		tx: &Transaction,
729		store: &mut BTreeStore<BK>,
730		keys: &mut BK,
731		children: &mut Vec<NodeId>,
732		key_to_delete: Key,
733		is_main_key: bool,
734	) -> Result<(bool, bool, Key, NodeId), Error> {
735		// CLRS 3 Determine the root x.ci that must contain k
736		let child_idx = keys.get_child_idx(&key_to_delete);
737		let child_id = match children.get(child_idx) {
738			None => return Err(Error::CorruptedIndex("deleted_traversal:invalid_child_idx")),
739			Some(&child_id) => child_id,
740		};
741		#[cfg(debug_assertions)]
742		debug!(
743			"CLRS: 3 - key_to_delete: {} - child_id: {child_id}",
744			String::from_utf8_lossy(&key_to_delete)
745		);
746		let child_stored_node = store.get_node_mut(tx, child_id).await?;
747		// If x.ci has only t-1 keys, execute 3a or 3b
748		if child_stored_node.n.keys().len() < self.state.minimum_degree {
749			if child_idx < children.len() - 1 {
750				let right_child_stored_node =
751					store.get_node_mut(tx, children[child_idx + 1]).await?;
752				return if right_child_stored_node.n.keys().len() >= self.state.minimum_degree {
753					#[cfg(debug_assertions)]
754					debug!(
755						"CLRS: 3a - xci_child: {child_stored_node} - right_sibling_child: {right_child_stored_node}"
756					);
757					Self::delete_adjust_successor(
758						store,
759						keys,
760						child_idx,
761						key_to_delete,
762						is_main_key,
763						child_stored_node,
764						right_child_stored_node,
765					)
766					.await
767				} else {
768					// CLRS 3b successor
769					#[cfg(debug_assertions)]
770					debug!("CLRS: 3b merge - keys: {keys} - xci_child: {child_stored_node} - right_sibling_child: {right_child_stored_node}");
771					Self::merge_nodes(
772						store,
773						keys,
774						children,
775						child_idx,
776						key_to_delete,
777						is_main_key,
778						child_stored_node,
779						right_child_stored_node,
780					)
781					.await
782				};
783			}
784
785			// left child (predecessor)
786			if child_idx > 0 {
787				let child_idx = child_idx - 1;
788				let left_child_stored_node = store.get_node_mut(tx, children[child_idx]).await?;
789				return if left_child_stored_node.n.keys().len() >= self.state.minimum_degree {
790					#[cfg(debug_assertions)]
791					debug!("CLRS: 3a - left_sibling_child: {left_child_stored_node} - xci_child: {child_stored_node}",);
792					Self::delete_adjust_predecessor(
793						store,
794						keys,
795						child_idx,
796						key_to_delete,
797						is_main_key,
798						left_child_stored_node,
799						child_stored_node,
800					)
801					.await
802				} else {
803					// CLRS 3b predecessor
804					#[cfg(debug_assertions)]
805					debug!("CLRS: 3b merge - keys: {keys} - left_sibling_child: {left_child_stored_node} - xci_child: {child_stored_node}");
806					Self::merge_nodes(
807						store,
808						keys,
809						children,
810						child_idx,
811						key_to_delete,
812						is_main_key,
813						left_child_stored_node,
814						child_stored_node,
815					)
816					.await
817				};
818			}
819		}
820
821		store.set_node(child_stored_node, false).await?;
822		Ok((false, true, key_to_delete, child_id))
823	}
824
	/// CLRS 3a using the right sibling: rotates one key from
	/// `right_child_stored_node` through the parent into `child_stored_node`.
	///
	/// The first key of the right sibling moves up into the parent `keys`, the
	/// parent's key at `child_idx` moves down into the child, and when both
	/// nodes are internal the sibling's first child pointer is appended to the
	/// child's children. Both nodes are saved as updated.
	///
	/// Returns `(true, is_main_key, key_to_delete, id of the child)`.
	///
	/// # Errors
	/// `Error::CorruptedIndex` when the sibling has no first key or the parent
	/// has no key at `child_idx`. Duplicate insertions only panic in debug
	/// builds.
	async fn delete_adjust_successor(
		store: &mut BTreeStore<BK>,
		keys: &mut BK,
		child_idx: usize,
		key_to_delete: Key,
		is_main_key: bool,
		mut child_stored_node: BStoredNode<BK>,
		mut right_child_stored_node: BStoredNode<BK>,
	) -> Result<(bool, bool, Key, NodeId), Error> {
		// Key leaving the right sibling for the parent.
		let (ascending_key, ascending_payload) =
			right_child_stored_node
				.n
				.keys()
				.get_first_key()
				.ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(1)"))?;
		right_child_stored_node.n.keys_mut().remove(&ascending_key);
		// Key leaving the parent for the child.
		let descending_key = keys
			.get_key(child_idx)
			.ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(2)"))?;
		let descending_payload = keys
			.remove(&descending_key)
			.ok_or(Error::CorruptedIndex("BTree::delete_adjust_successor(3)"))?;
		if child_stored_node.n.keys_mut().insert(descending_key, descending_payload).is_some() {
			#[cfg(debug_assertions)]
			panic!("Duplicate insert key {} ", child_stored_node.n);
		}
		// Internal nodes also hand over the matching child pointer.
		if let BTreeNode::Internal(_, rc) = &mut right_child_stored_node.n {
			if let BTreeNode::Internal(_, lc) = &mut child_stored_node.n {
				lc.push(rc.remove(0))
			}
		}
		if keys.insert(ascending_key, ascending_payload).is_some() {
			#[cfg(debug_assertions)]
			panic!("Duplicate insert key {} ", keys);
		}
		let child_id = child_stored_node.id;
		#[cfg(debug_assertions)]
		{
			child_stored_node.n.check();
			right_child_stored_node.n.check();
		}
		store.set_node(child_stored_node, true).await?;
		store.set_node(right_child_stored_node, true).await?;
		Ok((true, is_main_key, key_to_delete, child_id))
	}
870
	/// CLRS 3a using the left sibling: rotates one key from
	/// `left_child_stored_node` through the parent into `child_stored_node`.
	///
	/// The last key of the left sibling moves up into the parent `keys`, the
	/// parent's key at `child_idx` moves down into the child, and when both
	/// nodes are internal the sibling's last child pointer is inserted at the
	/// front of the child's children. Both nodes are saved as updated.
	///
	/// `child_idx` is the index passed by the caller, which in
	/// `deleted_traversal` is the position of the left sibling.
	///
	/// Returns `(true, is_main_key, key_to_delete, id of the child)`.
	///
	/// # Errors
	/// `Error::CorruptedIndex` when the sibling has no last key or the parent
	/// has no key at `child_idx`. The remaining consistency checks only panic
	/// in debug builds.
	async fn delete_adjust_predecessor(
		store: &mut BTreeStore<BK>,
		keys: &mut BK,
		child_idx: usize,
		key_to_delete: Key,
		is_main_key: bool,
		mut left_child_stored_node: BStoredNode<BK>,
		mut child_stored_node: BStoredNode<BK>,
	) -> Result<(bool, bool, Key, NodeId), Error> {
		// Key leaving the left sibling for the parent.
		let (ascending_key, ascending_payload) = left_child_stored_node
			.n
			.keys()
			.get_last_key()
			.ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(1)"))?;
		if left_child_stored_node.n.keys_mut().remove(&ascending_key).is_none() {
			#[cfg(debug_assertions)]
			panic!("Remove key {} {}", String::from_utf8(ascending_key)?, left_child_stored_node.n);
		}
		// Key leaving the parent for the child.
		let descending_key = keys
			.get_key(child_idx)
			.ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(2)"))?;
		let descending_payload = keys
			.remove(&descending_key)
			.ok_or(Error::CorruptedIndex("BTree::delete_adjust_predecessor(3)"))?;
		if child_stored_node.n.keys_mut().insert(descending_key, descending_payload).is_some() {
			#[cfg(debug_assertions)]
			panic!("Insert key {}", child_stored_node.n);
		}
		// Internal nodes also hand over the matching child pointer.
		if let BTreeNode::Internal(_, lc) = &mut left_child_stored_node.n {
			if let BTreeNode::Internal(_, rc) = &mut child_stored_node.n {
				rc.insert(0, lc.remove(lc.len() - 1));
			}
		}
		if keys.insert(ascending_key, ascending_payload).is_some() {
			#[cfg(debug_assertions)]
			panic!("Insert key {}", keys);
		}
		let child_id = child_stored_node.id;
		#[cfg(debug_assertions)]
		{
			child_stored_node.n.check();
			left_child_stored_node.n.check();
			debug!("{}", left_child_stored_node);
			debug!("{}", child_stored_node);
		}
		store.set_node(child_stored_node, true).await?;
		store.set_node(left_child_stored_node, true).await?;
		Ok((true, is_main_key, key_to_delete, child_id))
	}
920
	/// CLRS 3b: merges `right_child` into `left_child` around the parent's
	/// key at `child_idx`.
	///
	/// That key is removed from the parent `keys` and inserted into the merged
	/// node, the parent's child pointer at `child_idx + 1` is dropped, the
	/// left node is saved as updated and the right node is removed from the
	/// store.
	///
	/// Returns `(true, is_main_key, key_to_delete, id of the merged node)`.
	///
	/// # Errors
	/// `Error::CorruptedIndex` when the parent has no key at `child_idx`, or
	/// when the two children are not of the same kind (see
	/// `BTreeNode::append`).
	#[allow(clippy::too_many_arguments)]
	async fn merge_nodes(
		store: &mut BTreeStore<BK>,
		keys: &mut BK,
		children: &mut Vec<NodeId>,
		child_idx: usize,
		key_to_delete: Key,
		is_main_key: bool,
		mut left_child: BStoredNode<BK>,
		right_child: BStoredNode<BK>,
	) -> Result<(bool, bool, Key, NodeId), Error> {
		#[cfg(debug_assertions)]
		debug!("Keys: {keys}");
		// Separator key that moves down from the parent into the merged node.
		let descending_key =
			keys.get_key(child_idx).ok_or(Error::CorruptedIndex("BTree::merge_nodes(1)"))?;
		let descending_payload =
			keys.remove(&descending_key).ok_or(Error::CorruptedIndex("BTree::merge_nodes(2)"))?;
		#[cfg(debug_assertions)]
		debug!("descending_key: {}", String::from_utf8_lossy(&descending_key));
		// The right child disappears from the parent.
		children.remove(child_idx + 1);
		let left_id = left_child.id;
		if left_child.n.append(descending_key, descending_payload, right_child.n)?.is_some() {
			#[cfg(debug_assertions)]
			panic!("Key already present");
		}
		#[cfg(debug_assertions)]
		left_child.n.check();
		store.set_node(left_child, true).await?;
		store.remove_node(right_child.id, right_child.key).await?;
		Ok((true, is_main_key, key_to_delete, left_id))
	}
952
953	pub(in crate::idx) async fn statistics(
954		&self,
955		tx: &Transaction,
956		store: &BTreeStore<BK>,
957	) -> Result<BStatistics, Error> {
958		let mut stats = BStatistics::default();
959		#[cfg(debug_assertions)]
960		let mut keys = HashSet::default();
961		let mut node_queue = VecDeque::new();
962		if let Some(node_id) = self.state.root {
963			node_queue.push_front((node_id, 1));
964		}
965		while let Some((node_id, depth)) = node_queue.pop_front() {
966			let stored = store.get_node(tx, node_id).await?;
967			stats.keys_count += stored.n.keys().len() as u64;
968			if depth > stats.max_depth {
969				stats.max_depth = depth;
970			}
971			#[cfg(debug_assertions)]
972			{
973				let k = stored.n.keys();
974				for i in 0..k.len() {
975					if let Some(k) = k.get_key(i as usize) {
976						if !keys.insert(k.clone()) {
977							panic!("Duplicate key: {}", String::from_utf8(k)?);
978						}
979					}
980				}
981			}
982			stats.nodes_count += 1;
983			stats.total_size += stored.size as u64;
984			if let BTreeNode::Internal(_, children) = &stored.n {
985				let depth = depth + 1;
986				for child_id in children.iter() {
987					node_queue.push_front((*child_id, depth));
988				}
989			};
990		}
991		Ok(stats)
992	}
993}
994
995#[cfg(test)]
996mod tests {
997	use crate::err::Error;
998	use crate::idx::trees::bkeys::{BKeys, FstKeys, TrieKeys};
999	use crate::idx::trees::btree::{
1000		BState, BStatistics, BStoredNode, BTree, BTreeNode, BTreeStore, Payload,
1001	};
1002	use crate::idx::trees::store::{NodeId, TreeNode, TreeNodeProvider};
1003	use crate::idx::VersionedStore;
1004	use crate::kvs::{Datastore, Key, LockType::*, Transaction, TransactionType};
1005	use rand::prelude::SliceRandom;
1006	use rand::thread_rng;
1007	use std::cmp::Ordering;
1008	use std::collections::{BTreeMap, VecDeque};
1009	use std::fmt::Debug;
1010	use std::sync::Arc;
1011	use test_log::test;
1012
1013	#[test]
1014	fn test_btree_state_serde() {
1015		let s = BState::new(3);
1016		let val = VersionedStore::try_into(&s).unwrap();
1017		let s: BState = VersionedStore::try_from(val).unwrap();
1018		assert_eq!(s.minimum_degree, 3);
1019		assert_eq!(s.root, None);
1020		assert_eq!(s.next_node_id, 0);
1021	}
1022
1023	#[test]
1024	fn test_node_serde_internal() {
1025		let mut node = BTreeNode::Internal(FstKeys::default(), vec![]);
1026		node.prepare_save();
1027		let val = node.try_into_val().unwrap();
1028		let _: BTreeNode<FstKeys> = BTreeNode::try_from_val(val).unwrap();
1029	}
1030
1031	#[test]
1032	fn test_node_serde_leaf() {
1033		let mut node = BTreeNode::Leaf(TrieKeys::default());
1034		node.prepare_save();
1035		let val = node.try_into_val().unwrap();
1036		let _: BTreeNode<TrieKeys> = BTreeNode::try_from_val(val).unwrap();
1037	}
1038
1039	async fn insertions_test<F, BK>(
1040		tx: Transaction,
1041		mut st: BTreeStore<BK>,
1042		t: &mut BTree<BK>,
1043		samples_size: usize,
1044		sample_provider: F,
1045	) where
1046		F: Fn(usize) -> (Key, Payload),
1047		BK: BKeys + Debug + Clone,
1048	{
1049		for i in 0..samples_size {
1050			let (key, payload) = sample_provider(i);
1051			// Insert the sample
1052			t.insert(&tx, &mut st, key, payload).await.unwrap();
1053		}
1054		st.finish(&tx).await.unwrap();
1055		tx.commit().await.unwrap();
1056	}
1057
1058	async fn check_insertions<F, BK>(
1059		tx: Transaction,
1060		st: BTreeStore<BK>,
1061		t: &mut BTree<BK>,
1062		samples_size: usize,
1063		sample_provider: F,
1064	) where
1065		F: Fn(usize) -> (Key, Payload),
1066		BK: BKeys + Debug + Clone,
1067	{
1068		for i in 0..samples_size {
1069			let (key, payload) = sample_provider(i);
1070			assert_eq!(t.search(&tx, &st, &key).await.unwrap(), Some(payload));
1071		}
1072		tx.cancel().await.unwrap();
1073	}
1074
1075	fn get_key_value(idx: usize) -> (Key, Payload) {
1076		(format!("{}", idx).into(), (idx * 10) as Payload)
1077	}
1078
1079	async fn new_operation_fst<BK>(
1080		ds: &Datastore,
1081		t: &BTree<BK>,
1082		tt: TransactionType,
1083		cache_size: usize,
1084	) -> (Transaction, BTreeStore<FstKeys>)
1085	where
1086		BK: BKeys + Debug + Clone,
1087	{
1088		let tx = ds.transaction(tt, Optimistic).await.unwrap();
1089		let st = tx
1090			.index_caches()
1091			.get_store_btree_fst(TreeNodeProvider::Debug, t.state.generation, tt, cache_size)
1092			.await
1093			.unwrap();
1094		(tx, st)
1095	}
1096
1097	async fn new_operation_trie<BK>(
1098		ds: &Datastore,
1099		t: &BTree<BK>,
1100		tt: TransactionType,
1101		cache_size: usize,
1102	) -> (Transaction, BTreeStore<TrieKeys>)
1103	where
1104		BK: BKeys + Debug + Clone,
1105	{
1106		let tx = ds.transaction(tt, Optimistic).await.unwrap();
1107		let st = tx
1108			.index_caches()
1109			.get_store_btree_trie(TreeNodeProvider::Debug, t.state.generation, tt, cache_size)
1110			.await
1111			.unwrap();
1112		(tx, st)
1113	}
1114
1115	#[test(tokio::test)]
1116	async fn test_btree_fst_small_order_sequential_insertions() {
1117		let ds = Datastore::new("memory").await.unwrap();
1118
1119		let mut t = BTree::new(BState::new(5));
1120
1121		{
1122			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await;
1123			insertions_test::<_, FstKeys>(tx, st, &mut t, 100, get_key_value).await;
1124		}
1125
1126		{
1127			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1128			check_insertions(tx, st, &mut t, 100, get_key_value).await;
1129		}
1130
1131		{
1132			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1133			assert_eq!(
1134				t.statistics(&tx, &st).await.unwrap(),
1135				BStatistics {
1136					keys_count: 100,
1137					max_depth: 3,
1138					nodes_count: 22,
1139					total_size: 1691,
1140				}
1141			);
1142			tx.cancel().await.unwrap();
1143		}
1144	}
1145
1146	#[test(tokio::test)]
1147	async fn test_btree_trie_small_order_sequential_insertions() {
1148		let ds = Datastore::new("memory").await.unwrap();
1149		let mut t = BTree::new(BState::new(6));
1150
1151		{
1152			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
1153			insertions_test::<_, TrieKeys>(tx, st, &mut t, 100, get_key_value).await;
1154		}
1155
1156		{
1157			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1158			check_insertions(tx, st, &mut t, 100, get_key_value).await;
1159		}
1160
1161		{
1162			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1163			assert_eq!(
1164				t.statistics(&tx, &st).await.unwrap(),
1165				BStatistics {
1166					keys_count: 100,
1167					max_depth: 3,
1168					nodes_count: 18,
1169					total_size: 1656,
1170				}
1171			);
1172			tx.cancel().await.unwrap();
1173		}
1174	}
1175
1176	#[test(tokio::test)]
1177	async fn test_btree_fst_small_order_random_insertions() {
1178		let ds = Datastore::new("memory").await.unwrap();
1179		let mut t = BTree::new(BState::new(8));
1180
1181		let mut samples: Vec<usize> = (0..100).collect();
1182		let mut rng = thread_rng();
1183		samples.shuffle(&mut rng);
1184
1185		{
1186			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await;
1187			insertions_test(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await;
1188		}
1189
1190		{
1191			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1192			check_insertions(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await;
1193		}
1194
1195		{
1196			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1197			let s = t.statistics(&tx, &st).await.unwrap();
1198			assert_eq!(s.keys_count, 100);
1199			tx.cancel().await.unwrap();
1200		}
1201	}
1202
1203	#[test(tokio::test)]
1204	async fn test_btree_trie_small_order_random_insertions() {
1205		let ds = Datastore::new("memory").await.unwrap();
1206		let mut t = BTree::new(BState::new(75));
1207
1208		let mut samples: Vec<usize> = (0..100).collect();
1209		let mut rng = thread_rng();
1210		samples.shuffle(&mut rng);
1211
1212		{
1213			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
1214			insertions_test(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await;
1215		}
1216
1217		{
1218			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1219			check_insertions(tx, st, &mut t, 100, |i| get_key_value(samples[i])).await;
1220		}
1221
1222		{
1223			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1224			let s = t.statistics(&tx, &st).await.unwrap();
1225			assert_eq!(s.keys_count, 100);
1226			tx.cancel().await.unwrap();
1227		}
1228	}
1229
1230	#[test(tokio::test)]
1231	async fn test_btree_fst_keys_large_order_sequential_insertions() {
1232		let ds = Datastore::new("memory").await.unwrap();
1233		let mut t = BTree::new(BState::new(60));
1234
1235		{
1236			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await;
1237			insertions_test(tx, st, &mut t, 10000, get_key_value).await;
1238		}
1239
1240		{
1241			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1242			check_insertions(tx, st, &mut t, 10000, get_key_value).await;
1243		}
1244
1245		{
1246			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1247			assert_eq!(
1248				t.statistics(&tx, &st).await.unwrap(),
1249				BStatistics {
1250					keys_count: 10000,
1251					max_depth: 3,
1252					nodes_count: 158,
1253					total_size: 57486,
1254				}
1255			);
1256			tx.cancel().await.unwrap();
1257		}
1258	}
1259
1260	async fn test_btree_trie_keys_large_order_sequential_insertions(cache_size: usize) {
1261		let ds = Datastore::new("memory").await.unwrap();
1262		let mut t = BTree::new(BState::new(60));
1263
1264		{
1265			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, cache_size).await;
1266			insertions_test(tx, st, &mut t, 10000, get_key_value).await;
1267		}
1268
1269		{
1270			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, cache_size).await;
1271			check_insertions(tx, st, &mut t, 10000, get_key_value).await;
1272		}
1273
1274		{
1275			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, cache_size).await;
1276			assert_eq!(
1277				t.statistics(&tx, &st).await.unwrap(),
1278				BStatistics {
1279					keys_count: 10000,
1280					max_depth: 3,
1281					nodes_count: 158,
1282					total_size: 75206,
1283				}
1284			);
1285			tx.cancel().await.unwrap();
1286		}
1287	}
1288
1289	#[test(tokio::test)]
1290	async fn test_btree_trie_keys_large_order_sequential_insertions_lru_cache() {
1291		test_btree_trie_keys_large_order_sequential_insertions(20).await
1292	}
1293
1294	#[test(tokio::test)]
1295	async fn test_btree_trie_keys_large_order_sequential_insertions_full_cache() {
1296		test_btree_trie_keys_large_order_sequential_insertions(0).await
1297	}
1298
	// Sample terms used by the "real world" insertion tests.
	// The list deliberately contains repetitions: the 30 entries hold 17
	// distinct terms, so inserting all of them also exercises the insertion
	// of a key that is already present.
	const REAL_WORLD_TERMS: [&str; 30] = [
		"the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog", "the", "fast",
		"fox", "jumped", "over", "the", "lazy", "dog", "the", "dog", "sat", "there", "and", "did",
		"nothing", "the", "other", "animals", "sat", "there", "watching",
	];
1304
1305	async fn test_btree_fst_real_world_insertions(default_minimum_degree: u32) -> BStatistics {
1306		let ds = Datastore::new("memory").await.unwrap();
1307		let mut t = BTree::new(BState::new(default_minimum_degree));
1308
1309		{
1310			let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Write, 20).await;
1311			insertions_test(tx, st, &mut t, REAL_WORLD_TERMS.len(), |i| {
1312				(REAL_WORLD_TERMS[i].as_bytes().to_vec(), i as Payload)
1313			})
1314			.await;
1315		}
1316
1317		let (tx, st) = new_operation_fst(&ds, &t, TransactionType::Read, 20).await;
1318		let statistics = t.statistics(&tx, &st).await.unwrap();
1319		tx.cancel().await.unwrap();
1320		statistics
1321	}
1322
1323	async fn test_btree_trie_real_world_insertions(default_minimum_degree: u32) -> BStatistics {
1324		let ds = Datastore::new("memory").await.unwrap();
1325		let mut t = BTree::new(BState::new(default_minimum_degree));
1326
1327		{
1328			let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
1329			insertions_test(tx, st, &mut t, REAL_WORLD_TERMS.len(), |i| {
1330				(REAL_WORLD_TERMS[i].as_bytes().to_vec(), i as Payload)
1331			})
1332			.await;
1333		}
1334
1335		let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1336		let statistics = t.statistics(&tx, &st).await.unwrap();
1337		tx.cancel().await.unwrap();
1338
1339		statistics
1340	}
1341
1342	#[test(tokio::test)]
1343	async fn test_btree_fst_keys_real_world_insertions_small_order() {
1344		let expected = BStatistics {
1345			keys_count: 17,
1346			max_depth: 2,
1347			nodes_count: 5,
1348			total_size: 421,
1349		};
1350		let s = test_btree_fst_real_world_insertions(4).await;
1351		assert_eq!(s, expected);
1352	}
1353
1354	#[test(tokio::test)]
1355	async fn test_btree_fst_keys_real_world_insertions_large_order() {
1356		let expected = BStatistics {
1357			keys_count: 17,
1358			max_depth: 1,
1359			nodes_count: 1,
1360			total_size: 189,
1361		};
1362		let s = test_btree_fst_real_world_insertions(100).await;
1363		assert_eq!(s, expected);
1364	}
1365
1366	#[test(tokio::test)]
1367	async fn test_btree_trie_keys_real_world_insertions_small() {
1368		let expected = BStatistics {
1369			keys_count: 17,
1370			max_depth: 2,
1371			nodes_count: 3,
1372			total_size: 339,
1373		};
1374		let s = test_btree_trie_real_world_insertions(6).await;
1375		assert_eq!(s, expected);
1376	}
1377
1378	#[test(tokio::test)]
1379	async fn test_btree_trie_keys_real_world_insertions_large_order() {
1380		let expected = BStatistics {
1381			keys_count: 17,
1382			max_depth: 1,
1383			nodes_count: 1,
1384			total_size: 229,
1385		};
1386		let s = test_btree_trie_real_world_insertions(100).await;
1387		assert_eq!(s, expected);
1388	}
1389
	// These are the examples from the chapter "B-Trees" of CLRS:
	// https://en.wikipedia.org/wiki/Introduction_to_Algorithms
	// Each entry is a key paired with its payload (the rank of the letter in
	// the alphabet). The entries are listed in their insertion order; the
	// trailing comments mark the successive stages of the insertion example.
	const CLRS_EXAMPLE: [(&str, Payload); 23] = [
		("a", 1),
		("c", 3),
		("g", 7),
		("j", 10),
		("k", 11),
		("m", 13),
		("n", 14),
		("o", 15),
		("p", 16),
		("t", 20),
		("u", 21),
		("x", 24),
		("y", 25),
		("z", 26),
		("v", 22),
		("d", 4),
		("e", 5),
		("r", 18),
		("s", 19), // (a) Initial tree
		("b", 2),  // (b) B inserted
		("q", 17), // (c) Q inserted
		("l", 12), // (d) L inserted
		("f", 6),  // (e) F inserted
	];
1417
	#[test(tokio::test)]
	// This checks node splitting. CLRS: Figure 18.7, page 498.
	// The whole CLRS example is inserted in a trie-keyed tree of minimum
	// degree 3, then the content of every node is compared with the expected
	// final layout.
	async fn clrs_insertion_test() {
		let ds = Datastore::new("memory").await.unwrap();
		let mut t = BTree::<TrieKeys>::new(BState::new(3));

		{
			// Insert every sample within a single write transaction
			let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
			for (key, payload) in CLRS_EXAMPLE {
				t.insert(&tx, &mut st, key.into(), payload).await.unwrap();
			}
			st.finish(&tx).await.unwrap();
			tx.commit().await.unwrap();
		}

		let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;

		let s = t.statistics(&tx, &st).await.unwrap();
		assert_eq!(s.keys_count, 23);
		assert_eq!(s.max_depth, 3);
		assert_eq!(s.nodes_count, 10);
		// There should be one record per node
		assert_eq!(10, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());

		// `count` is the rank of the node in the traversal of `inspect_nodes`
		// (the root first, then the nodes of depth 2, then the leaves)
		let nodes_count = t
			.inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count {
				// The root
				0 => {
					assert_eq!(depth, 1);
					assert_eq!(node_id, 7);
					check_is_internal_node(&node.n, vec![("p", 16)], vec![1, 8]);
				}
				// The two internal nodes below the root
				1 => {
					assert_eq!(depth, 2);
					assert_eq!(node_id, 1);
					check_is_internal_node(
						&node.n,
						vec![("c", 3), ("g", 7), ("m", 13)],
						vec![0, 9, 2, 3],
					);
				}
				2 => {
					assert_eq!(depth, 2);
					assert_eq!(node_id, 8);
					check_is_internal_node(&node.n, vec![("t", 20), ("x", 24)], vec![4, 6, 5]);
				}
				// The leaves, from left to right
				3 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 0);
					check_is_leaf_node(&node.n, vec![("a", 1), ("b", 2)]);
				}
				4 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 9);
					check_is_leaf_node(&node.n, vec![("d", 4), ("e", 5), ("f", 6)]);
				}
				5 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 2);
					check_is_leaf_node(&node.n, vec![("j", 10), ("k", 11), ("l", 12)]);
				}
				6 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 3);
					check_is_leaf_node(&node.n, vec![("n", 14), ("o", 15)]);
				}
				7 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 4);
					check_is_leaf_node(&node.n, vec![("q", 17), ("r", 18), ("s", 19)]);
				}
				8 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 6);
					check_is_leaf_node(&node.n, vec![("u", 21), ("v", 22)]);
				}
				9 => {
					assert_eq!(depth, 3);
					assert_eq!(node_id, 5);
					check_is_leaf_node(&node.n, vec![("y", 25), ("z", 26)]);
				}
				_ => panic!("This node should not exist {}", count),
			})
			.await
			.unwrap();
		assert_eq!(nodes_count, 10);
		tx.cancel().await.unwrap();
	}
1505
1506	async fn check_finish_commit<BK>(
1507		t: &mut BTree<BK>,
1508		mut st: BTreeStore<BK>,
1509		tx: Transaction,
1510		mut gen: u64,
1511		info: String,
1512	) -> Result<u64, Error>
1513	where
1514		BK: BKeys + Clone + Debug,
1515	{
1516		if st.finish(&tx).await?.is_some() {
1517			t.state.generation += 1;
1518		}
1519		gen += 1;
1520		assert_eq!(t.state.generation, gen, "{}", info);
1521		tx.commit().await?;
1522		Ok(gen)
1523	}
1524
1525	// This check the possible deletion cases. CRLS, Figure 18.8, pages 500-501
1526	#[test(tokio::test)]
1527	async fn test_btree_clrs_deletion_test() -> Result<(), Error> {
1528		let ds = Datastore::new("memory").await?;
1529		let mut t = BTree::<TrieKeys>::new(BState::new(3));
1530		let mut check_generation = 0;
1531		{
1532			let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
1533			for (key, payload) in CLRS_EXAMPLE {
1534				t.insert(&tx, &mut st, key.into(), payload).await?;
1535			}
1536			check_generation = check_finish_commit(
1537				&mut t,
1538				st,
1539				tx,
1540				check_generation,
1541				"Insert CLRS example".to_string(),
1542			)
1543			.await?;
1544		}
1545
1546		{
1547			let mut key_count = CLRS_EXAMPLE.len() as u64;
1548			for (key, payload) in [("f", 6), ("m", 13), ("g", 7), ("d", 4), ("b", 2)] {
1549				{
1550					let (tx, mut st) =
1551						new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
1552					debug!("Delete {}", key);
1553					assert_eq!(t.delete(&tx, &mut st, key.into()).await?, Some(payload));
1554					check_generation = check_finish_commit(
1555						&mut t,
1556						st,
1557						tx,
1558						check_generation,
1559						format!("Delete {key}"),
1560					)
1561					.await?;
1562				}
1563				key_count -= 1;
1564				{
1565					let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1566					let s = t.statistics(&tx, &st).await?;
1567					assert_eq!(s.keys_count, key_count);
1568				}
1569			}
1570		}
1571
1572		let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
1573
1574		let s = t.statistics(&tx, &st).await.unwrap();
1575		assert_eq!(s.keys_count, 18);
1576		assert_eq!(s.max_depth, 2);
1577		assert_eq!(s.nodes_count, 7);
1578		// There should be one record per node
1579		assert_eq!(7, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());
1580
1581		let nodes_count = t
1582			.inspect_nodes(&tx, &mut st, |count, depth, node_id, node| match count {
1583				0 => {
1584					assert_eq!(depth, 1);
1585					assert_eq!(node_id, 1);
1586					check_is_internal_node(
1587						&node.n,
1588						vec![("e", 5), ("l", 12), ("p", 16), ("t", 20), ("x", 24)],
1589						vec![0, 9, 3, 4, 6, 5],
1590					);
1591				}
1592				1 => {
1593					assert_eq!(depth, 2);
1594					assert_eq!(node_id, 0);
1595					check_is_leaf_node(&node.n, vec![("a", 1), ("c", 3)]);
1596				}
1597				2 => {
1598					assert_eq!(depth, 2);
1599					assert_eq!(node_id, 9);
1600					check_is_leaf_node(&node.n, vec![("j", 10), ("k", 11)]);
1601				}
1602				3 => {
1603					assert_eq!(depth, 2);
1604					assert_eq!(node_id, 3);
1605					check_is_leaf_node(&node.n, vec![("n", 14), ("o", 15)]);
1606				}
1607				4 => {
1608					assert_eq!(depth, 2);
1609					assert_eq!(node_id, 4);
1610					check_is_leaf_node(&node.n, vec![("q", 17), ("r", 18), ("s", 19)]);
1611				}
1612				5 => {
1613					assert_eq!(depth, 2);
1614					assert_eq!(node_id, 6);
1615					check_is_leaf_node(&node.n, vec![("u", 21), ("v", 22)]);
1616				}
1617				6 => {
1618					assert_eq!(depth, 2);
1619					assert_eq!(node_id, 5);
1620					check_is_leaf_node(&node.n, vec![("y", 25), ("z", 26)]);
1621				}
1622				_ => panic!("This node should not exist {}", count),
1623			})
1624			.await
1625			.unwrap();
1626		assert_eq!(nodes_count, 7);
1627		tx.cancel().await?;
1628		Ok(())
1629	}
1630
	// This fills a tree with the CLRS example, then deletes every key in the
	// insertion order until the tree is empty. The set of keys held by the
	// tree is checked after each insertion and each deletion, and the store
	// must not hold any record once the last key has been deleted.
	#[test(tokio::test)]
	async fn test_btree_fill_and_empty() -> Result<(), Error> {
		let ds = Datastore::new("memory").await?;
		let mut t = BTree::<TrieKeys>::new(BState::new(3));

		// The reference content the tree is compared with
		let mut expected_keys = BTreeMap::new();

		let mut check_generation = 0;
		{
			let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
			for (key, payload) in CLRS_EXAMPLE {
				expected_keys.insert(key.to_string(), payload);
				t.insert(&tx, &mut st, key.into(), payload).await?;
				let (_, tree_keys) = check_btree_properties(&t, &tx, &mut st).await?;
				assert_eq!(expected_keys, tree_keys);
			}
			check_generation = check_finish_commit(
				&mut t,
				st,
				tx,
				check_generation,
				"Insert CLRS example".to_string(),
			)
			.await?;
		}

		{
			// Log the tree once it is filled
			let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
			print_tree(&tx, &mut st, &t).await;
			tx.cancel().await?;
		}

		for (key, _) in CLRS_EXAMPLE {
			debug!("------------------------");
			debug!("Delete {}", key);
			{
				// Each deletion runs in its own write transaction
				let (tx, mut st) = new_operation_trie(&ds, &t, TransactionType::Write, 20).await;
				assert!(t.delete(&tx, &mut st, key.into()).await?.is_some());
				expected_keys.remove(key);
				let (_, tree_keys) = check_btree_properties(&t, &tx, &mut st).await?;
				assert_eq!(expected_keys, tree_keys);
				check_generation = check_finish_commit(
					&mut t,
					st,
					tx,
					check_generation,
					format!("Delete CLRS example {key}"),
				)
				.await?;
			}

			// Check that every expected keys are still found in the tree
			{
				let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
				for (key, payload) in &expected_keys {
					assert_eq!(
						t.search(&tx, &st, &key.as_str().into()).await?,
						Some(*payload),
						"Can't find: {key}",
					)
				}
				tx.cancel().await?;
			}
		}

		// The tree must be empty
		let (tx, st) = new_operation_trie(&ds, &t, TransactionType::Read, 20).await;
		let s = t.statistics(&tx, &st).await?;
		assert_eq!(s.keys_count, 0);
		assert_eq!(s.max_depth, 0);
		assert_eq!(s.nodes_count, 0);
		// There should not be any record in the database
		assert_eq!(0, tx.scan(vec![]..vec![0xf], 100, None).await.unwrap().len());
		tx.cancel().await?;
		Ok(())
	}
1707
1708	#[test(tokio::test)]
1709	async fn test_delete_adjust() -> Result<(), Error> {
1710		let ds = Datastore::new("memory").await?;
1711		let mut t = BTree::<FstKeys>::new(BState::new(3));
1712
1713		let terms = [
1714			"aliquam",
1715			"delete",
1716			"if",
1717			"from",
1718			"Docusaurus",
1719			"amet,",
1720			"don't",
1721			"And",
1722			"interactive",
1723			"well!",
1724			"supports",
1725			"ultricies.",
1726			"Fusce",
1727			"consequat.",
1728			"just",
1729			"use",
1730			"elementum",
1731			"term",
1732			"blogging",
1733			"to",
1734			"want",
1735			"added",
1736			"Lorem",
1737			"ipsum",
1738			"blog:",
1739			"MDX.",
1740			"posts.",
1741			"features",
1742			"posts",
1743			"features,",
1744			"truncate",
1745			"images:",
1746			"Long",
1747			"Pellentesque",
1748			"authors.yml.",
1749			"filenames,",
1750			"such",
1751			"co-locate",
1752			"you",
1753			"can",
1754			"the",
1755			"-->",
1756			"comment",
1757			"tags",
1758			"A",
1759			"React",
1760			"The",
1761			"adipiscing",
1762			"consectetur",
1763			"very",
1764			"this",
1765			"and",
1766			"sit",
1767			"directory,",
1768			"Regular",
1769			"Markdown",
1770			"Simply",
1771			"blog",
1772			"MDX",
1773			"list",
1774			"extracted",
1775			"summary",
1776			"amet",
1777			"plugin.",
1778			"your",
1779			"long",
1780			"First",
1781			"power",
1782			"post,",
1783			"convenient",
1784			"folders)",
1785			"of",
1786			"date",
1787			"powered",
1788			"2019-05-30-welcome.md",
1789			"view.",
1790			"are",
1791			"be",
1792			"<!--",
1793			"Welcome",
1794			"is",
1795			"2019-05-30-welcome/index.md",
1796			"by",
1797			"directory.",
1798			"folder",
1799			"Use",
1800			"search",
1801			"authors",
1802			"false",
1803			"as:",
1804			"tempor",
1805			"files",
1806			"config.",
1807			"dignissim",
1808			"as",
1809			"a",
1810			"in",
1811			"This",
1812			"authors.yml",
1813			"create",
1814			"dolor",
1815			"Enter",
1816			"support",
1817			"add",
1818			"eros",
1819			"post",
1820			"Post",
1821			"size",
1822			"(or",
1823			"rhoncus",
1824			"Blog",
1825			"limit",
1826			"elit.",
1827		];
1828		let mut keys = BTreeMap::new();
1829		{
1830			let (tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Write, 100).await;
1831			for term in terms {
1832				t.insert(&tx, &mut st, term.into(), 0).await?;
1833				keys.insert(term.to_string(), 0);
1834				let (_, tree_keys) = check_btree_properties(&t, &tx, &mut st).await?;
1835				assert_eq!(keys, tree_keys);
1836			}
1837			st.finish(&tx).await?;
1838			tx.commit().await?;
1839		}
1840		{
1841			let (tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Read, 100).await;
1842			print_tree(&tx, &mut st, &t).await;
1843		}
1844		{
1845			let (tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Write, 100).await;
1846			for term in terms {
1847				debug!("Delete {term}");
1848				t.delete(&tx, &mut st, term.into()).await?;
1849				print_tree_mut(&tx, &mut st, &t).await;
1850				keys.remove(term);
1851				let (_, tree_keys) = check_btree_properties(&t, &tx, &mut st).await?;
1852				assert_eq!(keys, tree_keys);
1853			}
1854			st.finish(&tx).await?;
1855			tx.commit().await?;
1856		}
1857		{
1858			let (tx, mut st) = new_operation_fst(&ds, &t, TransactionType::Write, 100).await;
1859			assert_eq!(check_btree_properties(&t, &tx, &mut st).await?.0, 0);
1860			st.finish(&tx).await?;
1861			tx.cancel().await?;
1862		}
1863		Ok(())
1864	}
1865
1866	async fn check_btree_properties<BK>(
1867		t: &BTree<BK>,
1868		tx: &Transaction,
1869		st: &mut BTreeStore<BK>,
1870	) -> Result<(usize, BTreeMap<String, Payload>), Error>
1871	where
1872		BK: BKeys + Clone + Debug,
1873	{
1874		let mut unique_keys = BTreeMap::new();
1875		let n = t
1876			.inspect_nodes_mut(tx, st, |_, _, _, sn| {
1877				let keys = sn.n.keys();
1878				for i in 0..keys.len() {
1879					let key = keys.get_key(i as usize).unwrap_or_else(|| panic!("No key"));
1880					let payload = keys.get(&key).unwrap_or_else(|| panic!("No payload"));
1881					if unique_keys.insert(String::from_utf8(key).unwrap(), payload).is_some() {
1882						panic!("Non unique");
1883					}
1884				}
1885			})
1886			.await?;
1887		Ok((n, unique_keys))
1888	}
1889
1890	/////////////
1891	// HELPERS //
1892	/////////////
1893
1894	fn check_is_internal_node<BK>(
1895		node: &BTreeNode<BK>,
1896		expected_keys: Vec<(&str, i32)>,
1897		expected_children: Vec<NodeId>,
1898	) where
1899		BK: BKeys + Clone,
1900	{
1901		if let BTreeNode::Internal(keys, children) = node {
1902			check_keys(keys, expected_keys);
1903			assert_eq!(children, &expected_children, "The children are not matching");
1904		} else {
1905			panic!("An internal node was expected, we got a leaf node");
1906		}
1907	}
1908
1909	fn check_is_leaf_node<BK>(node: &BTreeNode<BK>, expected_keys: Vec<(&str, i32)>)
1910	where
1911		BK: BKeys + Clone,
1912	{
1913		if let BTreeNode::Leaf(keys) = node {
1914			check_keys(keys, expected_keys);
1915		} else {
1916			panic!("An internal node was expected, we got a leaf node");
1917		}
1918	}
1919
1920	async fn print_tree<BK>(tx: &Transaction, st: &mut BTreeStore<BK>, t: &BTree<BK>)
1921	where
1922		BK: BKeys + Debug + Clone,
1923	{
1924		debug!("----------------------------------");
1925		t.inspect_nodes(tx, st, |_count, depth, node_id, node| {
1926			debug!("depth: {} - node: {} - {}", depth, node_id, node.n);
1927		})
1928		.await
1929		.unwrap();
1930		debug!("----------------------------------");
1931	}
1932
1933	async fn print_tree_mut<BK>(tx: &Transaction, st: &mut BTreeStore<BK>, t: &BTree<BK>)
1934	where
1935		BK: BKeys + Debug + Clone,
1936	{
1937		debug!("----------------------------------");
1938		t.inspect_nodes_mut(tx, st, |_count, depth, node_id, node| {
1939			debug!("depth: {} - node: {} - {}", depth, node_id, node.n);
1940		})
1941		.await
1942		.unwrap();
1943		debug!("----------------------------------");
1944	}
1945
1946	fn check_keys<BK>(keys: &BK, expected_keys: Vec<(&str, i32)>)
1947	where
1948		BK: BKeys,
1949	{
1950		assert_eq!(keys.len() as usize, expected_keys.len(), "The number of keys does not match");
1951		for (key, payload) in expected_keys {
1952			assert_eq!(
1953				keys.get(&key.into()),
1954				Some(payload as Payload),
1955				"The key {} does not match",
1956				key
1957			);
1958		}
1959	}
1960
	// Inspection helpers, only available to the tests of this module.
	impl<BK> BTree<BK>
	where
		BK: BKeys + Debug + Clone,
	{
		/// This is for debugging.
		///
		/// Visits the nodes of the tree starting from the root, level by level
		/// (children are queued at the back, nodes are taken from the front),
		/// and calls `inspect_func` for each of them with: the rank of the node
		/// in the traversal (starting at 0), its depth (the root being at depth
		/// 1), its identifier, and the stored node itself.
		///
		/// Returns the number of visited nodes.
		async fn inspect_nodes<F>(
			&self,
			tx: &Transaction,
			st: &mut BTreeStore<BK>,
			inspect_func: F,
		) -> Result<usize, Error>
		where
			F: Fn(usize, usize, NodeId, Arc<BStoredNode<BK>>),
		{
			let mut node_queue = VecDeque::new();
			if let Some(node_id) = self.state.root {
				node_queue.push_front((node_id, 1));
			}
			let mut count = 0;
			while let Some((node_id, depth)) = node_queue.pop_front() {
				let stored_node = st.get_node(tx, node_id).await?;
				if let BTreeNode::Internal(_, children) = &stored_node.n {
					// This shadowed depth is the depth of the children only
					let depth = depth + 1;
					for child_id in children {
						node_queue.push_back((*child_id, depth));
					}
				}
				inspect_func(count, depth, node_id, stored_node);
				count += 1;
			}
			Ok(count)
		}

		/// This is for debugging.
		///
		/// Same level by level traversal as `inspect_nodes`, but each node is
		/// obtained with `get_node_mut` and handed back to the store with
		/// `set_node` once `inspect_func` has been called.
		///
		/// While visiting an internal node, the keys separating its children
		/// are also asserted to lie strictly between the bounds the node
		/// received from its own parent.
		///
		/// Returns the number of visited nodes.
		async fn inspect_nodes_mut<F>(
			&self,
			tx: &Transaction,
			st: &mut BTreeStore<BK>,
			mut inspect_func: F,
		) -> Result<usize, Error>
		where
			F: FnMut(usize, usize, NodeId, &BStoredNode<BK>),
		{
			// Each entry is: node id, depth, and the optional keys of the parent
			// surrounding the node (none for the root)
			let mut node_queue = VecDeque::new();
			if let Some(node_id) = self.state.root {
				node_queue.push_front((node_id, 1, None::<Key>, None::<Key>));
			}
			let mut count = 0;
			while let Some((node_id, depth, left_key, right_key)) = node_queue.pop_front() {
				let stored_node = st.get_node_mut(tx, node_id).await?;
				if let BTreeNode::Internal(keys, children) = &stored_node.n {
					let depth = depth + 1;
					let mut child_right_key = None;
					for (idx, child_id) in children.iter().enumerate() {
						// The child at position idx sits between the previous key
						// (if any) and the key at position idx (if any)
						let child_left_key = child_right_key;
						child_right_key = keys.get_key(idx);
						// NOTE(review): `crk` is bound to the left key of the child,
						// despite its name
						if let Some(crk) = &child_left_key {
							if let Some(lk) = &left_key {
								assert_eq!(
									lk.cmp(crk),
									Ordering::Less,
									"left: {} < {} - node: {} - {}",
									String::from_utf8_lossy(lk),
									String::from_utf8_lossy(crk),
									stored_node.id,
									stored_node.n
								);
							}
							if let Some(rk) = &right_key {
								assert_eq!(
									crk.cmp(rk),
									Ordering::Less,
									"right: {} < {} - node: {} - {}",
									String::from_utf8_lossy(crk),
									String::from_utf8_lossy(rk),
									stored_node.id,
									stored_node.n
								);
							}
						}
						node_queue.push_back((
							*child_id,
							depth,
							child_left_key.clone(),
							child_right_key.clone(),
						));
					}
				}
				inspect_func(count, depth, node_id, &stored_node);
				// Give the node back to the store
				// NOTE(review): `false` presumably means the node was not modified - confirm
				st.set_node(stored_node, false).await?;
				count += 1;
			}
			Ok(count)
		}
	}
2056}