use crate::sealed::Sealed;
use crate::tree_store::{
AccessGuardMut, Btree, BtreeDrain, BtreeDrainFilter, BtreeMut, BtreeRangeIter, Checksum,
PageHint, PageNumber, RawBtree, TransactionalMemory, MAX_VALUE_LENGTH,
};
use crate::types::{MutInPlaceValue, RedbKey, RedbValue};
use crate::{AccessGuard, StorageError, WriteTransaction};
use crate::{Result, TableHandle};
use std::borrow::Borrow;
use std::fmt::{Debug, Formatter};
use std::ops::RangeBounds;
use std::sync::{Arc, Mutex};
/// Statistics describing the tree that backs a table.
///
/// Instances are filled in field-by-field from the underlying tree's own statistics and are
/// read through the accessor methods on this type.
#[derive(Debug)]
pub struct TableStats {
// Height of the backing tree, as reported by the tree's statistics.
pub(crate) tree_height: u32,
// Count of leaf pages, as reported by the tree's statistics.
pub(crate) leaf_pages: u64,
// Count of branch pages, as reported by the tree's statistics.
pub(crate) branch_pages: u64,
// Bytes stored in leaves; exposed publicly through `stored_bytes()`.
pub(crate) stored_leaf_bytes: u64,
// Bytes attributed to metadata by the tree's statistics.
pub(crate) metadata_bytes: u64,
// Bytes attributed to fragmentation by the tree's statistics.
pub(crate) fragmented_bytes: u64,
}
/// Read-only accessors for the recorded statistics; each returns a copy of one field.
impl TableStats {
/// Returns the recorded height of the tree.
pub fn tree_height(&self) -> u32 {
self.tree_height
}
/// Returns the recorded number of leaf pages.
pub fn leaf_pages(&self) -> u64 {
self.leaf_pages
}
/// Returns the recorded number of branch pages.
pub fn branch_pages(&self) -> u64 {
self.branch_pages
}
/// Returns the recorded number of bytes stored in leaves.
///
/// Note that the accessor is named `stored_bytes` while the field is `stored_leaf_bytes`.
pub fn stored_bytes(&self) -> u64 {
self.stored_leaf_bytes
}
/// Returns the recorded number of metadata bytes.
pub fn metadata_bytes(&self) -> u64 {
self.metadata_bytes
}
/// Returns the recorded number of fragmented bytes.
pub fn fragmented_bytes(&self) -> u64 {
self.fragmented_bytes
}
}
/// A table opened through a [`WriteTransaction`].
///
/// Besides the read API of [`ReadableTable`], it offers mutating operations such as `insert`,
/// `remove`, `drain` and `pop_first`/`pop_last`.
pub struct Table<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> {
// Table name; returned by `TableHandle::name` and passed to `close_table` when dropped.
name: String,
// Transaction that is notified via `close_table` when this handle is dropped.
transaction: &'txn WriteTransaction<'db>,
// Mutable tree that every operation on this table delegates to.
tree: BtreeMut<'txn, K, V>,
}
impl<K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for Table<'_, '_, K, V> {
/// Returns the name this table was created with.
fn name(&self) -> &str {
&self.name
}
}
impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Table<'db, 'txn, K, V> {
    /// Creates a table handle named `name` whose tree starts at `table_root`.
    ///
    /// `mem` and `freed_pages` are handed to the mutable tree; `transaction` is kept so that it
    /// can be told when the table is dropped.
    pub(crate) fn new(
        name: &str,
        table_root: Option<(PageNumber, Checksum)>,
        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
        mem: &'db TransactionalMemory,
        transaction: &'txn WriteTransaction<'db>,
    ) -> Table<'db, 'txn, K, V> {
        let owned_name = name.to_owned();
        let tree = BtreeMut::new(table_root, mem, freed_pages);
        Table {
            name: owned_name,
            transaction,
            tree,
        }
    }

    /// Asks the underlying tree to print its debug representation.
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.tree.print_debug(include_values)
    }

    /// Removes the first entry yielded by [`ReadableTable::iter`] and returns it.
    ///
    /// Returns `Ok(None)` when the iterator yields nothing.
    pub fn pop_first(&mut self) -> Result<Option<(AccessGuard<K>, AccessGuard<V>)>> {
        // Copy the key bytes out so that no guard borrowed from the iterator outlives this block.
        let first_key = match self.iter()?.next() {
            Some(entry) => {
                let (key, _) = entry?;
                K::as_bytes(&key.value()).as_ref().to_vec()
            }
            None => return Ok(None),
        };
        self.remove_by_owned_key(first_key)
    }

    /// Removes the last entry yielded by [`ReadableTable::iter`] and returns it.
    ///
    /// Returns `Ok(None)` when the iterator yields nothing.
    pub fn pop_last(&mut self) -> Result<Option<(AccessGuard<K>, AccessGuard<V>)>> {
        // Copy the key bytes out so that no guard borrowed from the iterator outlives this block.
        let last_key = match self.iter()?.next_back() {
            Some(entry) => {
                let (key, _) = entry?;
                K::as_bytes(&key.value()).as_ref().to_vec()
            }
            None => return Ok(None),
        };
        self.remove_by_owned_key(last_key)
    }

    /// Removes the entry whose serialized key is `owned_key` and returns it as a pair of guards.
    ///
    /// Shared tail of `pop_first` / `pop_last`.
    ///
    /// # Panics
    ///
    /// Panics if the key is not present; callers pass a key they have just read from this table.
    fn remove_by_owned_key(
        &mut self,
        owned_key: Vec<u8>,
    ) -> Result<Option<(AccessGuard<K>, AccessGuard<V>)>> {
        let key = K::from_bytes(&owned_key);
        let value = self.remove(&key)?.unwrap();
        // `key` borrows `owned_key`; release it before the buffer is moved into the guard.
        drop(key);
        Ok(Some((AccessGuard::with_owned_value(owned_key), value)))
    }

    /// Drains `range` from the underlying tree and returns the result as a [`Drain`] iterator.
    pub fn drain<'a, KR>(&mut self, range: impl RangeBounds<KR> + 'a) -> Result<Drain<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let drained = self.tree.drain(&range)?;
        Ok(Drain::new(drained))
    }

    /// Passes `range` and `predicate` to the tree's `drain_filter` and wraps the result in a
    /// [`DrainFilter`] iterator.
    ///
    /// Presumably only entries for which `predicate` returns `true` are removed and yielded;
    /// confirm against `BtreeMut::drain_filter`.
    ///
    /// NOTE(review): this method requires `Fn`, whereas [`DrainFilter`] itself only requires
    /// `FnMut`; confirm whether the stricter bound is intentional.
    pub fn drain_filter<'a, KR, F: for<'f> Fn(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
        &mut self,
        range: impl RangeBounds<KR> + 'a,
        predicate: F,
    ) -> Result<DrainFilter<K, V, F>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let filtered = self.tree.drain_filter(&range, predicate)?;
        Ok(DrainFilter::new(filtered))
    }

    /// Inserts `key` → `value` into the underlying tree and returns whatever the tree returns
    /// (presumably the previously stored value, if any; confirm against `BtreeMut::insert`).
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::ValueTooLarge`] when the serialized value, or the serialized key,
    /// is longer than `MAX_VALUE_LENGTH`. The value is checked first.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<Option<AccessGuard<V>>> {
        let value_size = V::as_bytes(value.borrow()).as_ref().len();
        if value_size > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_size));
        }

        // Keys are held to the same limit and reported with the same error variant.
        let key_size = K::as_bytes(key.borrow()).as_ref().len();
        if key_size > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_size));
        }

        self.tree.insert(key.borrow(), value.borrow())
    }

    /// Removes `key` from the underlying tree and returns whatever the tree returns
    /// (presumably the removed value, if the key was present; confirm against `BtreeMut::remove`).
    pub fn remove<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
    ) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.remove(key.borrow())
    }
}
impl<'db, 'txn, K: RedbKey + 'static, V: MutInPlaceValue + 'static> Table<'db, 'txn, K, V> {
    /// Reserves `value_length` bytes for `key` in the underlying tree and returns the mutable
    /// guard produced by the tree's `insert_reserve`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::ValueTooLarge`] when `value_length`, or the serialized key, exceeds
    /// `MAX_VALUE_LENGTH`. The requested value length is checked first.
    pub fn insert_reserve<'a>(
        &mut self,
        key: impl Borrow<K::SelfType<'a>>,
        value_length: u32,
    ) -> Result<AccessGuardMut<V>>
    where
        K: 'a,
    {
        let requested_len = value_length as usize;
        if requested_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(requested_len));
        }

        // Keys are held to the same limit and reported with the same error variant.
        let key_size = K::as_bytes(key.borrow()).as_ref().len();
        if key_size > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_size));
        }

        self.tree.insert_reserve(key.borrow(), value_length)
    }
}
impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> ReadableTable<K, V>
    for Table<'db, 'txn, K, V>
{
    /// Looks `key` up in the underlying tree and returns the tree's answer unchanged.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    /// Returns a [`Range`] iterator over the tree's entries within `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let inner = self.tree.range(&range)?;
        Ok(Range::new(inner))
    }

    /// Collects the tree's statistics and copies them into a [`TableStats`].
    fn stats(&self) -> Result<TableStats> {
        self.tree.stats().map(|tree_stats| TableStats {
            tree_height: tree_stats.tree_height,
            leaf_pages: tree_stats.leaf_pages,
            branch_pages: tree_stats.branch_pages,
            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
            metadata_bytes: tree_stats.metadata_bytes,
            fragmented_bytes: tree_stats.fragmented_bytes,
        })
    }

    /// Returns the length reported by the underlying tree.
    fn len(&self) -> Result<u64> {
        self.tree.len()
    }

    /// Returns `true` when [`len`](Self::len) reports zero.
    fn is_empty(&self) -> Result<bool> {
        Ok(self.len()? == 0)
    }
}
// `ReadableTable` has `Sealed` as a supertrait, so `Table` must implement this marker in order
// to implement `ReadableTable`.
impl<K: RedbKey, V: RedbValue> Sealed for Table<'_, '_, K, V> {}
impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Drop for Table<'db, 'txn, K, V> {
/// Hands the table's name and tree back to the owning transaction via `close_table`.
fn drop(&mut self) {
self.transaction.close_table(&self.name, &self.tree);
}
}
/// Writes the shared `Debug` representation used by the table types in this file.
///
/// The output always starts with the table name and ends with `" ]"`. In between it prints:
/// * `No entries` when `len` is zero,
/// * the single key-value pair when `len` is one,
/// * otherwise the first pair, a count of the elided entries (only when there are more than
///   two), and the last pair.
///
/// Any `Err` among `len`, `first` or `last` is rendered as an I/O error message instead of being
/// propagated.
///
/// # Panics
///
/// Panics if `len` is non-zero but a needed `first`/`last` is `Ok(None)`.
fn debug_helper<K: RedbKey + 'static, V: RedbValue + 'static>(
    f: &mut Formatter<'_>,
    name: &str,
    len: Result<u64>,
    first: Result<Option<(AccessGuard<K>, AccessGuard<V>)>>,
    last: Result<Option<(AccessGuard<K>, AccessGuard<V>)>>,
) -> std::fmt::Result {
    write!(f, "Table [ name: \"{}\", ", name)?;

    match len {
        Err(_) => write!(f, "I/O Error accessing table!")?,
        Ok(0) => write!(f, "No entries")?,
        Ok(1) => match first {
            Ok(only) => {
                let (key, value) = only.as_ref().unwrap();
                write!(f, "One key-value: {:?} = {:?}", key.value(), value.value())?;
            }
            Err(_) => write!(f, "I/O Error accessing table!")?,
        },
        Ok(count) => {
            match first {
                Ok(head) => {
                    let (key, value) = head.as_ref().unwrap();
                    write!(f, "first: {:?} = {:?}, ", key.value(), value.value())?;
                }
                Err(_) => write!(f, "I/O Error accessing table!")?,
            }

            // The first and last entries are printed explicitly; everything else is summarised.
            if count > 2 {
                write!(f, "...{} more entries..., ", count - 2)?;
            }

            match last {
                Ok(tail) => {
                    let (key, value) = tail.as_ref().unwrap();
                    write!(f, "last: {:?} = {:?}", key.value(), value.value())?;
                }
                Err(_) => write!(f, "I/O Error accessing table!")?,
            }
        }
    }

    write!(f, " ]")
}
impl<K: RedbKey + 'static, V: RedbValue + 'static> Debug for Table<'_, '_, K, V> {
/// Formats the table through `debug_helper`, passing its name, length, and first and last
/// entries. Read errors are passed along as `Err` values for the helper to render.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
debug_helper(f, &self.name, self.len(), self.first(), self.last())
}
}
/// Read-only operations shared by the table types in this file.
///
/// The trait has `Sealed` as a supertrait, so only types that implement that marker can
/// implement it.
pub trait ReadableTable<K: RedbKey + 'static, V: RedbValue + 'static>: Sealed {
    /// Returns a guard for the value stored under `key`, or `None` if there is none.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a;

