sanakirja_core/btree/put.rs
1//! Insertions into a B tree.
2use super::cursor::*;
3use super::*;
4
5/// The representation of the update to a page.
6#[derive(Debug)]
7pub struct Ok {
8 /// The new page, possibly the same as the previous version.
9 pub page: MutPage,
10 /// The freed page, or 0 if no page was freed.
11 pub freed: u64,
12}
13
14/// The result of an insertion, i.e. either an update or a split.
15#[derive(Debug)]
16pub enum Put<'a, K: ?Sized, V: ?Sized> {
17 Ok(Ok),
18 Split {
19 split_key: &'a K,
20 split_value: &'a V,
21 left: MutPage,
22 right: MutPage,
23 freed: u64,
24 },
25}
26
27/// Insert an entry into a database, returning `false` if and only if
28/// the exact same entry (key *and* value) was already in the
29/// database.
30pub fn put<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
31 txn: &mut T,
32 db: &mut Db_<K, V, P>,
33 key: &K,
34 value: &V,
35) -> Result<bool, T::Error> {
36 // Set the cursor to the first element larger than or equal to
37 // `(key, value)`. We will insert at that position (where "insert"
38 // is meant in the sense of Rust's `Vec::insert`.
39 let mut cursor = Cursor::new(txn, db)?;
40 if cursor.set(txn, key, Some(value))?.is_some() {
41 // If `(key, value)` already exists in the tree, return.
42 return Ok(false);
43 }
44 // Else, we are at a leaf, since cursors that don't find an
45 // element go all the way to the leaves.
46 //
47 // We put on the leaf page, resulting in a `Put`, i.e. either
48 // `Put::Ok` or `Put::Split`.
49 let p = cursor.len(); // Save the position of the leaf cursor.
50 let is_owned = p < cursor.first_rc_len();
51
52 // Insert the key and value at the leaf, i.e. pop the top level of
53 // the stack (the leaf) and insert in there.
54 let cur = cursor.pop().unwrap();
55 let put = unsafe {
56 P::put(
57 txn,
58 cur.page,
59 is_owned,
60 false,
61 &cur.cursor,
62 key,
63 value,
64 None,
65 0,
66 0,
67 )?
68 };
69
70 // In both cases (`Ok` and `Split`), we need to walk up the tree
71 // and update each node.
72
73 // Moreover, since the middle elements of the splits may be on
74 // pages that must be freed at the end of this call, we collect
75 // them in the `free` array below, and free them only at the end
76 // of this function.
77 //
78 // If we didn't hold on to these pages, they could be reallocated
79 // in subsequent splits, and the middle element could be
80 // lost. This is important when the middle element climbs up more
81 // than one level (i.e. the middle element of the split of a leaf
82 // is also the middle element of the split of the leaf's parent,
83 // etc).
84 let mut free = [0; N_CURSORS];
85 unsafe {
86 db.db = core::num::NonZeroU64::new_unchecked(
87 put_cascade(txn, &mut cursor, put, &mut free)?.0.offset,
88 );
89 for f in &free[..p] {
90 if *f & 1 != 0 {
91 txn.decr_rc_owned((*f) ^ 1)?;
92 } else if *f > 0 {
93 txn.decr_rc(*f)?;
94 }
95 }
96 }
97 Ok(true)
98}
99
100fn put_cascade<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
101 txn: &mut T,
102 cursor: &mut Cursor<K, V, P>,
103 mut put: Put<K, V>,
104 free: &mut [u64; N_CURSORS],
105) -> Result<MutPage, T::Error> {
106 loop {
107 match put {
108 Put::Split {
109 split_key,
110 split_value,
111 left,
112 right,
113 freed,
114 } => {
115 // Here, we are copying all children of the freed
116 // page, except possibly the last freed page (which is
117 // a child of the current node, if we aren't at a
118 // leaf). This includes the split key/value, also
119 // incremented in the following call:
120 incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
121 // Then, insert the split key/value in the relevant page:
122 let is_owned = cursor.len() < cursor.first_rc_len();
123 if let Some(cur) = cursor.pop() {
124 // In this case, there's a page above the page
125 // that just split (since we can pop the stack),
126 // so the page that just split isn't the root (but
127 // `cur` might be).
128 put = unsafe {
129 P::put(
130 txn,
131 cur.page,
132 is_owned,
133 false,
134 &cur.cursor,
135 split_key,
136 split_value,
137 None,
138 left.0.offset,
139 right.0.offset,
140 )?
141 };
142 } else {
143 // No page above the split, so the root has just
144 // split. Insert the split key/value into a new
145 // page above the entire tree.
146 let mut p = unsafe { txn.alloc_page()? };
147 let cursor = P::cursor_first(&p.0);
148 P::init(&mut p);
149 if let Put::Ok(p) = unsafe {
150 P::put(
151 txn,
152 p.0,
153 true,
154 false,
155 &cursor,
156 split_key,
157 split_value,
158 None,
159 left.0.offset,
160 right.0.offset,
161 )?
162 } {
163 // Return the new root.
164 return Ok(p.page);
165 } else {
166 unreachable!()
167 }
168 }
169 }
170 Put::Ok(Ok { page, freed }) => {
171 // Same as above: increment the relevant reference
172 // counters.
173 incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
174 // And update the left child of the current cursor,
175 // since the main invariant of cursors is that we're
176 // always visiting the left child (if we're visiting
177 // the last child of a page, the cursor is set to the
178 // position strictly after the entries).
179 let is_owned = cursor.len() < cursor.first_rc_len();
180 if let Some(curs) = cursor.pop() {
181 // If we aren't at the root, just update the child.
182 put = Put::Ok(unsafe {
183 P::update_left_child(txn, curs.page, is_owned, &curs.cursor, page.0.offset)?
184 })
185 } else {
186 // If we are at the root, `page` is the updated root.
187 return Ok(page);
188 }
189 }
190 }
191 }
192}
193
194/// This function does all the memory management for `put`.
195fn incr_descendants<
196 T: AllocPage + LoadPage,
197 K: Storable + ?Sized,
198 V: Storable + ?Sized,
199 P: BTreePage<K, V>,
200>(
201 txn: &mut T,
202 cursor: &mut Cursor<K, V, P>,
203 free: &mut [u64; N_CURSORS],
204 mut freed: u64,
205) -> Result<(), T::Error> {
206 // The freed page is on the page below.
207 if cursor.len() < cursor.first_rc_len() {
208 // If a page has split or was immutable (allocated by a
209 // previous transaction) and has been updated, we need to free
210 // the old page.
211 free[cursor.len()] = freed;
212 } else {
213 // This means that the new version of the page (either split
214 // or not) contains references to all the children of the
215 // freed page, except the last freed page (because the new
216 // version of that page isn't shared).
217 let cur = cursor.cur();
218
219 // Start a cursor at the leftmost position and increase the
220 // leftmost child page's RC (if we aren't at a leaf, and if
221 // that child isn't the last freed page).
222 let mut c = P::cursor_first(&cur.page);
223 let left = P::left_child(cur.page.as_page(), &c);
224 if left != 0 {
225 if left != (freed & !1) {
226 txn.incr_rc(left)?;
227 } else if cursor.len() == cursor.first_rc_len() {
228 freed = 0
229 }
230 }
231 // Then, for each entry of the page, increase the RCs of
232 // references contained in the keys and values, and increase
233 // the RC of the right child.
234 while let Some((k, v, r)) = P::next(txn, cur.page.as_page(), &mut c) {
235 for o in k.page_references().chain(v.page_references()) {
236 txn.incr_rc(o)?;
237 }
238 if r != 0 {
239 if r != (freed & !1) {
240 txn.incr_rc(r)?;
241 } else if cursor.len() == cursor.first_rc_len() {
242 freed = 0
243 }
244 }
245 }
246 // Else, the "freed" page is shared with another tree, and
247 // hence we just need to decrement its RC.
248 if freed > 0 && cursor.len() == cursor.first_rc_len() {
249 free[cursor.len()] = freed;
250 }
251 }
252 Ok(())
253}