1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
//! Insertions into a B tree.
use super::cursor::*;
use super::*;
/// The representation of the update to a page.
#[derive(Debug)]
pub struct Ok {
/// The new page, possibly the same as the previous version.
pub page: MutPage,
/// The freed page, or 0 if no page was freed.
pub freed: u64,
}
/// The result of an insertion, i.e. either an update or a split.
#[derive(Debug)]
pub enum Put<'a, K: ?Sized, V: ?Sized> {
Ok(Ok),
Split {
split_key: &'a K,
split_value: &'a V,
left: MutPage,
right: MutPage,
freed: u64,
},
}
/// Insert an entry into a database, returning `false` if and only if
/// the exact same entry (key *and* value) was already in the
/// database.
pub fn put<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
txn: &mut T,
db: &mut Db_<K, V, P>,
key: &K,
value: &V,
) -> Result<bool, T::Error> {
// Set the cursor to the first element larger than or equal to
// `(key, value)`. We will insert at that position (where "insert"
// is meant in the sense of Rust's `Vec::insert`.
let mut cursor = Cursor::new(txn, db)?;
if cursor.set(txn, key, Some(value))?.is_some() {
// If `(key, value)` already exists in the tree, return.
return Ok(false);
}
// Else, we are at a leaf, since cursors that don't find an
// element go all the way to the leaves.
//
// We put on the leaf page, resulting in a `Put`, i.e. either
// `Put::Ok` or `Put::Split`.
let p = cursor.len(); // Save the position of the leaf cursor.
let is_owned = p < cursor.first_rc_len();
// Insert the key and value at the leaf, i.e. pop the top level of
// the stack (the leaf) and insert in there.
let cur = cursor.pop().unwrap();
let put = unsafe {
P::put(
txn,
cur.page,
is_owned,
false,
&cur.cursor,
key,
value,
None,
0,
0,
)?
};
// In both cases (`Ok` and `Split`), we need to walk up the tree
// and update each node.
// Moreover, since the middle elements of the splits may be on
// pages that must be freed at the end of this call, we collect
// them in the `free` array below, and free them only at the end
// of this function.
//
// If we didn't hold on to these pages, they could be reallocated
// in subsequent splits, and the middle element could be
// lost. This is important when the middle element climbs up more
// than one level (i.e. the middle element of the split of a leaf
// is also the middle element of the split of the leaf's parent,
// etc).
let mut free = [0; N_CURSORS];
unsafe {
db.db = core::num::NonZeroU64::new_unchecked(
put_cascade(txn, &mut cursor, put, &mut free)?.0.offset,
);
for f in &free[..p] {
if *f & 1 != 0 {
txn.decr_rc_owned((*f) ^ 1)?;
} else if *f > 0 {
txn.decr_rc(*f)?;
}
}
}
Ok(true)
}
fn put_cascade<T: AllocPage, K: Storable + ?Sized, V: Storable + ?Sized, P: BTreeMutPage<K, V>>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
mut put: Put<K, V>,
free: &mut [u64; N_CURSORS],
) -> Result<MutPage, T::Error> {
loop {
match put {
Put::Split {
split_key,
split_value,
left,
right,
freed,
} => {
// Here, we are copying all children of the freed
// page, except possibly the last freed page (which is
// a child of the current node, if we aren't at a
// leaf). This includes the split key/value, also
// incremented in the following call:
incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
// Then, insert the split key/value in the relevant page:
let is_owned = cursor.len() < cursor.first_rc_len();
if let Some(cur) = cursor.pop() {
// In this case, there's a page above the page
// that just split (since we can pop the stack),
// so the page that just split isn't the root (but
// `cur` might be).
put = unsafe {
P::put(
txn,
cur.page,
is_owned,
false,
&cur.cursor,
split_key,
split_value,
None,
left.0.offset,
right.0.offset,
)?
};
} else {
// No page above the split, so the root has just
// split. Insert the split key/value into a new
// page above the entire tree.
let mut p = unsafe { txn.alloc_page()? };
let cursor = P::cursor_first(&p.0);
P::init(&mut p);
if let Put::Ok(p) = unsafe {
P::put(
txn,
p.0,
true,
false,
&cursor,
split_key,
split_value,
None,
left.0.offset,
right.0.offset,
)?
} {
// Return the new root.
return Ok(p.page);
} else {
unreachable!()
}
}
}
Put::Ok(Ok { page, freed }) => {
// Same as above: increment the relevant reference
// counters.
incr_descendants::<T, K, V, P>(txn, cursor, free, freed)?;
// And update the left child of the current cursor,
// since the main invariant of cursors is that we're
// always visiting the left child (if we're visiting
// the last child of a page, the cursor is set to the
// position strictly after the entries).
let is_owned = cursor.len() < cursor.first_rc_len();
if let Some(curs) = cursor.pop() {
// If we aren't at the root, just update the child.
put = Put::Ok(unsafe {
P::update_left_child(txn, curs.page, is_owned, &curs.cursor, page.0.offset)?
})
} else {
// If we are at the root, `page` is the updated root.
return Ok(page);
}
}
}
}
}
/// This function does all the memory management for `put`.
fn incr_descendants<
T: AllocPage + LoadPage,
K: Storable + ?Sized,
V: Storable + ?Sized,
P: BTreePage<K, V>,
>(
txn: &mut T,
cursor: &mut Cursor<K, V, P>,
free: &mut [u64; N_CURSORS],
mut freed: u64,
) -> Result<(), T::Error> {
// The freed page is on the page below.
if cursor.len() < cursor.first_rc_len() {
// If a page has split or was immutable (allocated by a
// previous transaction) and has been updated, we need to free
// the old page.
free[cursor.len()] = freed;
} else {
// This means that the new version of the page (either split
// or not) contains references to all the children of the
// freed page, except the last freed page (because the new
// version of that page isn't shared).
let cur = cursor.cur();
// Start a cursor at the leftmost position and increase the
// leftmost child page's RC (if we aren't at a leaf, and if
// that child isn't the last freed page).
let mut c = P::cursor_first(&cur.page);
let left = P::left_child(cur.page.as_page(), &c);
if left != 0 {
if left != (freed & !1) {
txn.incr_rc(left)?;
} else if cursor.len() == cursor.first_rc_len() {
freed = 0
}
}
// Then, for each entry of the page, increase the RCs of
// references contained in the keys and values, and increase
// the RC of the right child.
while let Some((k, v, r)) = P::next(txn, cur.page.as_page(), &mut c) {
for o in k.page_references().chain(v.page_references()) {
txn.incr_rc(o)?;
}
if r != 0 {
if r != (freed & !1) {
txn.incr_rc(r)?;
} else if cursor.len() == cursor.first_rc_len() {
freed = 0
}
}
}
// Else, the "freed" page is shared with another tree, and
// hence we just need to decrement its RC.
if freed > 0 && cursor.len() == cursor.first_rc_len() {
free[cursor.len()] = freed;
}
}
Ok(())
}