    /// Returns a double-ended iterator over the entries whose keys fall within `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a;

    /// Returns the first entry yielded by [`iter`](Self::iter), or `None` if it yields nothing.
    fn first(&self) -> Result<Option<(AccessGuard<K>, AccessGuard<V>)>> {
        let mut entries = self.iter()?;
        let head = entries.next();
        head.transpose()
    }

    /// Returns the last entry yielded by [`iter`](Self::iter), or `None` if it yields nothing.
    fn last(&self) -> Result<Option<(AccessGuard<K>, AccessGuard<V>)>> {
        let mut entries = self.iter()?;
        let tail = entries.next_back();
        tail.transpose()
    }

    /// Returns statistics about the table's underlying storage.
    fn stats(&self) -> Result<TableStats>;

    /// Returns the number of entries in the table.
    fn len(&self) -> Result<u64>;

    /// Returns `true` if the table has no entries.
    fn is_empty(&self) -> Result<bool>;

    /// Returns an iterator over the whole table; equivalent to `range` with an unbounded range.
    fn iter(&self) -> Result<Range<K, V>> {
        self.range::<K::SelfType<'_>>(..)
    }
}
/// A read-only table handle that carries no key or value type parameters.
///
/// The only operation it exposes in this file is `stats`.
pub struct ReadOnlyUntypedTable<'txn> {
// Untyped tree that statistics are read from.
tree: RawBtree<'txn>,
}
impl<'txn> ReadOnlyUntypedTable<'txn> {
    /// Creates an untyped handle over the tree rooted at `root_page`.
    ///
    /// `fixed_key_size`, `fixed_value_size` and `mem` are forwarded unchanged to `RawBtree::new`.
    pub(crate) fn new(
        root_page: Option<(PageNumber, Checksum)>,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        mem: &'txn TransactionalMemory,
    ) -> Self {
        let tree = RawBtree::new(root_page, fixed_key_size, fixed_value_size, mem);
        Self { tree }
    }

