sanakirja_core/btree/
put.rs

1//! Insertions into a B tree.
2use super::cursor::*;
3use super::*;
4
5/// The representation of the update to a page.
6#[derive(Debug)]
7pub struct Ok {
8    /// The new page, possibly the same as the previous version.
9    pub page: MutPage,
10    /// The freed page, or 0 if no page was freed.
11    pub freed: u64,
12}
13
14/// The result of an insertion, i.e. either an update or a split.
15#[derive(Debug)]
16pub enum Put<'a, K: ?Sized, V: ?Sized> {
17    Ok(Ok),
18    Split {
19        split_key: &'a K,
20        split_value: &'a V,
21        left: MutPage,
22        right: MutPage,
23        freed: u64,
24    },
25}
26
27/// Insert an entry into a database, returning `false` if and only if
28/// the exact same entry (key *and* value) was already in the
29/// database.
30pub fn put<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
31    txn: &mut T,
32    db: &mut Db_<K, V, P>,
33    key: &K,
34    value: &V,
35) -> Result<bool, T::Error> {
36    // Set the cursor to the first element larger than or equal to
37    // `(key, value)`. We will insert at that position (where "insert"
38    // is meant in the sense of Rust's `Vec::insert`.
39    let mut cursor = Cursor::new(txn, db)?;
40    if cursor.set(txn, key, Some(value))?.is_some() {
41        // If `(key, value)` already exists in the tree, return.
42        return Ok(false);
43    }
44    // Else, we are at a leaf, since cursors that don't find an
45    // element go all the way to the leaves.
46    //
47    // We put on the leaf page, resulting in a `Put`, i.e. either
48    // `Put::Ok` or `Put::Split`.
49    let p = cursor.len(); // Save the position of the leaf cursor.
50    let is_owned = p < cursor.first_rc_len();
51
52    // Insert the key and value at the leaf, i.e. pop the top level of
53    // the stack (the leaf) and insert in there.
54    let cur = cursor.pop().unwrap();
55    let put = unsafe {
56        P::put(
57            txn,
58            cur.page,
59            is_owned,
60            false,
61            &cur.cursor,
62            key,
63            value,
64            None,
65            0,
66            0,
67        )?
68    };
69
70    // In both cases (`Ok` and `Split`), we need to walk up the tree
71    // and update each node.
72
73    // Moreover, since the middle elements of the splits may be on
74    // pages that must be freed at the end of this call, we collect
75    // them in the `free` array below, and free them only at the end
76    // of this function.
77    //
78    // If we didn't hold on to these pages, they could be reallocated
79    // in subsequent splits, and the middle element could be
80    // lost. This is important when the middle element climbs up more
81    // than one level (i.e. the middle element of the split of a leaf
82    // is also the middle element of the split of the leaf's parent,
83    // etc).
84    let mut free = [0; N_CURSORS];
85    unsafe {
86        db.db = core::num::NonZeroU64::new_unchecked(
87            put_cascade(txn, &mut cursor, put, &mut free)?.0.offset,
88        );
89        for f in &free[..p] {
90            if *f & 1 != 0 {
91                txn.decr_rc_owned((*f) ^ 1)?;
92            } else if *f > 0 {
93                txn.decr_rc(*f)?;
94            }
95        }
96    }
97    Ok(true)
98}
99
100fn put_cascade<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
101    txn: &mut T,
102    cursor: &mut Cursor<K, V, P>,
103    mut put: Put<K, V>,
104    free: &mut [u64; N_CURSORS],
105) -> Result<MutPage, T::Error> {
106    loop {
107        match put {
108            Put::Split {
109                split_key,
110                split_value,
111                left,
112                right,
113                freed,
114            } => {
115                // Here, we are copying all children of the freed
116                // page, except possibly the last freed page (which is
117                // a child of the current node, if we aren't at a
118                // leaf). This includes the split key/value, also
119                // incremented in the following call:
120                incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
121                // Then, insert the split key/value in the relevant page:
122                let is_owned = cursor.len() < cursor.first_rc_len();
123                if let Some(cur) = cursor.pop() {
124                    // In this case, there's a page above the page
125                    // that just split (since we can pop the stack),
126                    // so the page that just split isn't the root (but
127                    // `cur` might be).
128                    put = unsafe {
129                        P::put(
130                            txn,
131                            cur.page,
132                            is_owned,
133                            false,
134                            &cur.cursor,
135                            split_key,
136                            split_value,
137                            None,
138                            left.0.offset,
139                            right.0.offset,
140                        )?
141                    };
142                } else {
143                    // No page above the split, so the root has just
144                    // split. Insert the split key/value into a new
145                    // page above the entire tree.
146                    let mut p = unsafe { txn.alloc_page()? };
147                    let cursor = P::cursor_first(&p.0);
148                    P::init(&mut p);
149                    if let Put::Ok(p) = unsafe {
150                        P::put(
151                            txn,
152                            p.0,
153                            true,
154                            false,
155                            &cursor,
156                            split_key,
157                            split_value,
158                            None,
159                            left.0.offset,
160                            right.0.offset,
161                        )?
162                    } {
163                        // Return the new root.
164                        return Ok(p.page);
165                    } else {
166                        unreachable!()
167                    }
168                }
169            }
170            Put::Ok(Ok { page, freed }) => {
171                // Same as above: increment the relevant reference
172                // counters.
173                incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
174                // And update the left child of the current cursor,
175                // since the main invariant of cursors is that we're
176                // always visiting the left child (if we're visiting
177                // the last child of a page, the cursor is set to the
178                // position strictly after the entries).
179                let is_owned = cursor.len() < cursor.first_rc_len();
180                if let Some(curs) = cursor.pop() {
181                    // If we aren't at the root, just update the child.
182                    put = Put::Ok(unsafe {
183                        P::update_left_child(txn, curs.page, is_owned, &curs.cursor, page.0.offset)?
184                    })
185                } else {
186                    // If we are at the root, `page` is the updated root.
187                    return Ok(page);
188                }
189            }
190        }
191    }
192}
193
194/// This function does all the memory management for `put`.
195fn incr_descendants<
196    T: AllocPage + LoadPage,
197    K: Storable + ?Sized,
198    V: Storable + ?Sized,
199    P: BTreePage<K, V>,
200>(
201    txn: &mut T,
202    cursor: &mut Cursor<K, V, P>,
203    free: &mut [u64; N_CURSORS],
204    mut freed: u64,
205) -> Result<(), T::Error> {
206    // The freed page is on the page below.
207    if cursor.len() < cursor.first_rc_len() {
208        // If a page has split or was immutable (allocated by a
209        // previous transaction) and has been updated, we need to free
210        // the old page.
211        free[cursor.len()] = freed;
212    } else {
213        // This means that the new version of the page (either split
214        // or not) contains references to all the children of the
215        // freed page, except the last freed page (because the new
216        // version of that page isn't shared).
217        let cur = cursor.cur();
218
219        // Start a cursor at the leftmost position and increase the
220        // leftmost child page's RC (if we aren't at a leaf, and if
221        // that child isn't the last freed page).
222        let mut c = P::cursor_first(&cur.page);
223        let left = P::left_child(cur.page.as_page(), &c);
224        if left != 0 {
225            if left != (freed & !1) {
226                txn.incr_rc(left)?;
227            } else if cursor.len() == cursor.first_rc_len() {
228                freed = 0
229            }
230        }
231        // Then, for each entry of the page, increase the RCs of
232        // references contained in the keys and values, and increase
233        // the RC of the right child.
234        while let Some((k, v, r)) = P::next(txn, cur.page.as_page(), &mut c) {
235            for o in k.page_references().chain(v.page_references()) {
236                txn.incr_rc(o)?;
237            }
238            if r != 0 {
239                if r != (freed & !1) {
240                    txn.incr_rc(r)?;
241                } else if cursor.len() == cursor.first_rc_len() {
242                    freed = 0
243                }
244            }
245        }
246        // Else, the "freed" page is shared with another tree, and
247        // hence we just need to decrement its RC.
248        if freed > 0 && cursor.len() == cursor.first_rc_len() {
249            free[cursor.len()] = freed;
250        }
251    }
252    Ok(())
253}