1pub mod cache;
2pub(crate) mod hnsw;
3mod lru;
4mod mapper;
5pub(crate) mod tree;
6
7use crate::ctx::Context;
8use crate::dbs::Options;
9use crate::err::Error;
10use crate::idx::trees::store::cache::TreeCache;
11use crate::idx::trees::store::hnsw::{HnswIndexes, SharedHnswIndex};
12use crate::idx::trees::store::mapper::Mappers;
13use crate::idx::trees::store::tree::{TreeRead, TreeWrite};
14use crate::idx::IndexKeyBase;
15#[cfg(not(target_family = "wasm"))]
16use crate::kvs::IndexBuilder;
17use crate::kvs::{Key, Transaction, TransactionType, Val};
18use crate::sql::index::HnswParams;
19use crate::sql::statements::DefineIndexStatement;
20use crate::sql::Index;
21use std::fmt::{Debug, Display, Formatter};
22use std::sync::Arc;
23
/// Identifier of a node inside a tree store; used to derive the node's storage key.
pub type NodeId = u64;
/// Generation counter of a store.
// NOTE(review): not referenced in this part of the file — confirm exact semantics at the use site.
pub type StoreGeneration = u64;
26
27#[non_exhaustive]
28#[allow(clippy::large_enum_variant)]
29pub enum TreeStore<N>
30where
31 N: TreeNode + Debug + Clone,
32{
33 Write(TreeWrite<N>),
35 Read(TreeRead<N>),
37}
38
39impl<N> TreeStore<N>
40where
41 N: TreeNode + Debug + Display + Clone,
42{
43 pub async fn new(np: TreeNodeProvider, cache: Arc<TreeCache<N>>, tt: TransactionType) -> Self {
44 match tt {
45 TransactionType::Read => Self::Read(TreeRead::new(cache)),
46 TransactionType::Write => Self::Write(TreeWrite::new(np, cache)),
47 }
48 }
49
50 pub(in crate::idx) async fn get_node_mut(
51 &mut self,
52 tx: &Transaction,
53 node_id: NodeId,
54 ) -> Result<StoredNode<N>, Error> {
55 match self {
56 Self::Write(w) => w.get_node_mut(tx, node_id).await,
57 _ => Err(fail!("TreeStore::get_node_mut")),
58 }
59 }
60
61 pub(in crate::idx) async fn get_node(
62 &self,
63 tx: &Transaction,
64 node_id: NodeId,
65 ) -> Result<Arc<StoredNode<N>>, Error> {
66 match self {
67 Self::Read(r) => r.get_node(tx, node_id).await,
68 _ => Err(fail!("TreeStore::get_node")),
69 }
70 }
71
72 pub(in crate::idx) async fn get_node_txn(
73 &self,
74 ctx: &Context,
75 node_id: NodeId,
76 ) -> Result<Arc<StoredNode<N>>, Error> {
77 match self {
78 Self::Read(r) => {
79 let tx = ctx.tx();
80 r.get_node(&tx, node_id).await
81 }
82 _ => Err(fail!("TreeStore::get_node_txn")),
83 }
84 }
85
86 pub(in crate::idx) async fn set_node(
87 &mut self,
88 node: StoredNode<N>,
89 updated: bool,
90 ) -> Result<(), Error> {
91 match self {
92 Self::Write(w) => w.set_node(node, updated),
93 _ => Err(fail!("TreeStore::set_node")),
94 }
95 }
96
97 pub(in crate::idx) fn new_node(&mut self, id: NodeId, node: N) -> Result<StoredNode<N>, Error> {
98 match self {
99 Self::Write(w) => Ok(w.new_node(id, node)?),
100 _ => Err(fail!("TreeStore::new_node")),
101 }
102 }
103
104 pub(in crate::idx) async fn remove_node(
105 &mut self,
106 node_id: NodeId,
107 node_key: Key,
108 ) -> Result<(), Error> {
109 match self {
110 Self::Write(w) => w.remove_node(node_id, node_key),
111 _ => Err(fail!("TreeStore::remove_node")),
112 }
113 }
114
115 pub async fn finish(&mut self, tx: &Transaction) -> Result<Option<TreeCache<N>>, Error> {
116 match self {
117 Self::Write(w) => w.finish(tx).await,
118 _ => Ok(None),
119 }
120 }
121}
122
123#[derive(Clone)]
124#[non_exhaustive]
125pub enum TreeNodeProvider {
126 DocIds(IndexKeyBase),
127 DocLengths(IndexKeyBase),
128 Postings(IndexKeyBase),
129 Terms(IndexKeyBase),
130 Vector(IndexKeyBase),
131 Debug,
132}
133
134impl TreeNodeProvider {
135 pub fn get_key(&self, node_id: NodeId) -> Result<Key, Error> {
136 match self {
137 TreeNodeProvider::DocIds(ikb) => ikb.new_bd_key(Some(node_id)),
138 TreeNodeProvider::DocLengths(ikb) => ikb.new_bl_key(Some(node_id)),
139 TreeNodeProvider::Postings(ikb) => ikb.new_bp_key(Some(node_id)),
140 TreeNodeProvider::Terms(ikb) => ikb.new_bt_key(Some(node_id)),
141 TreeNodeProvider::Vector(ikb) => ikb.new_vm_key(Some(node_id)),
142 TreeNodeProvider::Debug => Ok(node_id.to_be_bytes().to_vec()),
143 }
144 }
145
146 async fn load<N>(&self, tx: &Transaction, id: NodeId) -> Result<StoredNode<N>, Error>
147 where
148 N: TreeNode + Clone,
149 {
150 let key = self.get_key(id)?;
151 if let Some(val) = tx.get(key.clone(), None).await? {
152 let size = val.len() as u32;
153 let node = N::try_from_val(val)?;
154 Ok(StoredNode::new(node, id, key, size))
155 } else {
156 Err(Error::CorruptedIndex("TreeStore::load"))
157 }
158 }
159
160 async fn save<N>(&self, tx: &Transaction, node: &mut StoredNode<N>) -> Result<(), Error>
161 where
162 N: TreeNode + Clone + Display,
163 {
164 let val = node.n.try_into_val()?;
165 node.size = val.len() as u32;
166 tx.set(node.key.clone(), val, None).await?;
167 Ok(())
168 }
169}
170
171#[non_exhaustive]
172#[derive(Debug)]
173pub struct StoredNode<N>
174where
175 N: Clone + Display,
176{
177 pub(super) n: N,
178 pub(super) id: NodeId,
179 pub(super) key: Key,
180 pub(super) size: u32,
181}
182
183impl<N> StoredNode<N>
184where
185 N: Clone + Display,
186{
187 pub(super) fn new(n: N, id: NodeId, key: Key, size: u32) -> Self {
188 Self {
189 n,
190 id,
191 key,
192 size,
193 }
194 }
195}
196
197impl<N> Display for StoredNode<N>
198where
199 N: Clone + Display,
200{
201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202 write!(f, "node_id: {} - {}", self.id, self.n)
203 }
204}
205
206pub trait TreeNode: Debug + Clone + Display {
207 fn prepare_save(&mut self) {}
208 fn try_from_val(val: Val) -> Result<Self, Error>
209 where
210 Self: Sized;
211 fn try_into_val(&self) -> Result<Val, Error>;
212}
213
214#[derive(Clone)]
215#[non_exhaustive]
216pub struct IndexStores(Arc<Inner>);
217
218struct Inner {
219 hnsw_indexes: HnswIndexes,
220 mappers: Mappers,
221}
222
223impl Default for IndexStores {
224 fn default() -> Self {
225 Self(Arc::new(Inner {
226 hnsw_indexes: HnswIndexes::default(),
227 mappers: Mappers::default(),
228 }))
229 }
230}
231
232impl IndexStores {
233 pub(crate) async fn get_index_hnsw(
234 &self,
235 ctx: &Context,
236 opt: &Options,
237 ix: &DefineIndexStatement,
238 p: &HnswParams,
239 ) -> Result<SharedHnswIndex, Error> {
240 let (ns, db) = opt.ns_db()?;
241 let ikb = IndexKeyBase::new(ns, db, ix)?;
242 self.0.hnsw_indexes.get(ctx, &ix.what, &ikb, p).await
243 }
244
245 pub(crate) async fn index_removed(
246 &self,
247 #[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
248 tx: &Transaction,
249 ns: &str,
250 db: &str,
251 tb: &str,
252 ix: &str,
253 ) -> Result<(), Error> {
254 #[cfg(not(target_family = "wasm"))]
255 if let Some(ib) = ib {
256 ib.remove_index(ns, db, tb, ix)?;
257 }
258 self.remove_index(ns, db, tx.get_tb_index(ns, db, tb, ix).await?.as_ref()).await
259 }
260
261 pub(crate) async fn namespace_removed(
262 &self,
263 #[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
264 tx: &Transaction,
265 ns: &str,
266 ) -> Result<(), Error> {
267 for db in tx.all_db(ns).await?.iter() {
268 #[cfg(not(target_family = "wasm"))]
269 self.database_removed(ib, tx, ns, &db.name).await?;
270 #[cfg(target_family = "wasm")]
271 self.database_removed(tx, ns, &db.name).await?;
272 }
273 Ok(())
274 }
275
276 pub(crate) async fn database_removed(
277 &self,
278 #[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
279 tx: &Transaction,
280 ns: &str,
281 db: &str,
282 ) -> Result<(), Error> {
283 for tb in tx.all_tb(ns, db, None).await?.iter() {
284 #[cfg(not(target_family = "wasm"))]
285 self.table_removed(ib, tx, ns, db, &tb.name).await?;
286 #[cfg(target_family = "wasm")]
287 self.table_removed(tx, ns, db, &tb.name).await?;
288 }
289 Ok(())
290 }
291
292 pub(crate) async fn table_removed(
293 &self,
294 #[cfg(not(target_family = "wasm"))] ib: Option<&IndexBuilder>,
295 tx: &Transaction,
296 ns: &str,
297 db: &str,
298 tb: &str,
299 ) -> Result<(), Error> {
300 for ix in tx.all_tb_indexes(ns, db, tb).await?.iter() {
301 #[cfg(not(target_family = "wasm"))]
302 if let Some(ib) = ib {
303 ib.remove_index(ns, db, tb, &ix.name)?;
304 }
305 self.remove_index(ns, db, ix).await?;
306 }
307 Ok(())
308 }
309
310 async fn remove_index(
311 &self,
312 ns: &str,
313 db: &str,
314 ix: &DefineIndexStatement,
315 ) -> Result<(), Error> {
316 if matches!(ix.index, Index::Hnsw(_)) {
317 let ikb = IndexKeyBase::new(ns, db, ix)?;
318 self.remove_hnsw_index(ikb).await?;
319 }
320 Ok(())
321 }
322
323 async fn remove_hnsw_index(&self, ikb: IndexKeyBase) -> Result<(), Error> {
324 self.0.hnsw_indexes.remove(&ikb).await?;
325 Ok(())
326 }
327
328 pub(crate) fn mappers(&self) -> &Mappers {
329 &self.0.mappers
330 }
331}