1use crate::err::Error;
2use crate::idx::trees::bkeys::TrieKeys;
3use crate::idx::trees::btree::{BState, BState1, BState1skip, BStatistics, BTree, BTreeStore};
4use crate::idx::trees::store::TreeNodeProvider;
5use crate::idx::{IndexKeyBase, VersionedStore};
6use crate::kvs::{Key, Transaction, TransactionType, Val};
7use revision::{revisioned, Revisioned};
8use roaring::RoaringTreemap;
9use serde::{Deserialize, Serialize};
10
/// Numeric identifier that `DocIds` assigns to a document key.
pub type DocId = u64;
12
/// Bidirectional mapping between document keys and numeric document ids.
///
/// The key -> id direction goes through a B-tree; the id -> key direction is stored as
/// individual entries whose keys are built with `IndexKeyBase::new_bi_key`.
pub struct DocIds {
    /// Key under which the serialized `State` is stored (read in `new`, written in `finish`).
    state_key: Key,
    /// Builder for the storage keys used by this mapping (`new_bd_key`, `new_bi_key`).
    index_key_base: IndexKeyBase,
    /// B-tree searched, inserted into and deleted from with document keys.
    btree: BTree<TrieKeys>,
    /// Node store handed to every B-tree operation.
    store: BTreeStore<TrieKeys>,
    /// Ids released by `remove_doc` and handed out again by `get_next_doc_id`;
    /// `None` when there is nothing to recycle.
    available_ids: Option<RoaringTreemap>,
    /// Next id to hand out when no released id is available.
    next_doc_id: DocId,
}
21
22impl DocIds {
23 pub async fn new(
24 tx: &Transaction,
25 tt: TransactionType,
26 ikb: IndexKeyBase,
27 default_btree_order: u32,
28 cache_size: u32,
29 ) -> Result<Self, Error> {
30 let state_key: Key = ikb.new_bd_key(None)?;
31 let state: State = if let Some(val) = tx.get(state_key.clone(), None).await? {
32 VersionedStore::try_from(val)?
33 } else {
34 State::new(default_btree_order)
35 };
36 let store = tx
37 .index_caches()
38 .get_store_btree_trie(
39 TreeNodeProvider::DocIds(ikb.clone()),
40 state.btree.generation(),
41 tt,
42 cache_size as usize,
43 )
44 .await?;
45 Ok(Self {
46 state_key,
47 index_key_base: ikb,
48 btree: BTree::new(state.btree),
49 store,
50 available_ids: state.available_ids,
51 next_doc_id: state.next_doc_id,
52 })
53 }
54
55 fn get_next_doc_id(&mut self) -> DocId {
56 if let Some(available_ids) = &mut self.available_ids {
58 if let Some(available_id) = available_ids.iter().next() {
59 available_ids.remove(available_id);
60 if available_ids.is_empty() {
61 self.available_ids = None;
62 }
63 return available_id;
64 }
65 }
66 let doc_id = self.next_doc_id;
68 self.next_doc_id += 1;
69 doc_id
70 }
71
72 pub(crate) async fn get_doc_id(
73 &self,
74 tx: &Transaction,
75 doc_key: Key,
76 ) -> Result<Option<DocId>, Error> {
77 self.btree.search(tx, &self.store, &doc_key).await
78 }
79
80 pub(in crate::idx) async fn resolve_doc_id(
83 &mut self,
84 tx: &Transaction,
85 doc_key: Key,
86 ) -> Result<Resolved, Error> {
87 {
88 if let Some(doc_id) = self.btree.search_mut(tx, &mut self.store, &doc_key).await? {
89 return Ok(Resolved::Existing(doc_id));
90 }
91 }
92 let doc_id = self.get_next_doc_id();
93 tx.set(self.index_key_base.new_bi_key(doc_id)?, doc_key.clone(), None).await?;
94 self.btree.insert(tx, &mut self.store, doc_key, doc_id).await?;
95 Ok(Resolved::New(doc_id))
96 }
97
98 pub(in crate::idx) async fn remove_doc(
99 &mut self,
100 tx: &Transaction,
101 doc_key: Key,
102 ) -> Result<Option<DocId>, Error> {
103 if let Some(doc_id) = self.btree.delete(tx, &mut self.store, doc_key).await? {
104 tx.del(self.index_key_base.new_bi_key(doc_id)?).await?;
105 if let Some(available_ids) = &mut self.available_ids {
106 available_ids.insert(doc_id);
107 } else {
108 let mut available_ids = RoaringTreemap::new();
109 available_ids.insert(doc_id);
110 self.available_ids = Some(available_ids);
111 }
112 Ok(Some(doc_id))
113 } else {
114 Ok(None)
115 }
116 }
117
118 pub(in crate::idx) async fn get_doc_key(
119 &self,
120 tx: &Transaction,
121 doc_id: DocId,
122 ) -> Result<Option<Key>, Error> {
123 let doc_id_key = self.index_key_base.new_bi_key(doc_id)?;
124 if let Some(val) = tx.get(doc_id_key, None).await? {
125 Ok(Some(val))
126 } else {
127 Ok(None)
128 }
129 }
130
131 pub(in crate::idx) async fn statistics(&self, tx: &Transaction) -> Result<BStatistics, Error> {
132 self.btree.statistics(tx, &self.store).await
133 }
134
135 pub(in crate::idx) async fn finish(&mut self, tx: &Transaction) -> Result<(), Error> {
136 if let Some(new_cache) = self.store.finish(tx).await? {
137 let btree = self.btree.inc_generation().clone();
138 let state = State {
139 btree,
140 available_ids: self.available_ids.take(),
141 next_doc_id: self.next_doc_id,
142 };
143 tx.set(self.state_key.clone(), VersionedStore::try_into(&state)?, None).await?;
144 tx.index_caches().advance_store_btree_trie(new_cache);
145 }
146 Ok(())
147 }
148}
149
/// Persisted snapshot of a `DocIds` instance: written by `DocIds::finish` and read back by
/// `DocIds::new`.
// NOTE(review): this struct is serialized through the `revisioned`/serde derives; presumably
// the field order is part of the stored format - confirm before reordering fields.
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize)]
struct State {
    /// State of the B-tree.
    btree: BState,
    /// Released ids that can be handed out again, if any.
    available_ids: Option<RoaringTreemap>,
    /// Next id of the sequence.
    next_doc_id: DocId,
}
157
158impl VersionedStore for State {
159 fn try_from(val: Val) -> Result<Self, Error> {
160 match Self::deserialize_revisioned(&mut val.as_slice()) {
161 Ok(r) => Ok(r),
162 Err(e) => match State1skip::deserialize_revisioned(&mut val.as_slice()) {
165 Ok(b_old) => Ok(b_old.into()),
166 Err(_) => match State1::deserialize_revisioned(&mut val.as_slice()) {
167 Ok(b_old) => Ok(b_old.into()),
168 Err(_) => Err(Error::Revision(e)),
170 },
171 },
172 }
173 }
174}
175
/// Older layout of `State`, tried by `State::try_from` when the current layout (and
/// `State1skip`) fail to decode. It differs from `State` only in the type of `btree`.
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize)]
struct State1 {
    /// B-tree state in its `BState1` form; converted with `Into` when upgrading.
    btree: BState1,
    /// Released ids that can be handed out again, if any.
    available_ids: Option<RoaringTreemap>,
    /// Next id of the sequence.
    next_doc_id: DocId,
}
183
184impl From<State1> for State {
185 fn from(s: State1) -> Self {
186 Self {
187 btree: s.btree.into(),
188 available_ids: s.available_ids,
189 next_doc_id: s.next_doc_id,
190 }
191 }
192}
193
// No method is overridden: `State1` relies entirely on the trait's provided implementations.
impl VersionedStore for State1 {}
195
/// Older layout of `State`, the first fallback tried by `State::try_from` when the current
/// layout fails to decode. It differs from `State` only in the type of `btree`.
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize)]
struct State1skip {
    /// B-tree state in its `BState1skip` form; converted with `Into` when upgrading.
    btree: BState1skip,
    /// Released ids that can be handed out again, if any.
    available_ids: Option<RoaringTreemap>,
    /// Next id of the sequence.
    next_doc_id: DocId,
}
203
204impl From<State1skip> for State {
205 fn from(s: State1skip) -> Self {
206 Self {
207 btree: s.btree.into(),
208 available_ids: s.available_ids,
209 next_doc_id: s.next_doc_id,
210 }
211 }
212}
213
// No method is overridden: `State1skip` relies entirely on the trait's provided implementations.
impl VersionedStore for State1skip {}
215
216impl State {
217 fn new(default_btree_order: u32) -> Self {
218 Self {
219 btree: BState::new(default_btree_order),
220 available_ids: None,
221 next_doc_id: 0,
222 }
223 }
224}
225
/// Outcome of `DocIds::resolve_doc_id`.
#[derive(Debug, PartialEq)]
pub(in crate::idx) enum Resolved {
    /// The document key was not registered yet; this id has just been allocated for it.
    New(DocId),
    /// The document key was already registered under this id.
    Existing(DocId),
}
231
232impl Resolved {
233 pub(in crate::idx) fn doc_id(&self) -> &DocId {
234 match self {
235 Resolved::New(doc_id) => doc_id,
236 Resolved::Existing(doc_id) => doc_id,
237 }
238 }
239
240 pub(in crate::idx) fn was_existing(&self) -> bool {
241 match self {
242 Resolved::New(_) => false,
243 Resolved::Existing(_) => true,
244 }
245 }
246}
247
#[cfg(test)]
mod tests {
    use crate::idx::docids::{DocIds, Resolved};
    use crate::idx::IndexKeyBase;
    use crate::kvs::TransactionType::*;
    use crate::kvs::{Datastore, LockType::*, Transaction, TransactionType};

