sanakirja_core/btree/page_unsized.rs

//! Implementation of B tree pages for Unsized types, or types with a
//! dynamically-sized representation (for example enums with widely
//! different sizes).
//!
//! This module follows the same organisation as the sized
//! implementation, and contains types shared between the two
//! implementations.
//!
//! The types that can be used with this implementation must implement
//! the [`UnsizedStorable`] trait, which essentially replaces the
//! [`core::mem`] functions for determining the size and alignment of
//! values.
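//!
//! As a rough sketch, the parts of that trait this module relies on
//! look like this (signatures abridged; the actual trait in this
//! crate has more methods, e.g. for writing values onto a page):
//!
//! ```ignore
//! pub trait UnsizedStorable {
//!     /// Alignment of the on-page representation.
//!     const ALIGN: usize;
//!     /// Size of this value's on-page representation.
//!     fn size(&self) -> usize;
//!     /// Decode a value from its position on a page.
//!     unsafe fn from_raw_ptr<'a, T>(txn: &T, p: *const u8) -> &'a Self;
//! }
//! ```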
//!
//! One key difference is the implementation of leaves (internal nodes
//! have the same format as in the sized implementation): here, leaves
//! have almost the same format as internal nodes, except that their
//! offsets are written on the page as little-endian-encoded [`u16`]
//! (with only the 12 LSBs used, i.e. 4 bits unused).
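//!
//! For example, reading the offset of the `i`-th entry of a leaf
//! looks like the following sketch (`HDR` is the header size):
//!
//! ```ignore
//! let raw = *(page.data.as_ptr().add(HDR + i * 2) as *const u16);
//! let off = u16::from_le(raw); // only the 12 LSBs are ever set
//! ```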

use super::*;
use crate::btree::del::*;
use crate::btree::put::*;
use core::cmp::Ordering;

// The header is the same as for the sized implementation, so we share
// it here.
pub(super) mod header;

// Like in the sized implementation, we have the same three submodules.
mod alloc;

// This module is shared with the sized implementation.
pub(super) mod cursor;

mod put;
pub(super) mod rebalance;
use alloc::*;
use cursor::*;
use header::*;
use rebalance::*;

#[derive(Debug)]
pub struct Page<K: ?Sized, V: ?Sized> {
    k: core::marker::PhantomData<K>,
    v: core::marker::PhantomData<V>,
}

// Many of these functions are the same as in the sized implementation.
impl<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized> super::BTreePage<K, V>
    for Page<K, V>
{
    fn is_empty(c: &Self::Cursor) -> bool {
        c.cur >= c.total as isize
    }

    fn is_init(c: &Self::Cursor) -> bool {
        c.cur < 0
    }
    type Cursor = PageCursor;
    fn cursor_before(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::new(p, -1)
    }
    fn cursor_after(p: &crate::CowPage) -> Self::Cursor {
        PageCursor::after(p)
    }

    // Split a cursor, returning two cursors `(a, b)`, where `b` is
    // the same as `c`, and `a` is a cursor running from the first
    // element of the page up to `c` (excluding `c`).
    fn split_at(c: &Self::Cursor) -> (Self::Cursor, Self::Cursor) {
        (
            PageCursor {
                cur: 0,
                total: c.cur.max(0) as usize,
                is_leaf: c.is_leaf,
            },
            *c,
        )
    }

    fn move_next(c: &mut Self::Cursor) -> bool {
        if c.cur < c.total as isize {
            c.cur += 1;
            true
        } else {
            false
        }
    }
    fn move_prev(c: &mut Self::Cursor) -> bool {
        if c.cur > 0 {
            c.cur -= 1;
            true
        } else {
            c.cur = -1;
            false
        }
    }

    // This function is the same as in the sized implementation for
    // internal nodes, since the only difference between leaves and
    // internal nodes in this implementation is the size of the
    // offsets (2 bytes for leaves, 8 bytes for internal nodes).
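    // Internal-node offsets pack two values into a single `u64`: the
    // lowest 12 bits are the position of the entry on the page, and
    // the bits above (a multiple of 4096) are the offset of the
    // entry's right child page. This is why the same word is masked
    // below with `0xfff` (entry position) or `!0xfff` (child page).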
    fn current<'a, T: LoadPage>(
        txn: &T,
        page: crate::Page<'a>,
        c: &Self::Cursor,
    ) -> Option<(&'a K, &'a V, u64)> {
        if c.cur < 0 || c.cur >= c.total as isize {
            None
        } else if c.is_leaf {
            unsafe {
                let off =
                    u16::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 2) as *const u16));
                let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().add(off as usize));
                Some((
                    K::from_raw_ptr(txn, k as *const u8),
                    V::from_raw_ptr(txn, v as *const u8),
                    0,
                ))
            }
        } else {
            unsafe {
                let off =
                    u64::from_le(*(page.data.as_ptr().add(HDR + c.cur as usize * 8) as *const u64));
                let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().add((off & 0xfff) as usize));
                Some((
                    K::from_raw_ptr(txn, k as *const u8),
                    V::from_raw_ptr(txn, v as *const u8),
                    off & !0xfff,
                ))
            }
        }
    }

    // These methods are the same as in the sized implementation.
    fn left_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        assert!(c.cur >= 0);
        if c.is_leaf {
            0
        } else {
            assert!(c.cur >= 0 && HDR as isize + c.cur * 8 - 8 <= 4088);
            let off =
                unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8 - 8) as *const u64) };
            u64::from_le(off) & !0xfff
        }
    }
    fn right_child(page: crate::Page, c: &Self::Cursor) -> u64 {
        assert!(c.cur < c.total as isize);
        if c.is_leaf {
            0
        } else {
            assert!(c.cur < c.total as isize && HDR as isize + c.cur * 8 - 8 <= 4088);
            let off =
                unsafe { *(page.data.as_ptr().offset(HDR as isize + c.cur * 8) as *const u64) };
            u64::from_le(off) & !0xfff
        }
    }

    fn set_cursor<'a, T: LoadPage>(
        txn: &'a T,
        page: crate::Page,
        c: &mut PageCursor,
        k0: &K,
        v0: Option<&V>,
    ) -> Result<(&'a K, &'a V, u64), usize> {
        unsafe {
            // `lookup` has the same semantics as
            // `core::slice::binary_search`, i.e. it returns either
            // `Ok(n)`, where `n` is the position at which `(k0, v0)`
            // was found, or `Err(n)`, where `n` is the position at
            // which `(k0, v0)` can be inserted to preserve the order.
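            //
            // For example, if the keys on the page are [1, 3, 5],
            // looking up 3 returns `Ok(1)`, while looking up 4
            // returns `Err(2)`.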
            match lookup(txn, page, c, k0, v0) {
                Ok(n) => {
                    c.cur = n as isize;
                    if c.is_leaf {
                        let off =
                            u16::from_le(*(page.data.as_ptr().add(HDR + n * 2) as *const u16));
                        let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().add(off as usize));
                        Ok((K::from_raw_ptr(txn, k), V::from_raw_ptr(txn, v), 0))
                    } else {
                        let off =
                            u64::from_le(*(page.data.as_ptr().add(HDR + n * 8) as *const u64));
                        let (k, v) =
                            read::<T, K, V>(txn, page.data.as_ptr().add(off as usize & 0xfff));
                        Ok((
                            K::from_raw_ptr(txn, k),
                            V::from_raw_ptr(txn, v),
                            off & !0xfff,
                        ))
                    }
                }
                Err(n) => {
                    c.cur = n as isize;
                    Err(n)
                }
            }
        }
    }
}

// There is quite some duplicated code in the following functions,
// because we're forming a slice of offsets, and then using the core
// library's `binary_search_by` method on slices.
unsafe fn lookup<T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    txn: &T,
    page: crate::Page,
    c: &mut PageCursor,
    k0: &K,
    v0: Option<&V>,
) -> Result<usize, usize> {
    let hdr = header(page);
    c.total = hdr.n() as usize;
    c.is_leaf = hdr.is_leaf();
    if c.is_leaf {
        lookup_leaf(txn, page, k0, v0, hdr)
    } else {
        lookup_internal(txn, page, k0, v0, hdr)
    }
}

unsafe fn lookup_internal<T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    txn: &T,
    page: crate::Page,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize> {
    let s =
        core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u64, hdr.n() as usize);
    if let Some(v0) = v0 {
        s.binary_search_by(|&off| {
            let off = u64::from_le(off) & 0xfff;
            let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(txn, k);
            match k.compare(txn, k0) {
                Ordering::Equal => {
                    let v = V::from_raw_ptr(txn, v);
                    v.compare(txn, v0)
                }
                cmp => cmp,
            }
        })
    } else {
        match s.binary_search_by(|&off| {
            let off = u64::from_le(off) & 0xfff;
            let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(txn, k);
            k.compare(txn, k0)
        }) {
            Err(i) => Err(i),
            Ok(mut i) => {
                // Rewind if there are multiple matching keys.
                while i > 0 {
                    let off = u64::from_le(s[i - 1]) & 0xfff;
                    let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
                    let k = K::from_raw_ptr(txn, k);
                    if let Ordering::Equal = k.compare(txn, k0) {
                        i -= 1
                    } else {
                        break;
                    }
                }
                Ok(i)
            }
        }
    }
}

unsafe fn lookup_leaf<T: LoadPage, K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    txn: &T,
    page: crate::Page,
    k0: &K,
    v0: Option<&V>,
    hdr: &header::Header,
) -> Result<usize, usize> {
    let s =
        core::slice::from_raw_parts(page.data.as_ptr().add(HDR) as *const u16, hdr.n() as usize);
    if let Some(v0) = v0 {
        s.binary_search_by(|&off| {
            let off = u16::from_le(off);
            let (k, v) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(txn, k as *const u8);
            match k.compare(txn, k0) {
                Ordering::Equal => {
                    let v = V::from_raw_ptr(txn, v as *const u8);
                    v.compare(txn, v0)
                }
                cmp => cmp,
            }
        })
    } else {
        match s.binary_search_by(|&off| {
            let off = u16::from_le(off);
            let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
            let k = K::from_raw_ptr(txn, k);
            k.compare(txn, k0)
        }) {
            Err(e) => Err(e),
            Ok(mut i) => {
                // Rewind if there are multiple matching keys.
                while i > 0 {
                    let off = u16::from_le(s[i - 1]);
                    let (k, _) = read::<T, K, V>(txn, page.data.as_ptr().offset(off as isize));
                    let k = K::from_raw_ptr(txn, k);
                    if let Ordering::Equal = k.compare(txn, k0) {
                        i -= 1
                    } else {
                        break;
                    }
                }
                Ok(i)
            }
        }
    }
}

impl<
        K: UnsizedStorable + ?Sized + core::fmt::Debug,
        V: UnsizedStorable + ?Sized + core::fmt::Debug,
    > super::BTreeMutPage<K, V> for Page<K, V>
{
    // The init function is straightforward.
    fn init(page: &mut MutPage) {
        let h = header_mut(page);
        h.init();
    }

    // Deletions in internal nodes of a B tree need to replace the
    // deleted value with a value from a leaf.
    //
    // In this implementation of pages, we never actually wipe any
    // data from pages: we only ever append data, or clone pages to
    // compact them. Therefore, raw pointers into leaves remain valid
    // for as long as the page isn't freed, which can only happen at
    // the very last step of an insertion or deletion.
    type Saved = (*const K, *const V);
    fn save_deleted_leaf_entry(k: &K, v: &V) -> Self::Saved {
        (k as *const K, v as *const V)
    }

    unsafe fn from_saved<'a>(s: &Self::Saved) -> (&'a K, &'a V) {
        (&*s.0, &*s.1)
    }

    // As in the sized implementation, `put` is split into its own submodule.
    unsafe fn put<'a, T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        replace: bool,
        c: &PageCursor,
        k0: &'a K,
        v0: &'a V,
        k1v1: Option<(&'a K, &'a V)>,
        l: u64,
        r: u64,
    ) -> Result<crate::btree::put::Put<'a, K, V>, T::Error> {
        debug_assert!(c.cur >= 0);
        debug_assert!(k1v1.is_none() || replace);
        if r == 0 {
            put::put::<_, _, _, Leaf>(
                txn,
                page,
                mutable,
                replace,
                c.cur as usize,
                k0,
                v0,
                k1v1,
                0,
                0,
            )
        } else {
            put::put::<_, _, _, Internal>(
                txn,
                page,
                mutable,
                replace,
                c.cur as usize,
                k0,
                v0,
                k1v1,
                l,
                r,
            )
        }
    }

    unsafe fn put_mut<T: AllocPage>(
        txn: &mut T,
        page: &mut MutPage,
        c: &mut Self::Cursor,
        k0: &K,
        v0: &V,
        r: u64,
    ) {
        let mut n = c.cur;
        if r == 0 {
            Leaf::alloc_write(txn, page, k0, v0, 0, r, &mut n);
        } else {
            Internal::alloc_write(txn, page, k0, v0, 0, r, &mut n);
        }
        c.total += 1;
    }

    unsafe fn set_left_child(page: &mut MutPage, c: &Self::Cursor, l: u64) {
        let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur - 1);
        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
    }

    // Update the left child of the entry pointed to by cursor `c`.
    unsafe fn update_left_child<T: AllocPage>(
        txn: &mut T,
        page: CowPage,
        mutable: bool,
        c: &Self::Cursor,
        l: u64,
    ) -> Result<crate::btree::put::Ok, T::Error> {
        assert!(!c.is_leaf);
        let freed;
        let page = if mutable && page.is_dirty() {
            // If the page is dirty (i.e. allocated by this
            // transaction) and isn't shared, just make it mutable.
            freed = 0;
            MutPage(page)
        } else {
            // Else, clone the page:
            let mut new = txn.alloc_page()?;
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            // Copy the left child.
            let l = header(page.as_page()).left_page() & !0xfff;
            let hdr = header_mut(&mut new);
            hdr.set_left_page(l);
            // Copy all the entries.
            let s = Internal::offset_slice(page.as_page());
            clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);
            // Mark the old version for freeing.
            freed = page.offset | if page.is_dirty() { 1 } else { 0 };
            new
        };
        // Finally, update the left child at the cursor position. We
        // know that all valid cursor positions except the leftmost
        // one (-1) have a left child.
        assert!(c.cur >= 0);
        unsafe {
            let off = (page.0.data.add(HDR) as *mut u64).offset(c.cur as isize - 1);
            *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
        }
        Ok(Ok { page, freed })
    }

    // Here is how deletions work: if the page is dirty and mutable,
    // we "unlink" the value by shifting the end of the offset array
    // to the left by one offset (2 bytes in leaves, 8 bytes in
    // internal nodes).
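    //
    // For instance, deleting the entry at position 1 of a leaf whose
    // offset array is [o0, o1, o2, o3] shifts o2 and o3 one slot to
    // the left, leaving [o0, o2, o3]; the key/value bytes themselves
    // stay where they are on the page.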
    unsafe fn del<T: AllocPage>(
        txn: &mut T,
        page: crate::CowPage,
        mutable: bool,
        c: &PageCursor,
        l: u64,
    ) -> Result<(MutPage, u64), T::Error> {
        // Check that the cursor is at a valid position for a deletion.
        debug_assert!(c.cur >= 0 && c.cur < c.total as isize);
        if mutable && page.is_dirty() {
            let p = page.data;
            let mut page = MutPage(page);
            let hdr = header_mut(&mut page);
            let n = hdr.n();
            if c.is_leaf {
                unsafe {
                    let ptr = p.add(HDR + c.cur as usize * 2) as *mut u16;
                    // Get the offset in the page of the key/value data.
                    let off = u16::from_le(*ptr);
                    assert_eq!(off & 0xf000, 0);
                    // Erase the offset by shifting the last (`n -
                    // c.cur - 1`) offsets. To check there is no
                    // off-by-one error here, note that if the cursor
                    // is positioned on the first element (`c.cur ==
                    // 0`), there are `n - 1` elements to shift.
                    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
                    // Remove the size of the actual key/value, plus 2
                    // bytes (the offset).
                    hdr.decr(2 + entry_size::<K, V>(p.add(off as usize)));
                }
            } else {
                unsafe {
                    let ptr = p.add(HDR + c.cur as usize * 8) as *mut u64;
                    // Offset to the key/value data.
                    let off = u64::from_le(*ptr);
                    // Shift the offsets like in the leaf case above.
                    core::ptr::copy(ptr.offset(1), ptr, n as usize - c.cur as usize - 1);
                    if l > 0 {
                        // In an internal node, we may have to update
                        // the left child of the current
                        // position. After the move, the current
                        // offset is at `ptr`, so we need to find the
                        // offset one step to the left.
                        let p = ptr.offset(-1);
                        *p = (l | (u64::from_le(*p) & 0xfff)).to_le();
                    }
                    // Remove the size of the key/value, plus 8 bytes
                    // (the offset).
                    hdr.decr(8 + entry_size::<K, V>(p.add((off & 0xfff) as usize)));
                }
            };
            hdr.set_n(n - 1);
            // Return the page, and 0 for "nothing was freed".
            Ok((page, 0))
        } else {
            // If the page cannot be mutated, we allocate a new page and clone.
            let mut new = txn.alloc_page()?;
            <Page<K, V> as BTreeMutPage<K, V>>::init(&mut new);
            if c.is_leaf {
                // In a leaf, this is just a matter of getting the
                // offset slice, removing the current position, and
                // cloning.
                let s = Leaf::offset_slice(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);
                let mut n = 0;
                clone::<K, V, Leaf>(page.as_page(), &mut new, s0, &mut n);
                clone::<K, V, Leaf>(page.as_page(), &mut new, s1, &mut n);
            } else {
                // In an internal node, things are a bit trickier,
                // since we might need to update the left child.
                //
                // First, clone the leftmost child of the page.
                let hdr = header(page.as_page());
                let left = hdr.left_page() & !0xfff;
                unsafe {
                    *(new.0.data.add(HDR) as *mut u64).offset(-1) = left.to_le();
                }
                // Then, clone the first half of the page.
                let s = Internal::offset_slice(page.as_page());
                let (s0, s1) = s.split_at(c.cur as usize);
                let (_, s1) = s1.split_at(1);
                let mut n = 0;
                clone::<K, V, Internal>(page.as_page(), &mut new, s0, &mut n);
                // Update the left child, which was written by the
                // call to `clone` as the right child of the last
                // entry in `s0`.
                if l > 0 {
                    unsafe {
                        let off = (new.0.data.add(HDR) as *mut u64).offset(n - 1);
                        *off = (l | (u64::from_le(*off) & 0xfff)).to_le();
                    }
                }
                // Then, clone the second half of the page.
                clone::<K, V, Internal>(page.as_page(), &mut new, s1, &mut n);
            }
            // Return the new page, and the offset of the freed page.
            Ok((new, page.offset))
        }
    }

    // Decide what to do with the concatenation of two neighbouring
    // pages, with a middle element.
    //
    // This is highly similar to the sized case.
    unsafe fn merge_or_rebalance<'a, T: AllocPage>(
        txn: &mut T,
        m: Concat<'a, K, V, Self>,
    ) -> Result<Op<'a, T, K, V>, T::Error> {
        // First evaluate the size of the middle element on a
        // page. Contrary to the sized case, the offsets are aligned,
        // so the header is always the same size (no padding).
        let mid_size = if m.modified.c0.is_leaf {
            2 + alloc_size::<K, V>(m.mid.0, m.mid.1)
        } else {
            8 + alloc_size::<K, V>(m.mid.0, m.mid.1)
        };

        // Evaluate the size of the modified page of the concatenation
        // (which includes the header).
        let mod_size = size::<K, V>(&m.modified);
        // Add the "occupied" size of the other page (which excludes
        // the header).
        let occupied = {
            let hdr = header(m.other.as_page());
            (hdr.left_page() & 0xfff) as usize
        };
        if mod_size + mid_size + occupied <= PAGE_SIZE {
            // If the concatenation fits on a page, merge.
            return if m.modified.c0.is_leaf {
                merge::<_, _, _, _, Leaf>(txn, m)
            } else {
                merge::<_, _, _, _, Internal>(txn, m)
            };
        }

        // If we can't merge, evaluate the size of the first binding
        // on the other page, to see if we can rebalance.
        let first_other = PageCursor::new(&m.other, 0);
        let first_other_size = current_size::<K, V>(m.other.as_page(), &first_other);

        // If the modified page is at least half-full, or if removing
        // the first entry of the other page would make that other
        // page less than half-full, don't rebalance. See the sized
        // implementation for cases where this happens.
        if mod_size >= PAGE_SIZE / 2 || HDR + occupied - first_other_size < PAGE_SIZE / 2 {
            if let Some((k, v)) = m.modified.ins {
                return Ok(Op::Put(Self::put(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    m.modified.skip_first,
                    &m.modified.c1,
                    k,
                    v,
                    m.modified.ins2,
                    m.modified.l,
                    m.modified.r,
                )?));
            } else if m.modified.skip_first {
                debug_assert!(m.modified.ins2.is_none());
                let (page, freed) = Self::del(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    &m.modified.c1,
                    m.modified.l,
                )?;
                return Ok(Op::Put(Put::Ok(Ok { page, freed })));
            } else {
                let mut c1 = m.modified.c1.clone();
                let mut l = m.modified.l;
                if l == 0 {
                    Self::move_next(&mut c1);
                    l = m.modified.r;
                }
                return Ok(Op::Put(Put::Ok(Self::update_left_child(
                    txn,
                    m.modified.page,
                    m.modified.mutable,
                    &c1,
                    l,
                )?)));
            }
        }
        // Finally, if we're here, we can rebalance. There are four
        // (relatively explicit) cases; see the `rebalance` submodule
        // for how this is done.
        if m.mod_is_left {
            if m.modified.c0.is_leaf {
                rebalance_left::<_, _, _, Leaf>(txn, m)
            } else {
                rebalance_left::<_, _, _, Internal>(txn, m)
            }
        } else {
            rebalance_right::<_, _, _, Self>(txn, m)
        }
    }
}

/// Size of a modified page (including the header).
fn size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    m: &ModifiedPage<K, V, Page<K, V>>,
) -> usize {
    let mut total = {
        let hdr = header(m.page.as_page());
        (hdr.left_page() & 0xfff) as usize
    };
    total += HDR;

    // Extra size for the offsets.
    let extra = if m.c1.is_leaf { 2 } else { 8 };
    if let Some((k, v)) = m.ins {
        total += extra + alloc_size(k, v) as usize;
        if let Some((k, v)) = m.ins2 {
            total += extra + alloc_size(k, v) as usize;
        }
    }
    if m.skip_first {
        total -= current_size::<K, V>(m.page.as_page(), &m.c1) as usize;
    }
    total
}

// Size of a pair of type `(K, V)`. This is computed in the same way
// as a struct with fields of type `K` and `V` in C.
fn alloc_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(k: &K, v: &V) -> usize {
    let s = ((k.size() + V::ALIGN - 1) & !(V::ALIGN - 1)) + v.size();
    let al = K::ALIGN.max(V::ALIGN);
    (s + al - 1) & !(al - 1)
}
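// For example, with a 3-byte key of alignment 1 and a `u64` value
// (size 8, alignment 8), the value must start at offset 8, so
// s = ((3 + 7) & !7) + 8 = 16, which is already a multiple of
// al = max(1, 8) = 8: the pair occupies 16 bytes on the page.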

// Total size of the entry at the cursor position, including the
// offset size.
fn current_size<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized>(
    page: crate::Page,
    c: &PageCursor,
) -> usize {
    if c.is_leaf {
        Leaf::current_size::<K, V>(page, c.cur)
    } else {
        Internal::current_size::<K, V>(page, c.cur)
    }
}

pub(super) trait AllocWrite<K: ?Sized, V: ?Sized> {
    fn alloc_write<T: AllocPage>(
        txn: &mut T,
        new: &mut MutPage,
        k0: &K,
        v0: &V,
        l: u64,
        r: u64,
        n: &mut isize,
    );
    fn set_left_child(new: &mut MutPage, n: isize, l: u64);
}

/// Perform the modifications on a page, by copying it onto page `new`.
fn modify<
    T: LoadPage + AllocPage,
    K: core::fmt::Debug + ?Sized,
    V: core::fmt::Debug + ?Sized,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
>(
    txn: &mut T,
    new: &mut MutPage,
    m: &mut ModifiedPage<K, V, P>,
    n: &mut isize,
) {
    let mut l = P::left_child(m.page.as_page(), &m.c0);
    while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut m.c0) {
        L::alloc_write(txn, new, k, v, l, r, n);
        l = 0;
    }
    let mut rr = m.r;
    if let Some((k, v)) = m.ins {
        if let Some((k2, v2)) = m.ins2 {
            L::alloc_write(txn, new, k, v, l, m.l, n);
            L::alloc_write(txn, new, k2, v2, 0, m.r, n);
        } else if m.l > 0 {
            L::alloc_write(txn, new, k, v, m.l, m.r, n);
        } else {
            L::alloc_write(txn, new, k, v, l, m.r, n);
        }
        l = 0;
        rr = 0;
    } else if m.l > 0 {
        l = m.l
    }
    let mut c1 = m.c1.clone();
    if m.skip_first {
        P::move_next(&mut c1);
    }
    // Here's a subtlety: if the first element of `c1` is the last
    // element of a page, we may be updating its right child (in
    // which case `m.r > 0`) rather than its left child, as for all
    // other elements.
    //
    // In this function, that case only ever happens when we're
    // updating the last child of a page `p`, and then merging `p`
    // with its right neighbour.
    while let Some((k, v, r)) = P::next(txn, m.page.as_page(), &mut c1) {
        if rr > 0 {
            L::alloc_write(txn, new, k, v, l, rr, n);
            rr = 0;
        } else {
            L::alloc_write(txn, new, k, v, l, r, n);
        }
        l = 0;
    }
    if l != 0 {
        // The while loop above didn't run, i.e. the insertion
        // happened at the end of the page. In this case, we haven't
        // had a chance to update the left child, so do it now.
        L::set_left_child(new, *n, l)
    }
}

/// The very unsurprising `merge` function.
pub(super) fn merge<
    'a,
    T: AllocPage + LoadPage,
    K: ?Sized + core::fmt::Debug,
    V: ?Sized + core::fmt::Debug,
    P: BTreeMutPage<K, V>,
    L: AllocWrite<K, V>,
>(
    txn: &mut T,
    mut m: Concat<K, V, P>,
) -> Result<Op<'a, T, K, V>, T::Error> {
    // Here, we first allocate a page, then clone both pages onto it,
    // in a different order depending on whether the modified page is
    // the left or the right child.
    //
    // Note that when this merge happens immediately after a put that
    // reallocated the two sides of the merge in order to split (not
    // all splits do that), we could be slightly more efficient, but
    // with considerably more code.
    let mut new = unsafe { txn.alloc_page()? };
    P::init(&mut new);

    let mut n = 0;
    if m.mod_is_left {
        modify::<_, _, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
        let mut rc = P::cursor_first(&m.other);
        let l = P::left_child(m.other.as_page(), &rc);
        L::alloc_write(txn, &mut new, m.mid.0, m.mid.1, 0, l, &mut n);
        while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc) {
            L::alloc_write(txn, &mut new, k, v, 0, r, &mut n);
        }
    } else {
        let mut rc = P::cursor_first(&m.other);
        let mut l = P::left_child(m.other.as_page(), &rc);
        while let Some((k, v, r)) = P::next(txn, m.other.as_page(), &mut rc) {
            L::alloc_write(txn, &mut new, k, v, l, r, &mut n);
            l = 0;
        }
        L::alloc_write(txn, &mut new, m.mid.0, m.mid.1, 0, 0, &mut n);
        modify::<_, _, _, _, L>(txn, &mut new, &mut m.modified, &mut n);
    }

    let b0 = if m.modified.page.is_dirty() { 1 } else { 0 };
    let b1 = if m.other.is_dirty() { 1 } else { 0 };
    Ok(Op::Merged {
        page: new,
        freed: [m.modified.page.offset | b0, m.other.offset | b1],
        marker: core::marker::PhantomData,
    })
}

fn clone<K: UnsizedStorable + ?Sized, V: UnsizedStorable + ?Sized, L: Alloc>(
    page: crate::Page,
    new: &mut MutPage,
    s: Offsets<L::Offset>,
    n: &mut isize,
) {
    for off in s.0.iter() {
        let (r, off): (u64, usize) = (*off).into();
        unsafe {
            let ptr = page.data.as_ptr().add(off);
            let size = entry_size::<K, V>(ptr);
            // Reserve the space on the new page.
            let hdr = header_mut(new);
            let data = hdr.data() as u16;
            let off_new = data - size as u16;
            hdr.set_data(off_new);
            hdr.set_n(hdr.n() + 1);
            if hdr.is_leaf() {
                hdr.incr(2 + size);
                let ptr = new.0.data.offset(HDR as isize + *n * 2) as *mut u16;
                *ptr = (off_new as u16).to_le();
            } else {
                hdr.incr(8 + size);
                // Set the offset to this new entry in the offset
                // array, along with the right child page.
                let ptr = new.0.data.offset(HDR as isize + *n * 8) as *mut u64;
                *ptr = (r | off_new as u64).to_le();
            }
            // Copy the entry itself to its new position.
            core::ptr::copy_nonoverlapping(ptr, new.0.data.offset(off_new as isize), size);
        }
        *n += 1;
    }
}
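
// A typical use of `clone` is page compaction, as in
// `update_left_child` above: allocate a fresh page, copy the left
// child, then clone all entries in offset order, which drops the
// garbage left on the old page by previous deletions:
//
//     let s = Internal::offset_slice(page.as_page());
//     clone::<K, V, Internal>(page.as_page(), &mut new, s, &mut 0);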