    /// Collects the tree's statistics and copies them into a [`TableStats`].
    pub fn stats(&self) -> Result<TableStats> {
        self.tree.stats().map(|tree_stats| TableStats {
            tree_height: tree_stats.tree_height,
            leaf_pages: tree_stats.leaf_pages,
            branch_pages: tree_stats.branch_pages,
            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
            metadata_bytes: tree_stats.metadata_bytes,
            fragmented_bytes: tree_stats.fragmented_bytes,
        })
    }
}
/// A typed table handle that exposes only the [`ReadableTable`] read API.
pub struct ReadOnlyTable<'txn, K: RedbKey + 'static, V: RedbValue + 'static> {
// Table name; used when formatting the table with `Debug`.
name: String,
// Read-only tree that every operation on this table delegates to.
tree: Btree<'txn, K, V>,
}
impl<'txn, K: RedbKey + 'static, V: RedbValue + 'static> ReadOnlyTable<'txn, K, V> {
    /// Creates a read-only handle named `name` over the tree rooted at `root_page`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Btree::new`.
    pub(crate) fn new(
        name: String,
        root_page: Option<(PageNumber, Checksum)>,
        hint: PageHint,
        mem: &'txn TransactionalMemory,
    ) -> Result<ReadOnlyTable<'txn, K, V>> {
        let tree = Btree::new(root_page, hint, mem)?;
        Ok(Self { name, tree })
    }
}
impl<'txn, K: RedbKey + 'static, V: RedbValue + 'static> ReadableTable<K, V>
    for ReadOnlyTable<'txn, K, V>
{
    /// Looks `key` up in the underlying tree and returns the tree's answer unchanged.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
    where
        K: 'a,
    {
        self.tree.get(key.borrow())
    }

    /// Returns a [`Range`] iterator over the tree's entries within `range`.
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a,
    {
        let inner = self.tree.range(&range)?;
        Ok(Range::new(inner))
    }

    /// Collects the tree's statistics and copies them into a [`TableStats`].
    fn stats(&self) -> Result<TableStats> {
        self.tree.stats().map(|tree_stats| TableStats {
            tree_height: tree_stats.tree_height,
            leaf_pages: tree_stats.leaf_pages,
            branch_pages: tree_stats.branch_pages,
            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
            metadata_bytes: tree_stats.metadata_bytes,
            fragmented_bytes: tree_stats.fragmented_bytes,
        })
    }

    /// Returns the length reported by the underlying tree.
    fn len(&self) -> Result<u64> {
        self.tree.len()
    }

    /// Returns `true` when [`len`](Self::len) reports zero.
    fn is_empty(&self) -> Result<bool> {
        Ok(self.len()? == 0)
    }
}
// `ReadableTable` has `Sealed` as a supertrait, so `ReadOnlyTable` must implement this marker
// in order to implement `ReadableTable`.
impl<K: RedbKey, V: RedbValue> Sealed for ReadOnlyTable<'_, K, V> {}
impl<K: RedbKey + 'static, V: RedbValue + 'static> Debug for ReadOnlyTable<'_, K, V> {
/// Formats the table through `debug_helper`, passing its name, length, and first and last
/// entries. Read errors are passed along as `Err` values for the helper to render.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
debug_helper(f, &self.name, self.len(), self.first(), self.last())
}
}
/// Iterator returned by `Table::drain`.
///
/// Each item is a `Result` holding a key guard and a value guard.
pub struct Drain<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
// Tree-level drain iterator whose entries are converted into guard pairs.
inner: BtreeDrain<'a, K, V>,
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Drain<'a, K, V> {
/// Wraps a tree-level drain iterator.
fn new(inner: BtreeDrain<'a, K, V>) -> Self {
Self { inner }
}
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Iterator for Drain<'a, K, V> {
    type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;

    /// Yields the next drained entry from the front as a `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().map(|item| {
            item.map(|entry| {
                // Both guards refer to the same page, each with its own byte range.
                let (page, key_range, value_range) = entry.into_raw();
                (
                    AccessGuard::with_page(page.clone(), key_range),
                    AccessGuard::with_page(page, value_range),
                )
            })
        })
    }
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> DoubleEndedIterator for Drain<'a, K, V> {
    /// Yields the next drained entry from the back as a `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back().map(|item| {
            item.map(|entry| {
                // Both guards refer to the same page, each with its own byte range.
                let (page, key_range, value_range) = entry.into_raw();
                (
                    AccessGuard::with_page(page.clone(), key_range),
                    AccessGuard::with_page(page, value_range),
                )
            })
        })
    }
}
/// Iterator returned by `Table::drain_filter`.
///
/// `F` is the predicate type; it receives a key and a value and returns a `bool`. Each item is a
/// `Result` holding a key guard and a value guard.
pub struct DrainFilter<
'a,
K: RedbKey + 'static,
V: RedbValue + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> {
// Tree-level filtering drain iterator whose entries are converted into guard pairs.
inner: BtreeDrainFilter<'a, K, V, F>,
}
impl<
'a,
K: RedbKey + 'static,
V: RedbValue + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> DrainFilter<'a, K, V, F>
{
/// Wraps a tree-level filtering drain iterator.
fn new(inner: BtreeDrainFilter<'a, K, V, F>) -> Self {
Self { inner }
}
}
impl<
        'a,
        K: RedbKey + 'static,
        V: RedbValue + 'static,
        F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
    > Iterator for DrainFilter<'a, K, V, F>
{
    type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;

    /// Yields the next entry produced by the inner filtering drain, from the front, as a
    /// `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().map(|item| {
            item.map(|entry| {
                // Both guards refer to the same page, each with its own byte range.
                let (page, key_range, value_range) = entry.into_raw();
                (
                    AccessGuard::with_page(page.clone(), key_range),
                    AccessGuard::with_page(page, value_range),
                )
            })
        })
    }
}
impl<
        'a,
        K: RedbKey + 'static,
        V: RedbValue + 'static,
        F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
    > DoubleEndedIterator for DrainFilter<'a, K, V, F>
{
    /// Yields the next entry produced by the inner filtering drain, from the back, as a
    /// `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next_back(&mut self) -> Option<Self::Item> {
        self.inner.next_back().map(|item| {
            item.map(|entry| {
                // Both guards refer to the same page, each with its own byte range.
                let (page, key_range, value_range) = entry.into_raw();
                (
                    AccessGuard::with_page(page.clone(), key_range),
                    AccessGuard::with_page(page, value_range),
                )
            })
        })
    }
}
/// Iterator returned by `ReadableTable::range` and `ReadableTable::iter`.
///
/// Each item is a `Result` holding a key guard and a value guard.
pub struct Range<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
// Tree-level range iterator whose entries are converted into guard pairs.
inner: BtreeRangeIter<'a, K, V>,
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Range<'a, K, V> {
/// Wraps a tree-level range iterator; visible to the parent module (`pub(super)`).
pub(super) fn new(inner: BtreeRangeIter<'a, K, V>) -> Self {
Self { inner }
}
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Iterator for Range<'a, K, V> {
    type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;

    /// Yields the next entry from the front of the range as a `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next()?;
        Some(item.map(|entry| {
            // Both guards refer to the same page, each with its own byte range.
            let (page, key_range, value_range) = entry.into_raw();
            let key_guard = AccessGuard::with_page(page.clone(), key_range);
            let value_guard = AccessGuard::with_page(page, value_range);
            (key_guard, value_guard)
        }))
    }
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> DoubleEndedIterator for Range<'a, K, V> {
    /// Yields the next entry from the back of the range as a `(key, value)` pair of guards.
    ///
    /// Errors from the inner iterator are passed through unchanged.
    fn next_back(&mut self) -> Option<Self::Item> {
        let item = self.inner.next_back()?;
        Some(item.map(|entry| {
            // Both guards refer to the same page, each with its own byte range.
            let (page, key_range, value_range) = entry.into_raw();
            let key_guard = AccessGuard::with_page(page.clone(), key_range);
            let value_guard = AccessGuard::with_page(page, value_range);
            (key_guard, value_guard)
        }))
    }
}