surrealdb_core/idx/
docids.rs

1use crate::err::Error;
2use crate::idx::trees::bkeys::TrieKeys;
3use crate::idx::trees::btree::{BState, BState1, BState1skip, BStatistics, BTree, BTreeStore};
4use crate::idx::trees::store::TreeNodeProvider;
5use crate::idx::{IndexKeyBase, VersionedStore};
6use crate::kvs::{Key, Transaction, TransactionType, Val};
7use revision::{revisioned, Revisioned};
8use roaring::RoaringTreemap;
9use serde::{Deserialize, Serialize};
10
/// Numeric identifier assigned to a document inside an index.
pub type DocId = u64;
12
13pub struct DocIds {
14	state_key: Key,
15	index_key_base: IndexKeyBase,
16	btree: BTree<TrieKeys>,
17	store: BTreeStore<TrieKeys>,
18	available_ids: Option<RoaringTreemap>,
19	next_doc_id: DocId,
20}
21
22impl DocIds {
23	pub async fn new(
24		tx: &Transaction,
25		tt: TransactionType,
26		ikb: IndexKeyBase,
27		default_btree_order: u32,
28		cache_size: u32,
29	) -> Result<Self, Error> {
30		let state_key: Key = ikb.new_bd_key(None)?;
31		let state: State = if let Some(val) = tx.get(state_key.clone(), None).await? {
32			VersionedStore::try_from(val)?
33		} else {
34			State::new(default_btree_order)
35		};
36		let store = tx
37			.index_caches()
38			.get_store_btree_trie(
39				TreeNodeProvider::DocIds(ikb.clone()),
40				state.btree.generation(),
41				tt,
42				cache_size as usize,
43			)
44			.await?;
45		Ok(Self {
46			state_key,
47			index_key_base: ikb,
48			btree: BTree::new(state.btree),
49			store,
50			available_ids: state.available_ids,
51			next_doc_id: state.next_doc_id,
52		})
53	}
54
55	fn get_next_doc_id(&mut self) -> DocId {
56		// We check first if there is any available id
57		if let Some(available_ids) = &mut self.available_ids {
58			if let Some(available_id) = available_ids.iter().next() {
59				available_ids.remove(available_id);
60				if available_ids.is_empty() {
61					self.available_ids = None;
62				}
63				return available_id;
64			}
65		}
66		// If not, we use the sequence
67		let doc_id = self.next_doc_id;
68		self.next_doc_id += 1;
69		doc_id
70	}
71
72	pub(crate) async fn get_doc_id(
73		&self,
74		tx: &Transaction,
75		doc_key: Key,
76	) -> Result<Option<DocId>, Error> {
77		self.btree.search(tx, &self.store, &doc_key).await
78	}
79
80	/// Returns the doc_id for the given doc_key.
81	/// If the doc_id does not exists, a new one is created, and associated to the given key.
82	pub(in crate::idx) async fn resolve_doc_id(
83		&mut self,
84		tx: &Transaction,
85		doc_key: Key,
86	) -> Result<Resolved, Error> {
87		{
88			if let Some(doc_id) = self.btree.search_mut(tx, &mut self.store, &doc_key).await? {
89				return Ok(Resolved::Existing(doc_id));
90			}
91		}
92		let doc_id = self.get_next_doc_id();
93		tx.set(self.index_key_base.new_bi_key(doc_id)?, doc_key.clone(), None).await?;
94		self.btree.insert(tx, &mut self.store, doc_key, doc_id).await?;
95		Ok(Resolved::New(doc_id))
96	}
97
98	pub(in crate::idx) async fn remove_doc(
99		&mut self,
100		tx: &Transaction,
101		doc_key: Key,
102	) -> Result<Option<DocId>, Error> {
103		if let Some(doc_id) = self.btree.delete(tx, &mut self.store, doc_key).await? {
104			tx.del(self.index_key_base.new_bi_key(doc_id)?).await?;
105			if let Some(available_ids) = &mut self.available_ids {
106				available_ids.insert(doc_id);
107			} else {
108				let mut available_ids = RoaringTreemap::new();
109				available_ids.insert(doc_id);
110				self.available_ids = Some(available_ids);
111			}
112			Ok(Some(doc_id))
113		} else {
114			Ok(None)
115		}
116	}
117
118	pub(in crate::idx) async fn get_doc_key(
119		&self,
120		tx: &Transaction,
121		doc_id: DocId,
122	) -> Result<Option<Key>, Error> {
123		let doc_id_key = self.index_key_base.new_bi_key(doc_id)?;
124		if let Some(val) = tx.get(doc_id_key, None).await? {
125			Ok(Some(val))
126		} else {
127			Ok(None)
128		}
129	}
130
131	pub(in crate::idx) async fn statistics(&self, tx: &Transaction) -> Result<BStatistics, Error> {
132		self.btree.statistics(tx, &self.store).await
133	}
134
135	pub(in crate::idx) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> {
136		if let Some(new_cache) = self.store.finish(tx).await? {
137			let btree = self.btree.inc_generation().clone();
138			let state = State {
139				btree,
140				available_ids: self.available_ids.take(),
141				next_doc_id: self.next_doc_id,
142			};
143			tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?;
144			tx.index_caches().advance_store_btree_trie(new_cache);
145		}
146		Ok(())
147	}
148}
149
150#[revisioned(revision = 1)]
151#[derive(Serialize, Deserialize)]
152struct State {
153	btree: BState,
154	available_ids: Option<RoaringTreemap>,
155	next_doc_id: DocId,
156}
157
158impl VersionedStore for State {
159	fn try_from(val: Val) -> Result<Self, Error> {
160		match Self::deserialize_revisioned(&mut val.as_slice()) {
161			Ok(r) => Ok(r),
162			// If it fails here, there is the chance it was an old version of BState
163			// that included the #[serde[skip]] updated parameter
164			Err(e) => match State1skip::deserialize_revisioned(&mut val.as_slice()) {
165				Ok(b_old) => Ok(b_old.into()),
166				Err(_) => match State1::deserialize_revisioned(&mut val.as_slice()) {
167					Ok(b_old) => Ok(b_old.into()),
168					// Otherwise we return the initial error
169					Err(_) => Err(Error::Revision(e)),
170				},
171			},
172		}
173	}
174}
175
176#[revisioned(revision = 1)]
177#[derive(Serialize, Deserialize)]
178struct State1 {
179	btree: BState1,
180	available_ids: Option<RoaringTreemap>,
181	next_doc_id: DocId,
182}
183
184impl From<State1> for State {
185	fn from(s: State1) -> Self {
186		Self {
187			btree: s.btree.into(),
188			available_ids: s.available_ids,
189			next_doc_id: s.next_doc_id,
190		}
191	}
192}
193
194impl VersionedStore for State1 {}
195
196#[revisioned(revision = 1)]
197#[derive(Serialize, Deserialize)]
198struct State1skip {
199	btree: BState1skip,
200	available_ids: Option<RoaringTreemap>,
201	next_doc_id: DocId,
202}
203
204impl From<State1skip> for State {
205	fn from(s: State1skip) -> Self {
206		Self {
207			btree: s.btree.into(),
208			available_ids: s.available_ids,
209			next_doc_id: s.next_doc_id,
210		}
211	}
212}
213
214impl VersionedStore for State1skip {}
215
216impl State {
217	fn new(default_btree_order: u32) -> Self {
218		Self {
219			btree: BState::new(default_btree_order),
220			available_ids: None,
221			next_doc_id: 0,
222		}
223	}
224}
225
226#[derive(Debug, PartialEq)]
227pub(in crate::idx) enum Resolved {
228	New(DocId),
229	Existing(DocId),
230}
231
232impl Resolved {
233	pub(in crate::idx) fn doc_id(&self) -> &DocId {
234		match self {
235			Resolved::New(doc_id) => doc_id,
236			Resolved::Existing(doc_id) => doc_id,
237		}
238	}
239
240	pub(in crate::idx) fn was_existing(&self) -> bool {
241		match self {
242			Resolved::New(_) => false,
243			Resolved::Existing(_) => true,
244		}
245	}
246}
247
#[cfg(test)]
mod tests {
	use crate::idx::docids::{DocIds, Resolved};
	use crate::idx::IndexKeyBase;
	use crate::kvs::TransactionType::*;
	use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType};

	const BTREE_ORDER: u32 = 7;

	/// Opens a transaction of the given type and loads a `DocIds` bound to it.
	async fn new_operation(ds: &Datastore, tt: TransactionType) -> (Transaction, DocIds) {
		let tx = ds.transaction(tt, Optimistic).await.unwrap();
		let d = DocIds::new(&tx, tt, IndexKeyBase::default(), BTREE_ORDER, 100).await.unwrap();
		(tx, d)
	}

	/// Persists the `DocIds` state and commits the transaction.
	async fn finish(tx: Transaction, mut d: DocIds) {
		d.finish(&tx).await.unwrap();
		tx.commit().await.unwrap();
	}

	#[tokio::test]
	async fn test_resolve_doc_id() {
		let ds = Datastore::new("memory").await.unwrap();

		// Resolve a first doc key
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			let doc_id = d.resolve_doc_id(&tx, "Foo".into()).await.unwrap();
			finish(tx, d).await;

			let (tx, d) = new_operation(&ds, Read).await;
			assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 1);
			assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
			assert_eq!(doc_id, Resolved::New(0));
		}

		// Resolve the same doc key
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			let doc_id = d.resolve_doc_id(&tx, "Foo".into()).await.unwrap();
			finish(tx, d).await;

			let (tx, d) = new_operation(&ds, Read).await;
			assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 1);
			assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
			assert_eq!(doc_id, Resolved::Existing(0));
		}

		// Resolve another single doc key
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			let doc_id = d.resolve_doc_id(&tx, "Bar".into()).await.unwrap();
			finish(tx, d).await;

			let (tx, d) = new_operation(&ds, Read).await;
			assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 2);
			assert_eq!(d.get_doc_key(&tx, 1).await.unwrap(), Some("Bar".into()));
			assert_eq!(doc_id, Resolved::New(1));
		}

		// Resolve another two existing doc keys and two new doc keys (interlaced)
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
			assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::New(2));
			assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
			assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::New(3));
			finish(tx, d).await;
			let (tx, d) = new_operation(&ds, Read).await;
			assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 4);
		}

		// Resolve the four existing doc keys again and check the id -> key entries
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
			assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
			assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::Existing(2));
			assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::Existing(3));
			finish(tx, d).await;
			let (tx, d) = new_operation(&ds, Read).await;
			assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
			assert_eq!(d.get_doc_key(&tx, 1).await.unwrap(), Some("Bar".into()));
			assert_eq!(d.get_doc_key(&tx, 2).await.unwrap(), Some("Hello".into()));
			assert_eq!(d.get_doc_key(&tx, 3).await.unwrap(), Some("World".into()));
			assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 4);
		}
	}

	#[tokio::test]
	async fn test_remove_doc() {
		let ds = Datastore::new("memory").await.unwrap();

		// Create two docs
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::New(0));
			assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::New(1));
			finish(tx, d).await;
		}

		// Remove 'Foo' (doc id 0); removing an unknown key returns None
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.remove_doc(&tx, "Dummy".into()).await.unwrap(), None);
			assert_eq!(d.remove_doc(&tx, "Foo".into()).await.unwrap(), Some(0));
			finish(tx, d).await;
		}

		// Check 'Foo' has been removed
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.remove_doc(&tx, "Foo".into()).await.unwrap(), None);
			finish(tx, d).await;
		}

		// Insert a new doc - should reuse the released id 0
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::New(0));
			finish(tx, d).await;
		}

		// Remove 'Bar' (doc id 1); removing an unknown key returns None
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.remove_doc(&tx, "Dummy".into()).await.unwrap(), None);
			assert_eq!(d.remove_doc(&tx, "Bar".into()).await.unwrap(), Some(1));
			finish(tx, d).await;
		}

		// Check 'Bar' has been removed
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.remove_doc(&tx, "Bar".into()).await.unwrap(), None);
			finish(tx, d).await;
		}

		// Insert a new doc - should reuse the released id 1
		{
			let (tx, mut d) = new_operation(&ds, Write).await;
			assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::New(1));
			finish(tx, d).await;
		}
	}
}