sanakirja_core/btree/
page.rs

//! Implementation of B tree pages for Sized types, i.e. types whose
//! representation has a size known at compile time (and the same as
//! [`core::mem::size_of()`]).
//!
//! The details of the implementation are as follows:
//!
//! - The page starts with a 16 bytes header of the following form
//! (where all the fields are encoded in Little-Endian):
//!
//!     ```
//!     #[repr(C)]
//!     pub struct Header {
//!         /// Offset to the first entry in the page, shifted 3 bits
//!         /// to the right to allow for the dirty bit (plus two
//!         /// extra bits, zero for now), as explained in the
//!         /// documentation of CowPage, at the root of this
//!         /// crate. This is 4096 for empty pages, and it is unused
//!         /// for leaves. Moreover, this field can't be increased:
//!         /// when it reaches the bottom, the page must be cloned.
//!         data: u16,
//!         /// Number of entries on the page.
//!         n: u16,
//!         /// CRC (if used).
//!         crc: u32,
//!         /// The 52 most significant bits are the left child of
//!         /// this page (0 for leaves), while the 12 LSBs represent
//!         /// the space this page would take when cloned from scratch,
//!         /// minus the header. The reason for this is that entries
//!         /// in internal nodes aren't really removed when deleted,
//!         /// they're only "unlinked" from the array of offsets (see
//!         /// below). Therefore, we must have a way to tell when a
//!         /// page can be "compacted" to reclaim space.
//!         left_page: u64,
//!     }
//!     ```
//!
//! - For leaves, the rest of the page has the same representation as
//! an array of length `n`, of elements of type `Tuple<K, V>`:
//!   ```
//!   #[repr(C)]
//!   struct Tuple<K, V> {
//!       k: K,
//!       v: V,
//!   }
//!   ```
//!   If the alignment of that structure is more than 16 bytes, we
//!   need to add some padding between the header and that array.
//!
//! - For internal nodes, the rest of the page starts with an array of
//! length `n` of Little-Endian-encoded [`u64`], where the 12 least
//! significant bits of each [`u64`] are an offset to a `Tuple<K, V>` in
//! the page, and the 52 other bits are an offset in the file to the
//! right child of the entry (a decoding sketch is given at the end of
//! this comment).
//!
//!   Moreover, the offset represented by the 12 LSBs is after (or at)
//!   `header.data`.
//!
//!   We say we can "allocate" in the page if the `data` of the header
//!   is greater than or equal to the position of the last "offset",
//!   plus the size we want to allocate (note that since we allocate
//!   from the end of the page, `data` is always a multiple of the
//!   alignment of `Tuple<K, V>`).
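//!
//!   As an illustration, here is a minimal sketch (a hypothetical
//!   helper, not part of this module) of how such a packed [`u64`] can
//!   be split into its two parts; the header's `left_page` field uses
//!   the same 52-bit/12-bit split:
//!
//!   ```
//!   // Illustration only: split a packed entry into (right child, offset).
//!   fn decode(packed: u64) -> (u64, usize) {
//!       let child = packed & !0xfff;            // 52 MSBs: file offset of the child page
//!       let offset = (packed & 0xfff) as usize; // 12 LSBs: offset of the tuple in this page
//!       (child, offset)
//!   }
//!   assert_eq!(decode(0x2000 | 0x128), (0x2000, 0x128));
//!   ```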

use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;

mod alloc; // The "alloc" trait, to provide common methods for leaves and internal nodes.

mod put; // Inserting a new value onto a page (possibly splitting the page).

mod rebalance; // Rebalance two sibling pages to the left or to the right.

use super::page_unsized::{cursor::PageCursor, header};
use alloc::*;
use header::*;
use rebalance::*;

/// This struct is used to allocate a pair `(K, V)` on the stack, and
/// to copy it from a raw pointer pointing into a page.
///
/// This is also used to form slices of pairs in order to use
/// functions from the core library.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
struct Tuple<K, V> {
    k: K,
    v: V,
}

/// Empty type implementing `BTreePage` and `BTreeMutPage`.
#[derive(Debug)]
pub struct Page<K, V> {
    k: core::marker::PhantomData<K>,
    v: core::marker::PhantomData<V>,
}

impl<K: Storable, V: Storable> super::BTreePage<K, V> for Page<K, V> {
    // Cursors are quite straightforward. One non-trivial thing is
    // that they represent both a position on a page and the interval
    // from that position to the end of the page, as is apparent in
    // their `split_at` method.
    //
    // Another thing to note is that -1 and `c.total` are valid
    // positions for a cursor `c`. The reason for this is that
    // position `-1` has a right child (which is the first element's
    // left child).
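    //
    // For example, on an internal node where `total == 3`, the valid
    // positions are -1, 0, 1, 2 and 3: positions 0, 1 and 2 designate
    // entries, while -1 (which only has a right child) and 3 (which
    // only has a left child) exist purely for navigating children.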

    type Cursor = PageCursor;

    fn is_empty(c: &Self::Cursor) -> bool {
        c.cur >= c.total as isize
    }

    fn is_init(c: &Self::Cursor) -> bool {
        c.cur < 0
    }

    fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::new(p, -1)
    }

    fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::after(p)
    }

    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
        (
            PageCursor {
                cur: 0,
                total: c.cur.max(0) as usize,
                is_leaf: c.is_leaf,
            },
            *c,
        )
    }
    fn move_next(c: &mut Self::Cursor) -> bool {
        if c.cur >= c.total as isize {
            return false;
        }
        c.cur += 1;
        true
    }
    fn move_prev(c: &mut Self::Cursor) -> bool {
        if c.cur < 0 {
            return false;
        }
        c.cur -= 1;
        true
    }

    fn current<'a, T: LoadPage>(
        _txn: &T,
        page: crate::Page<'a>,
        c: &Self::Cursor,
    ) -> Option<(&'a K, &'a V, u64)> {
        // First, there's no current entry if the cursor is outside
        // the range of entries.
        if c.cur < 0 || c.cur >= c.total as isize {
            None
        } else if c.is_leaf {
            // Else, if this is a leaf, the elements are packed
            // together in a contiguous array.
            //
            // This means that the header may be followed by padding
            // (in order to align the entries). These are constants
            // known at compile-time, so `al` and `hdr` are optimised
            // away by the compiler.
            let al = core::mem::align_of::<Tuple<K, V>>();

            // The following is a way to compute the first multiple of
            // `al` after `HDR`, assuming `al` is a power of 2 (which
            // is always the case since we get it from `align_of`).
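            //
            // For instance (illustrative values): with HDR == 16, an
            // alignment of 8 gives hdr == 16 (no padding), while an
            // alignment of 32 would give hdr == 32 (16 bytes of padding).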
            let hdr = (HDR + al - 1) & !(al - 1);

            // The position of the `Tuple<K, V>` we're looking for is
            // `f * cur` bytes after the padded header (because
            // `size_of` includes alignment padding).
            let f = core::mem::size_of::<Tuple<K, V>>();
            let kv = unsafe {
                &*(page.data.as_ptr().add(hdr + c.cur as usize * f) as *const Tuple<K, V>)
            };
            Some((&kv.k, &kv.v, 0))
        } else {
            // Internal nodes have an extra level of indirection: we
            // first need to find `off`, the offset in the page, in
            // the initial array of offsets. Since these offsets are
            // `u64`, and the header is of size 16 bytes, the array is
            // already aligned.
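            //
            // As a made-up illustration: an entry equal to 0x5078 means
            // that the tuple lives at offset 0x078 within this page, and
            // that the entry's right child is the page at file offset
            // 0x5000.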
            unsafe {
                let off =
                    u64::from_le(*(page.data.as_ptr().add(HDR + 8 * c.cur as usize) as *const u64));
                // Check that we aren't reading outside of the page
                // (for example because of a malformed offset).
                assert!((off as usize & 0xfff) + core::mem::size_of::<Tuple<K, V>>() <= 4096);

                // Once we have the offset, cast its 12 LSBs to a
                // position in the page, and read the `Tuple<K, V>` at
                // that position.
                let kv = &*(page.data.as_ptr().add((off as usize) & 0xfff) as *const Tuple<K, V>);
                Some((&kv.k, &kv.v, off & !0xfff))
            }
        }
    }

    // The left and right child methods aren't really surprising. One
    // thing to note is that cursors are always in positions between
    // `-1` and `c.total` (bounds included), so we only have to check
    // one side of the bound in the assertions.
    //
    // We also check, before entering the `unsafe` sections, that
    // we're only reading data that is on a page.
    fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        if c.is_leaf {
            0
        } else {
            assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
            let off = unsafe {
                *(page.data.as_ptr().offset((HDR as isize + c.cur * 8) - 8) as *const u64)
            };
            u64::from_le(off) & !0xfff
        }
    }
    fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        if c.is_leaf {
            0
        } else {
            assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 <= 4088);
            let off =
                unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
            u64::from_le(off) & !0xfff
        }
    }

    fn set_cursor<'a, T: LoadPage>(
        txn: &'a T,
        page: crate::Page,
        c: &mut PageCursor,
        k0: &K,
        v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize> {
        unsafe {
            // `lookup` has the same semantics as
            // `core::slice::binary_search`, i.e. it returns either
            // `Ok(n)`, where `n` is the position where `(k0, v0)` was
            // found, or `Err(n)` where `n` is the position where
            // `(k0, v0)` can be inserted to preserve the order.
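            //
            // For example (illustration only): on a leaf whose keys are
            // [1, 3, 5], looking up 3 yields Ok(1), while looking up 4
            // yields Err(2).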
            match lookup(txn, page, c, k0, v0) {
                Ok(n) => {
                    c.cur = n as isize;
                    // Just read the tuple and return it.
                    if c.is_leaf {
                        let f = core::mem::size_of::<Tuple<K, V>>();
                        let al = core::mem::align_of::<Tuple<K, V>>();
                        let hdr_size = (HDR + al - 1) & !(al - 1);
                        let tup =
                            &*(page.data.as_ptr().add(hdr_size + f * n) as *const Tuple<K, V>);
                        Ok((&tup.k, &tup.v, 0))
                    } else {
                        let off =
                            u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
                        let tup =
                            &*(page.data.as_ptr().add(off as usize & 0xfff) as *const Tuple<K, V>);
                        Ok((&tup.k, &tup.v, off & !0xfff))
                    }
                }
                Err(n) => {
                    c.cur = n as isize;
                    Err(n)
                }
            }
        }
    }
}

impl<K: Storable + core::fmt::Debug, V: Storable + core::fmt::Debug> super::BTreeMutPage<K, V>
    for Page<K, V>
{
    // Once again, this is quite straightforward.
    fn init(page: &mut MutPage) {
        let h = header_mut(page);
        h.init();
    }

    // When deleting from internal nodes, we take a replacement from
    // one of the leaves (in our current implementation, the leftmost
    // entry of the right subtree). This method copies an entry from
    // the leaf onto the program stack, which is necessary since
    // deletions in leaves overwrite entries.
    //
    // Another design choice would have been to do the same as for the
    // unsized implementation, but here that would have meant copying
    // the saved value to the end of the leaf, potentially preventing
    // merges, and not even saving a memory copy.
    type Saved = (K, V);

    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
        unsafe {
            let mut k0 = core::mem::MaybeUninit::uninit();
            let mut v0 = core::mem::MaybeUninit::uninit();
            core::ptr::copy_nonoverlapping(k, k0.as_mut_ptr(), 1);
            core::ptr::copy_nonoverlapping(v, v0.as_mut_ptr(), 1);
            (k0.assume_init(), v0.assume_init())
        }
    }

    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
        (core::mem::transmute(&s.0), core::mem::transmute(&s.1))
    }

    // `put` inserts one or two entries onto a node (internal or
    // leaf). This is implemented in the `put` module.
    unsafe fn put<'a, T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        replace: bool,
        c: &PageCursor,
        k0: &'a K,
        v0: &'a V,
        k1v1: Option<(&'a K, &'a V)>,
        l: u64,
        r: u64,
    ) -> Result<super::put::Put<'a, K, V>, T::Error> {
        assert!(c.cur >= 0);
        // In the sized case, deletions can never cause a split, so we
        // never have to insert two elements at the same position.
        assert!(k1v1.is_none());
        if r == 0 {
            put::put::<_, _, _, Leaf>(txn, page, mutable, replace, c.cur as usize, k0, v0, 0, 0)
        } else {
            put::put::<_, _, _, Internal>(txn, page, mutable, replace, c.cur as usize, k0, v0, l, r)
        }
    }

    unsafe fn put_mut<T: AllocPage>(
        txn: &mut T,
        page: &mut MutPage,
        c: &mut Self::Cursor,
        k0: &K,
        v0: &V,
        r: u64,
    ) {
        use super::page_unsized::AllocWrite;
        let mut n = c.cur;
        if r == 0 {
            Leaf::alloc_write(txn, page, k0, v0, 0, r, &mut n);
        } else {
            Internal::alloc_write(txn, page, k0, v0, 0, r, &mut n);
        }
        c.total += 1;
    }

    unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
        let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }

    // This function updates an internal node, setting the left child
    // of the cursor to `l`.
    unsafe fn update_left_child<T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        c: &Self::Cursor,
        l: u64,
    ) -> Result<crate::btree::put::Ok, T::Error> {
        assert!(!c.is_leaf && c.cur >= 0 && (c.cur as usize) <= c.total);
        let freed;
        let page = if mutable && page.is_dirty() {
            // If the page is mutable (dirty), just convert it to a
            // mutable page, and update.
            freed = 0;
            MutPage(page)
        } else {
            // Else, clone the page.
            let mut new = unsafe { txn.alloc_page()? };
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            // First clone the left child of the page.
            let l = header(page.as_page()).left_page() & !0xfff;
            let hdr = header_mut(&mut new);
            hdr.set_left_page(l);
            // And then the rest of the page.
            let s = Internal::offset_slice::<T, K, V>(page.as_page());
            clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
            // Mark the former version of the page as free.
            freed = page.offset | if page.is_dirty() { 1 } else { 0 };
            new
        };
        // Finally, update the left child of the cursor.
        unsafe {
            let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
            *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
        }
        Ok(Ok { page, freed })
    }

    unsafe fn del<T: AllocPage>(
        txn: &mut T,
        page: crate::CowPage,
        mutable: bool,
        c: &PageCursor,
        l: u64,
    ) -> Result<(MutPage, u64), T::Error> {
        assert!(c.cur >= 0 && (c.cur as usize) < c.total);
        if mutable && page.is_dirty() {
            // In the mutable case, we just need to move some memory
            // around.
            let p = page.data;
            let mut page = MutPage(page);
            let hdr = header_mut(&mut page);
            let f = core::mem::size_of::<Tuple<K, V>>();
            if c.is_leaf {
                // In leaves, we need to move the n - c - 1 elements
                // that are strictly after the cursor, by `f` (the
                // size of an entry).
                //
                // Here's the reasoning to avoid off-by-one errors: if
                // `c` is 0 (i.e. we're deleting the first element on
                // the page), we remove one entry, so there are n - 1
                // remaining entries.
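                //
                // As a concrete illustration (values made up): with
                // n == 4 and c.cur == 1, the entries at positions 2 and
                // 3, i.e. n - c.cur - 1 == 2 entries, are moved down by
                // `f` bytes each.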
                let n = hdr.n() as usize;
                let hdr_size = {
                    // As usual, header + padding
                    let al = core::mem::align_of::<Tuple<K, V>>();
                    (HDR + al - 1) & !(al - 1)
                };
                let off = hdr_size + c.cur as usize * f;
                core::ptr::copy(p.add(off + f), p.add(off), f * (n - c.cur as usize - 1));
                // Removing `f` bytes from the page.
                hdr.decr(f);
            } else {
                // Internal nodes are easier to deal with, as we just
                // have to move the offsets.
                unsafe {
                    let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
                    core::ptr::copy(ptr.offset(1), ptr, hdr.n() as usize - c.cur as usize - 1);
                }
                // Removing `f` bytes from the page (the tuple), plus
                // one 8-byte offset.
                hdr.decr(f + 8);

                // Updating the left page if necessary.
                if l > 0 {
                    unsafe {
                        let off = (p.add(HDR) as *mut u64).offset(c.cur as isize - 1);
                        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
                    }
                }
            }
            hdr.set_n(hdr.n() - 1);
            // Returning the mutable page itself, and 0 (for "no page freed").
            Ok((page, 0))
        } else {
            // Immutable pages need to be cloned. The strategy is the
            // same in both cases: get an "offset slice", split it at
            // the cursor, remove the first entry of the second half,
            // and clone.
            let mut new = txn.alloc_page()?;
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            if c.is_leaf {
                let s = Leaf::offset_slice::<T, K, V>(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);
                let mut n = 0;
                clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
                clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
            } else {
                // Internal nodes are a bit trickier, since the left child
                // is not counted in the "offset slice", so we need to
                // clone it separately. Also, the `l` argument to this
                // function might be non-zero in this case.
                let s = Internal::offset_slice::<T, K, V>(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);

                // First, clone the left child of the page.
                let hdr = header(page.as_page());
                let left = hdr.left_page() & !0xfff;
                unsafe { *(new.0.data.add(HDR - 8) as *mut u64) = left.to_le() };

                // Then, clone the entries strictly before the cursor
                // (i.e. clone `s0`).
                let mut n = 0;
                clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);

                // If we need to update the left child of the deleted
                // item, do it.
                if l > 0 {
                    unsafe {
                        let off = new.0.data.offset(HDR as isize + (n - 1) * 8) as *mut u64;
                        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
                    }
                }

                // Finally, clone the right half of the page (`s1`).
                clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
            }
            Ok((new, page.offset))
        }
    }

    // Decide what to do with the concatenation of two neighbouring
    // pages, with a middle element.
    unsafe fn merge_or_rebalance<'a, T: AllocPage>(
        txn: &mut T,
        m: Concat<'a, K, V, Self>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
        // First evaluate the size of the middle element on a page.
        let (hdr_size, mid_size) = if m.modified.c0.is_leaf {
            let al = core::mem::align_of::<Tuple<K, V>>();
            (
                (HDR + al - 1) & !(al - 1),
                core::mem::size_of::<Tuple<K, V>>(),
            )
        } else {
            (HDR, 8 + core::mem::size_of::<Tuple<K, V>>())
        };

        // Evaluate the size of the modified half of the concatenation
        // (which includes the header).
        let mod_size = size::<K, V>(&m.modified);
        // Then compute the size occupied by the other half (which
        // excludes the header).
        let occupied = {
            let hdr = header(m.other.as_page());
            (hdr.left_page() & 0xfff) as usize
        };

        // One surprising observation here is that adding the sizes
        // works. This is surprising because of alignment and
        // padding. It works because we can split the sizes into an
        // offset part (always 8 bytes) and a data part (of a constant
        // alignment), and thus we never need any padding anywhere on
        // the page.
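        //
        // As an illustration (numbers made up): in an internal node
        // whose tuples are 8-byte-aligned and 24 bytes long, every
        // entry accounts for 8 + 24 == 32 bytes in these computations,
        // wherever it ends up being written on the page.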
        if mod_size + mid_size + occupied <= PAGE_SIZE {
            // If the concatenation fits on a page, merge.
            return if m.modified.c0.is_leaf {
                super::page_unsized::merge::<_, _, _, _, Leaf>(txn, m)
            } else {
                super::page_unsized::merge::<_, _, _, _, Internal>(txn, m)
            };
        }
        // If the modified page is large enough, or if the other page
        // has only just barely enough elements to stay at least
        // half-full, perform the modification and return.
        //
        // The situation where the other page isn't full enough might
        // happen for example if elements occupy exactly 1/5th of a
        // page + 1 byte, and the modified page has 2 of them after
        // the deletion, while the other page has 3.
        if mod_size >= PAGE_SIZE / 2 || hdr_size + occupied - mid_size < PAGE_SIZE / 2 {
            if let Some((k, v)) = m.modified.ins {
                // Perform the required modification and return.
                return Ok(Op::Put(unsafe {
                    Self::put(
                        txn,
                        m.modified.page,
                        m.modified.mutable,
                        m.modified.skip_first,
                        &m.modified.c1,
                        k,
                        v,
                        m.modified.ins2,
                        m.modified.l,
                        m.modified.r,
                    )?
                }));
            } else if m.modified.skip_first {
                // `ins2` is only ever used when the page immediately
                // below a deletion inside an internal node has split,
                // and we need to replace the deleted value, *and*
                // insert the middle entry of the split.
                debug_assert!(m.modified.ins2.is_none());
                let (page, freed) = Self::del(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    &m.modified.c1,
                    m.modified.l,
                )?;
                return Ok(Op::Put(Put::Ok(Ok { page, freed })));
            } else {
                let mut c1 = m.modified.c1.clone();
                let mut l = m.modified.l;
                if l == 0 {
                    Self::move_next(&mut c1);
                    l = m.modified.r;
                }
                return Ok(Op::Put(Put::Ok(Self::update_left_child(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    &c1,
                    l,
                )?)));
            }
        }

        // Finally, if we're here, we can rebalance. There are four
        // (relatively explicit) cases; see the `rebalance` submodule
        // for how this is done.
        if m.mod_is_left {
            if m.modified.c0.is_leaf {
                rebalance_left::<_, _, _, Leaf>(txn, m)
            } else {
                rebalance_left::<_, _, _, Internal>(txn, m)
            }
        } else {
            super::page_unsized::rebalance::rebalance_right::<_, _, _, Self>(txn, m)
        }
    }
}

/// Size of a modified page (including the header).
fn size<K: Storable, V: Storable>(m: &ModifiedPage<K, V, Page<K, V>>) -> usize {
    let mut total = {
        let hdr = header(m.page.as_page());
        (hdr.left_page() & 0xfff) as usize
    };
    if m.c1.is_leaf {
        // Pad the header to the alignment of `Tuple<K, V>`, with the
        // same formula as everywhere else in this file.
        let al = core::mem::align_of::<Tuple<K, V>>();
        total += (HDR + al - 1) & !(al - 1);
    } else {
        total += HDR
    };

    // Extra size for the offsets.
    let extra = if m.c1.is_leaf { 0 } else { 8 };

    if m.ins.is_some() {
        total += extra + core::mem::size_of::<Tuple<K, V>>();
        if m.ins2.is_some() {
            total += extra + core::mem::size_of::<Tuple<K, V>>()
        }
    }
    // If we skip the first entry of `m.c1`, remove its size.
    if m.skip_first {
        total -= extra + core::mem::size_of::<Tuple<K, V>>()
    }
    total
}

/// Linear search, leaf version. This is relatively straightforward.
fn leaf_linear_search<T: LoadPage, K: Storable, V: Storable>(
    txn: &T,
    k0: &K,
    v0: Option<&V>,
    s: &[Tuple<K, V>],
) -> Result<usize, usize> {
    let mut n = 0;
    for sm in s.iter() {
        match sm.k.compare(txn, k0) {
            Ordering::Less => n += 1,
            Ordering::Greater => return Err(n),
            Ordering::Equal => {
                if let Some(v0) = v0 {
                    match sm.v.compare(txn, v0) {
                        Ordering::Less => n += 1,
                        Ordering::Greater => return Err(n),
                        Ordering::Equal => return Ok(n),
                    }
                } else {
                    return Ok(n);
                }
            }
        }
    }
    Err(n)
}

/// An equivalent of `Ord::cmp` on `Tuple<K, V>` but using
/// `Storable::compare` instead of `Ord::cmp` on `K` and `V`.
fn cmp<T: LoadPage, K: Storable, V: Storable>(
    txn: &T,
    k0: &K,
    v0: Option<&V>,
    p: &[u8; 4096],
    off: u64,
) -> Ordering {
    let off = u64::from_le(off) & 0xfff;
    if off as usize + core::mem::size_of::<Tuple<K, V>>() > PAGE_SIZE {
        panic!(
            "off = {:?}, size = {:?}",
            off,
            core::mem::size_of::<Tuple<K, V>>()
        );
    }
    let tup = unsafe { &*(p.as_ptr().offset(off as isize & 0xfff) as *const Tuple<K, V>) };
    match tup.k.compare(txn, k0) {
        Ordering::Equal => {
            if let Some(v0) = v0 {
                tup.v.compare(txn, v0)
            } else {
                Ordering::Equal
            }
        }
        o => o,
    }
}

/// Linear search for internal nodes. Does what it says.
unsafe fn internal_search<T: LoadPage, K: Storable, V: Storable>(
    txn: &T,
    k0: &K,
    v0: Option<&V>,
    s: &[u64],
    p: &[u8; 4096],
) -> Result<usize, usize> {
    for (n, off) in s.iter().enumerate() {
        match cmp(txn, k0, v0, p, *off) {
            Ordering::Less => {}
            Ordering::Greater => return Err(n),
            Ordering::Equal => return Ok(n),
        }
    }
    Err(s.len())
}

/// Lookup just forms slices of offsets (for internal nodes) or values
/// (for leaves), and runs a linear search on them.
unsafe fn lookup<T: LoadPage, K: Storable, V: Storable>(
    txn: &T,
    page: crate::Page,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<usize, usize> {
    let hdr = header(page);
    c.total = hdr.n() as usize;
    c.is_leaf = hdr.is_leaf();
    if c.is_leaf {
        let al = core::mem::align_of::<Tuple<K, V>>();
        let hdr_size = (HDR + al - 1) & !(al - 1);
        let s = core::slice::from_raw_parts(
            page.data.as_ptr().add(hdr_size) as *const Tuple<K, V>,
            hdr.n() as usize,
        );
        leaf_linear_search(txn, k0, v0, s)
    } else {
        let s = core::slice::from_raw_parts(
            page.data.as_ptr().add(HDR) as *const u64,
            hdr.n() as usize,
        );
        internal_search(txn, k0, v0, s, page.data)
    }
}

/// Clone a slice of offsets onto a page. This essentially does what
/// it says. Note that the leftmost child of a page is not included in
/// the offset slices, so we don't have to handle it here.
///
/// This should really be in the `Alloc` trait, but Rust doesn't have
/// associated type constructors, so we can't have an associated type
/// that's sometimes a slice and sometimes a "Range".
fn clone<K: Storable, V: Storable, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets,
    n: &mut isize,
) {
    match s {
        Offsets::Slice(s) => {
            // We know we're in an internal node here.
            let size = core::mem::size_of::<Tuple<K, V>>();
            for off in s.iter() {
                let off = u64::from_le(*off);
                let r = off & !0xfff;
                let off = off & 0xfff;
                unsafe {
                    let ptr = page.data.as_ptr().add(off as usize);

                    // Reserve the space on the page.
                    let hdr = header_mut(new);
                    let data = hdr.data() as u16;
                    let off_new = data - size as u16;
                    hdr.set_data(off_new);

                    // Copy the entry from the original page to its
                    // position on the new page.
                    core::ptr::copy_nonoverlapping(ptr, new.0.data.add(off_new as usize), size);

                    // Set the offset to this new entry in the offset
                    // array, along with the right child page.
                    let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
                    *ptr = (r | off_new as u64).to_le();
                }
                *n += 1;
            }
            let hdr = header_mut(new);
            hdr.set_n(hdr.n() + s.len() as u16);
            hdr.incr((8 + size) * s.len());
        }
        Offsets::Range(r) => {
            let size = core::mem::size_of::<Tuple<K, V>>();
            let a = core::mem::align_of::<Tuple<K, V>>();
            let header_size = (HDR + a - 1) & !(a - 1);
            let len = r.len();
            for off in r {
                unsafe {
                    let ptr = page.data.as_ptr().add(header_size + off * size);
                    let new_ptr = new.0.data.add(header_size + (*n as usize) * size);
                    core::ptr::copy_nonoverlapping(ptr, new_ptr, size);
                }
                *n += 1;
            }
            // On leaves, we do have to update everything manually,
            // because we're simply copying stuff.
            let hdr = header_mut(new);
            hdr.set_n(hdr.n() + len as u16);
            hdr.incr(size * len);
        }
    }
}