    // B-tree order passed to every `DocIds` opened by these tests.
    const BTREE_ORDER: u32 = 7;

    // Opens a transaction of the requested type and a `DocIds` bound to it.
    async fn new_operation(ds: &Datastore, tt: TransactionType) -> (Transaction, DocIds) {
        let tx = ds.transaction(tt, Optimistic).await.unwrap();
        let d = DocIds::new(&tx, tt, IndexKeyBase::default(), BTREE_ORDER, 100).await.unwrap();
        (tx, d)
    }

    // Finishes the `DocIds` and commits the transaction.
    async fn finish(tx: Transaction, mut d: DocIds) {
        d.finish(&tx).await.unwrap();
        tx.commit().await.unwrap();
    }

    // Each step below relies on the state committed by the previous steps.
    #[tokio::test]
    async fn test_resolve_doc_id() {
        let ds = Datastore::new("memory").await.unwrap();

        // Resolve a first document key: a new id (0) is allocated.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            let doc_id = d.resolve_doc_id(&tx, "Foo".into()).await.unwrap();
            finish(tx, d).await;

            let (tx, d) = new_operation(&ds, Read).await;
            assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 1);
            assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
            assert_eq!(doc_id, Resolved::New(0));
        }

        // Resolve the same key again: the existing id is returned and nothing is added.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            let doc_id = d.resolve_doc_id(&tx, "Foo".into()).await.unwrap();
            finish(tx, d).await;

            let (tx, d) = new_operation(&ds, Read).await;
            assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 1);
            assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
            assert_eq!(doc_id, Resolved::Existing(0));
        }

        // Resolve a second key: the next id of the sequence (1) is allocated.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            let doc_id = d.resolve_doc_id(&tx, "Bar".into()).await.unwrap();
            finish(tx, d).await;

            let (tx, d) = new_operation(&ds, Read).await;
            assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 2);
            assert_eq!(d.get_doc_key(&tx, 1).await.unwrap(), Some("Bar".into()));
            assert_eq!(doc_id, Resolved::New(1));
        }

        // Interleave two existing keys and two new keys within a single transaction.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
            assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::New(2));
            assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
            assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::New(3));
            finish(tx, d).await;
            let (tx, d) = new_operation(&ds, Read).await;
            assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 4);
        }

        // All four keys now resolve to their existing ids, and the reverse lookup matches.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::Existing(0));
            assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::Existing(1));
            assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::Existing(2));
            assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::Existing(3));
            finish(tx, d).await;
            let (tx, d) = new_operation(&ds, Read).await;
            assert_eq!(d.get_doc_key(&tx, 0).await.unwrap(), Some("Foo".into()));
            assert_eq!(d.get_doc_key(&tx, 1).await.unwrap(), Some("Bar".into()));
            assert_eq!(d.get_doc_key(&tx, 2).await.unwrap(), Some("Hello".into()));
            assert_eq!(d.get_doc_key(&tx, 3).await.unwrap(), Some("World".into()));
            assert_eq!(d.statistics(&tx).await.unwrap().keys_count, 4);
        }
    }

    // Each step below relies on the state committed by the previous steps.
    #[tokio::test]
    async fn test_remove_doc() {
        let ds = Datastore::new("memory").await.unwrap();

        // Register two documents: ids 0 and 1.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.resolve_doc_id(&tx, "Foo".into()).await.unwrap(), Resolved::New(0));
            assert_eq!(d.resolve_doc_id(&tx, "Bar".into()).await.unwrap(), Resolved::New(1));
            finish(tx, d).await;
        }

        // Removing an unknown key yields None; removing "Foo" releases id 0.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.remove_doc(&tx, "Dummy".into()).await.unwrap(), None);
            assert_eq!(d.remove_doc(&tx, "Foo".into()).await.unwrap(), Some(0));
            finish(tx, d).await;
        }

        // "Foo" is gone, so removing it again yields None.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.remove_doc(&tx, "Foo".into()).await.unwrap(), None);
            finish(tx, d).await;
        }

        // A new document reuses the released id 0.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.resolve_doc_id(&tx, "Hello".into()).await.unwrap(), Resolved::New(0));
            finish(tx, d).await;
        }

        // Removing "Bar" releases id 1.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.remove_doc(&tx, "Dummy".into()).await.unwrap(), None);
            assert_eq!(d.remove_doc(&tx, "Bar".into()).await.unwrap(), Some(1));
            finish(tx, d).await;
        }

        // "Foo" is still absent.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.remove_doc(&tx, "Foo".into()).await.unwrap(), None);
            finish(tx, d).await;
        }

        // A new document reuses the released id 1.
        {
            let (tx, mut d) = new_operation(&ds, Write).await;
            assert_eq!(d.resolve_doc_id(&tx, "World".into()).await.unwrap(), Resolved::New(1));
            finish(tx, d).await;
        }
    }
}