use crate::error::CommitError;
use crate::multimap_table::ReadOnlyUntypedMultimapTable;
use crate::sealed::Sealed;
use crate::table::ReadOnlyUntypedTable;
use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
Btree, BtreeMut, Checksum, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint,
PageNumber, SerializedSavepoint, TableTree, TableType, TransactionalMemory, MAX_VALUE_LENGTH,
};
use crate::types::{RedbKey, RedbValue};
use crate::{
AccessGuard, Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
TableDefinition, TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
};
#[cfg(feature = "logging")]
use log::{info, warn};
use std::borrow::Borrow;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::ops::{RangeBounds, RangeFull};
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::{AtomicBool, Ordering};
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{panic, thread};
/// Definition of the system table named "next_savepoint_id": a single `()`-keyed entry holding
/// a `SavepointId`. `WriteTransaction::persistent_savepoint` stores the id that follows the
/// newly created savepoint's id here.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
SystemTableDefinition::new("next_savepoint_id");
/// Definition of the system table named "persistent_savepoints", keyed by `SavepointId` with
/// the serialized savepoint as the value.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
SystemTableDefinition::new("persistent_savepoints");
/// Typed, named definition of a table that is opened through `SystemNamespace`.
///
/// `K` and `V` exist only at the type level (via `PhantomData`); the name is the only
/// runtime data.
pub struct SystemTableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
// Table name; `new` rejects the empty string
name: &'a str,
// Zero-sized markers tying the definition to its key and value types
_key_type: PhantomData<K>,
_value_type: PhantomData<V>,
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> SystemTableDefinition<'a, K, V> {
/// Creates a definition for the system table called `name`.
///
/// This is a `const fn`, so definitions can be declared as constants.
///
/// # Panics
///
/// Panics if `name` is empty.
pub const fn new(name: &'a str) -> Self {
assert!(!name.is_empty());
Self {
name,
_key_type: PhantomData,
_value_type: PhantomData,
}
}
}
// Exposes the table name through the crate's `TableHandle` trait.
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle
for SystemTableDefinition<'a, K, V>
{
/// Returns the name this definition was created with.
fn name(&self) -> &str {
self.name
}
}
// Marker impl of the crate's `Sealed` trait.
// NOTE(review): presumably required because `TableHandle` has `Sealed` as a supertrait --
// confirm against the trait definition.
impl<K: RedbKey, V: RedbValue> Sealed for SystemTableDefinition<'_, K, V> {}
// `Clone` and `Copy` are implemented by hand rather than derived.
// NOTE(review): presumably so that `K` and `V` themselves are not required to be
// `Clone`/`Copy`, as a derive would demand -- confirm.
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for SystemTableDefinition<'a, K, V> {
/// Returns a bitwise copy; the type is `Copy`.
fn clone(&self) -> Self {
*self
}
}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for SystemTableDefinition<'a, K, V> {}
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for SystemTableDefinition<'a, K, V> {
    /// Formats the definition as `name<key type name, value type name>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let key_type = K::type_name();
        let value_type = V::type_name();
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            key_type.name(),
            value_type.name()
        )
    }
}
/// Snapshot of storage statistics, as assembled by `WriteTransaction::stats`.
#[derive(Debug)]
pub struct DatabaseStats {
/// Height reported by the data table tree's statistics.
pub(crate) tree_height: u32,
/// Page count reported by `TransactionalMemory::count_allocated_pages`.
pub(crate) allocated_pages: u64,
/// Leaf page count reported by the data table tree's statistics.
pub(crate) leaf_pages: u64,
/// Branch page count reported by the data table tree's statistics.
pub(crate) branch_pages: u64,
/// Stored bytes reported by the data table tree's statistics.
pub(crate) stored_leaf_bytes: u64,
/// Data tree metadata bytes plus the freed tree's metadata bytes and stored leaf bytes.
pub(crate) metadata_bytes: u64,
/// Fragmented bytes of the data tree plus those of the freed tree.
pub(crate) fragmented_bytes: u64,
/// Page size reported by `TransactionalMemory::get_page_size`.
pub(crate) page_size: usize,
}
// Read-only accessors; each returns the corresponding field of the snapshot unchanged.
impl DatabaseStats {
/// Height of the data table tree recorded in this snapshot.
pub fn tree_height(&self) -> u32 {
self.tree_height
}
/// Number of allocated pages recorded in this snapshot.
pub fn allocated_pages(&self) -> u64 {
self.allocated_pages
}
/// Number of leaf pages recorded for the data table tree.
pub fn leaf_pages(&self) -> u64 {
self.leaf_pages
}
/// Number of branch pages recorded for the data table tree.
pub fn branch_pages(&self) -> u64 {
self.branch_pages
}
/// Stored bytes recorded for the data table tree (the `stored_leaf_bytes` field).
pub fn stored_bytes(&self) -> u64 {
self.stored_leaf_bytes
}
/// Metadata bytes; `WriteTransaction::stats` includes the freed tree's metadata and
/// stored leaf bytes in this figure.
pub fn metadata_bytes(&self) -> u64 {
self.metadata_bytes
}
/// Fragmented bytes, summed over the data tree and the freed tree.
pub fn fragmented_bytes(&self) -> u64 {
self.fragmented_bytes
}
/// Page size in bytes, as reported by the transactional memory.
pub fn page_size(&self) -> usize {
self.page_size
}
}
/// Durability level used when a `WriteTransaction` commits.
///
/// Persistent savepoints can only be created or deleted under `Immediate` or `Paranoid`.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
/// Commits through `WriteTransaction::non_durable_commit`.
None,
/// Commits through `durable_commit` with `eventual = true`.
Eventual,
/// Commits through `durable_commit` with neither `eventual` nor `two_phase` set.
/// New write transactions start at this level.
Immediate,
/// Commits through `durable_commit` with `two_phase = true`.
Paranoid,
}
/// Open handle to a system table. Dropping the handle stages the table's current root with
/// the owning `SystemNamespace`.
pub struct SystemTable<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> {
// Table name, passed back to the namespace on drop
name: String,
// Namespace the table was opened from; mutably borrowed for the handle's whole lifetime
namespace: &'s mut SystemNamespace<'db>,
// Mutable b-tree holding the table's entries
tree: BtreeMut<'s, K, V>,
}
impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> SystemTable<'db, 's, K, V> {
fn new(
name: &str,
table_root: Option<(PageNumber, Checksum)>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
mem: &'db TransactionalMemory,
namespace: &'s mut SystemNamespace<'db>,
) -> SystemTable<'db, 's, K, V> {
SystemTable {
name: name.to_string(),
namespace,
tree: BtreeMut::new(table_root, mem, freed_pages),
}
}
fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
where
K: 'a,
{
self.tree.get(key.borrow())
}
fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
where
K: 'a,
KR: Borrow<K::SelfType<'a>> + 'a,
{
self.tree.range(&range).map(Range::new)
}
pub fn insert<'k, 'v>(
&mut self,
key: impl Borrow<K::SelfType<'k>>,
value: impl Borrow<V::SelfType<'v>>,
) -> Result<Option<AccessGuard<V>>> {
let value_len = V::as_bytes(value.borrow()).as_ref().len();
if value_len > MAX_VALUE_LENGTH {
return Err(StorageError::ValueTooLarge(value_len));
}
let key_len = K::as_bytes(key.borrow()).as_ref().len();
if key_len > MAX_VALUE_LENGTH {
return Err(StorageError::ValueTooLarge(key_len));
}
self.tree.insert(key.borrow(), value.borrow())
}
pub fn remove<'a>(
&mut self,
key: impl Borrow<K::SelfType<'a>>,
) -> Result<Option<AccessGuard<V>>>
where
K: 'a,
{
self.tree.remove(key.borrow())
}
}
impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> Drop for SystemTable<'db, 's, K, V> {
fn drop(&mut self) {
self.namespace.close_table(&self.name, &self.tree);
}
}
/// Holds the table tree through which system tables are opened. `WriteTransaction::new`
/// builds it from the system root.
struct SystemNamespace<'db> {
// Tree of system table definitions; closed tables stage their roots here
table_tree: TableTree<'db>,
}
impl<'db> SystemNamespace<'db> {
    /// Opens the system table described by `definition`, creating it if necessary, and marks
    /// `transaction` as dirty.
    ///
    /// The returned handle mutably borrows this namespace until it is dropped.
    ///
    /// # Errors
    ///
    /// A failure to get or create the table is converted with
    /// `into_storage_error_or_corrupted`.
    fn open_system_table<'txn, 's, K: RedbKey + 'static, V: RedbValue + 'static>(
        &'s mut self,
        transaction: &'txn WriteTransaction<'db>,
        definition: SystemTableDefinition<K, V>,
    ) -> Result<SystemTable<'db, 's, K, V>> {
        #[cfg(feature = "logging")]
        info!("Opening system table: {}", definition);
        let table_name = definition.name();
        let header = self
            .table_tree
            .get_or_create_table::<K, V>(table_name, TableType::Normal)
            .map_err(|err| {
                err.into_storage_error_or_corrupted("Internal error. System table is corrupted")
            })?;
        transaction.dirty.store(true, Ordering::Release);
        let table = SystemTable::new(
            table_name,
            header.get_root(),
            transaction.freed_pages.clone(),
            transaction.mem,
            self,
        );
        Ok(table)
    }

    /// Stages the current root of `table` under `name` in the system table tree.
    fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
        &mut self,
        name: &str,
        table: &BtreeMut<K, V>,
    ) {
        let root = table.get_root();
        self.table_tree.stage_update_table_root(name, root);
    }
}
/// Tracks the user tables of a write transaction: which ones are currently open, and the
/// table tree that stores their definitions.
struct TableNamespace<'db> {
// Names of currently open tables, each mapped to the source location that opened it
// (reported in `TableError::TableAlreadyOpen`)
open_tables: HashMap<String, &'static panic::Location<'static>>,
// Tree of user table definitions; closed tables stage their roots here
table_tree: TableTree<'db>,
}
impl<'db> TableNamespace<'db> {
#[track_caller]
fn inner_open<K: RedbKey + 'static, V: RedbValue + 'static>(
&mut self,
name: &str,
table_type: TableType,
) -> Result<Option<(PageNumber, Checksum)>, TableError> {
if let Some(location) = self.open_tables.get(name) {
return Err(TableError::TableAlreadyOpen(name.to_string(), location));
}
let internal_table = self
.table_tree
.get_or_create_table::<K, V>(name, table_type)?;
self.open_tables
.insert(name.to_string(), panic::Location::caller());
Ok(internal_table.get_root())
}
#[track_caller]
pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
&mut self,
transaction: &'txn WriteTransaction<'db>,
definition: MultimapTableDefinition<K, V>,
) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
#[cfg(feature = "logging")]
info!("Opening multimap table: {}", definition);
let root = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
transaction.dirty.store(true, Ordering::Release);
Ok(MultimapTable::new(
definition.name(),
root,
transaction.freed_pages.clone(),
transaction.mem,
transaction,
))
}
#[track_caller]
pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
&mut self,
transaction: &'txn WriteTransaction<'db>,
definition: TableDefinition<K, V>,
) -> Result<Table<'db, 'txn, K, V>, TableError> {
#[cfg(feature = "logging")]
info!("Opening table: {}", definition);
let root = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
transaction.dirty.store(true, Ordering::Release);
Ok(Table::new(
definition.name(),
root,
transaction.freed_pages.clone(),
transaction.mem,
transaction,
))
}
#[track_caller]
fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
if let Some(location) = self.open_tables.get(name) {
return Err(TableError::TableAlreadyOpen(name.to_string(), location));
}
self.table_tree.delete_table(name, table_type)
}
#[track_caller]
fn delete_table<'txn>(
&mut self,
transaction: &'txn WriteTransaction<'db>,
name: &str,
) -> Result<bool, TableError> {
#[cfg(feature = "logging")]
info!("Deleting table: {}", name);
transaction.dirty.store(true, Ordering::Release);
self.inner_delete(name, TableType::Normal)
}
#[track_caller]
fn delete_multimap_table<'txn>(
&mut self,
transaction: &'txn WriteTransaction<'db>,
name: &str,
) -> Result<bool, TableError> {
#[cfg(feature = "logging")]
info!("Deleting multimap table: {}", name);
transaction.dirty.store(true, Ordering::Release);
self.inner_delete(name, TableType::Multimap)
}
pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
&mut self,
name: &str,
table: &BtreeMut<K, V>,
) {
self.open_tables.remove(name).unwrap();
self.table_tree
.stage_update_table_root(name, table.get_root());
}
}
/// A read/write transaction against a `Database`.
///
/// Finish it with `commit` or `abort`. If it is dropped without either, an abort is attempted
/// automatically, unless the thread is panicking or the storage has reported a failure.
pub struct WriteTransaction<'db> {
// Owning database; used to allocate savepoints and to end the write transaction on drop
db: &'db Database,
// Tracker shared with savepoints; consulted for live reads and savepoint validity
transaction_tracker: Arc<Mutex<TransactionTracker>>,
// Same object as `db.get_memory()`
mem: &'db TransactionalMemory,
// Id obtained from `db.start_write_transaction()`
transaction_id: TransactionId,
// Tree of freed-page lists keyed by `FreedTableKey`
freed_tree: Mutex<BtreeMut<'db, FreedTableKey, FreedPageList<'static>>>,
// Freed-page list shared with the user and system table trees and with every opened table
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
// Freed-page list handed to `freed_tree`; released via `mem.free` after a durable commit
post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
// User tables
tables: Mutex<TableNamespace<'db>>,
// System tables (e.g. persistent savepoints)
system_tables: Mutex<SystemNamespace<'db>>,
// Set by `commit`/`abort` so that `Drop` does not abort again
completed: bool,
// Set once a table is opened or deleted, or a savepoint is restored; savepoints cannot be
// created on a dirty transaction
dirty: AtomicBool,
// Defaults to `Durability::Immediate`
durability: Durability,
// Persistent savepoints created in this transaction; deleted again on abort
created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
// Persistent savepoints deleted in this transaction; deallocated in the tracker after commit
deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
impl<'db> WriteTransaction<'db> {
pub(crate) fn new(
db: &'db Database,
transaction_tracker: Arc<Mutex<TransactionTracker>>,
) -> Result<Self> {
let transaction_id = db.start_write_transaction();
let root_page = db.get_memory().get_data_root();
let system_page = db.get_memory().get_system_root();
let freed_root = db.get_memory().get_freed_root();
let freed_pages = Arc::new(Mutex::new(vec![]));
let post_commit_frees = Arc::new(Mutex::new(vec![]));
let tables = TableNamespace {
open_tables: Default::default(),
table_tree: TableTree::new(root_page, db.get_memory(), freed_pages.clone()),
};
let system_tables = SystemNamespace {
table_tree: TableTree::new(system_page, db.get_memory(), freed_pages.clone()),
};
Ok(Self {
db,
transaction_tracker,
mem: db.get_memory(),
transaction_id,
tables: Mutex::new(tables),
system_tables: Mutex::new(system_tables),
freed_tree: Mutex::new(BtreeMut::new(
freed_root,
db.get_memory(),
post_commit_frees.clone(),
)),
freed_pages,
post_commit_frees,
completed: false,
dirty: AtomicBool::new(false),
durability: Durability::Immediate,
created_persistent_savepoints: Mutex::new(Default::default()),
deleted_persistent_savepoints: Mutex::new(vec![]),
})
}
/// Creates a savepoint, records it in the system tables, and returns its id.
///
/// The id following the new savepoint's id is written to the "next_savepoint_id" table and
/// the serialized savepoint to the "persistent_savepoints" table.
///
/// # Errors
///
/// Returns `SavepointError::InvalidSavepoint` unless the durability is `Immediate` or
/// `Paranoid`, and propagates any error from `ephemeral_savepoint` or the table writes.
pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
    match self.durability {
        Durability::Immediate | Durability::Paranoid => {}
        Durability::None | Durability::Eventual => {
            return Err(SavepointError::InvalidSavepoint);
        }
    }

    let mut savepoint = self.ephemeral_savepoint()?;
    let savepoint_id = savepoint.get_id();

    let mut system_tables = self.system_tables.lock().unwrap();
    {
        // Scoped so the table handle releases its borrow of `system_tables` before the next
        // table is opened
        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint_id.next())?;
    }

    let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    savepoint_table.insert(
        savepoint_id,
        SerializedSavepoint::from_savepoint(&savepoint),
    )?;

    savepoint.set_persistent();
    self.created_persistent_savepoints
        .lock()
        .unwrap()
        .insert(savepoint_id);

    Ok(savepoint_id.0)
}
/// Reads the entry of the "next_savepoint_id" system table, or `None` if it has not been
/// written yet.
///
/// Opening the system table marks the transaction as dirty.
pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
    let mut system_tables = self.system_tables.lock().unwrap();
    let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
    // Bound to a local so the access guard is released before `next_table` is dropped
    let next_id = next_table.get(())?.map(|guard| guard.value());
    Ok(next_id)
}
/// Loads the persistent savepoint stored under `id`.
///
/// Opening the system table marks the transaction as dirty.
///
/// # Errors
///
/// Returns `SavepointError::InvalidSavepoint` if no savepoint is stored under `id`.
pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
    let mut system_tables = self.system_tables.lock().unwrap();
    let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    let stored = table.get(SavepointId(id))?;
    match stored {
        Some(serialized) => {
            let savepoint = serialized
                .value()
                .to_savepoint(self.transaction_tracker.clone());
            Ok(savepoint)
        }
        None => Err(SavepointError::InvalidSavepoint),
    }
}
/// Removes the persistent savepoint stored under `id`.
///
/// Returns `Ok(true)` if a savepoint was removed and `Ok(false)` if none was stored under
/// `id`. A removed savepoint is remembered so that `commit` can deallocate it in the
/// transaction tracker.
///
/// # Errors
///
/// Returns `SavepointError::InvalidSavepoint` unless the durability is `Immediate` or
/// `Paranoid`.
pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
    match self.durability {
        Durability::Immediate | Durability::Paranoid => {}
        Durability::None | Durability::Eventual => {
            return Err(SavepointError::InvalidSavepoint);
        }
    }

    let mut system_tables = self.system_tables.lock().unwrap();
    let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    let removed = table.remove(SavepointId(id))?;
    match removed {
        None => Ok(false),
        Some(serialized) => {
            let savepoint = serialized
                .value()
                .to_savepoint(self.transaction_tracker.clone());
            let entry = (savepoint.get_id(), savepoint.get_transaction_id());
            self.deleted_persistent_savepoints
                .lock()
                .unwrap()
                .push(entry);
            Ok(true)
        }
    }
}
/// Returns the ids of all savepoints stored in the "persistent_savepoints" system table.
///
/// The ids are collected eagerly; the returned iterator owns them. Opening the system table
/// marks the transaction as dirty.
pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
    let mut system_tables = self.system_tables.lock().unwrap();
    let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
    let mut ids = Vec::new();
    for entry in table.range::<SavepointId>(..)? {
        let (key, _) = entry?;
        ids.push(key.value().0);
    }
    Ok(ids.into_iter())
}
/// Creates a savepoint capturing the current data, system and freed roots together with the
/// raw allocator states.
///
/// # Errors
///
/// Returns `SavepointError::InvalidSavepoint` if the transaction is already dirty, and
/// propagates any error from allocating the savepoint.
pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
    let already_dirty = self.dirty.load(Ordering::Acquire);
    if already_dirty {
        return Err(SavepointError::InvalidSavepoint);
    }

    let (id, transaction_id) = self.db.allocate_savepoint()?;
    #[cfg(feature = "logging")]
    info!(
        "Creating savepoint id={:?}, txn_id={:?}",
        id, transaction_id
    );

    let regional_allocators = self.mem.get_raw_allocator_states();
    let user_root = self.mem.get_data_root();
    let system_root = self.mem.get_system_root();
    let freed_root = self.mem.get_freed_root();

    Ok(Savepoint::new_ephemeral(
        self.db.get_memory(),
        self.transaction_tracker.clone(),
        id,
        transaction_id,
        user_root,
        system_root,
        freed_root,
        regional_allocators,
    ))
}
/// Rolls the user tables and the freed tree back to the state captured in `savepoint`.
///
/// The system table tree is not rolled back. Savepoints newer than `savepoint` are
/// invalidated, and persistent savepoints with a larger id are deleted. The transaction is
/// marked dirty.
///
/// # Errors
///
/// Returns `SavepointError::InvalidSavepoint` if the transaction tracker does not consider
/// the savepoint valid, and propagates storage errors.
///
/// # Panics
///
/// Panics if `savepoint.db_address()` differs from the address of this transaction's tracker,
/// or if the savepoint's version differs from the version reported by the database memory.
pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
// The savepoint must have been created against the same transaction tracker
assert_eq!(
self.transaction_tracker.as_ref() as *const _,
savepoint.db_address()
);
if !self
.transaction_tracker
.lock()
.unwrap()
.is_valid_savepoint(savepoint.get_id())
{
return Err(SavepointError::InvalidSavepoint);
}
#[cfg(feature = "logging")]
info!(
"Beginning savepoint restore (id={:?}) in transaction id={:?}",
savepoint.get_id(),
self.transaction_id
);
// The savepoint must have been taken at the same version as the current memory
assert_eq!(self.db.get_memory().get_version(), savepoint.get_version());
self.dirty.store(true, Ordering::Release);
// Every page allocated after the savepoint's allocator state was captured
let allocated_since_savepoint = self
.mem
.pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());
// The system tree is kept as-is, so pages it references must not be released below
let referenced_by_system_tree = self
.system_tables
.lock()
.unwrap()
.table_tree
.all_referenced_pages()?;
let mut freed_pages = vec![];
for page in allocated_since_savepoint {
if referenced_by_system_tree.contains(&page) {
continue;
}
// Uncommitted pages are freed immediately; all others are queued in `freed_pages`
if self.mem.uncommitted(page) {
self.mem.free(page);
} else {
freed_pages.push(page);
}
}
// Replaces (rather than extends) the pending freed-page list
*self.freed_pages.lock().unwrap() = freed_pages;
// Re-root the user table tree at the savepoint's user root
self.tables.lock().unwrap().table_tree = TableTree::new(
savepoint.get_user_root(),
self.mem,
self.freed_pages.clone(),
);
// Transaction id of the first entry in the current freed tree, or this transaction's id
// when the tree is empty
let oldest_unprocessed_transaction = if let Some(entry) = self
.freed_tree
.lock()
.unwrap()
.range::<RangeFull, FreedTableKey>(&(..))?
.next()
{
entry?.key().transaction_id
} else {
self.transaction_id.raw_id()
};
// Rebuild the freed tree from the savepoint's freed root
let mut freed_tree = BtreeMut::new(
savepoint.get_freed_root(),
self.mem,
self.post_commit_frees.clone(),
);
// Drop every entry of the restored tree that sorts before the oldest transaction still
// present in the current tree.
// NOTE(review): presumably those entries were already processed (their pages freed), so
// keeping them would free the same pages twice -- confirm.
let lookup_key = FreedTableKey {
transaction_id: oldest_unprocessed_transaction,
pagination_id: 0,
};
let mut to_remove = vec![];
for entry in freed_tree.range(&(..lookup_key))? {
to_remove.push(entry?.key());
}
for key in to_remove {
freed_tree.remove(&key)?;
}
*self.freed_tree.lock().unwrap() = freed_tree;
// Savepoints created after this one can no longer be restored
self.transaction_tracker
.lock()
.unwrap()
.invalidate_savepoints_after(savepoint.get_id());
// Likewise delete persistent savepoints with a larger id than the restored one
for persistent_savepoint in self.list_persistent_savepoints()? {
if persistent_savepoint > savepoint.get_id().0 {
self.delete_persistent_savepoint(persistent_savepoint)?;
}
}
Ok(())
}
/// Sets the durability level used when this transaction commits.
///
/// # Panics
///
/// Panics if a persistent savepoint has already been created or deleted in this transaction.
pub fn set_durability(&mut self, durability: Durability) {
    let no_created = {
        let created = self.created_persistent_savepoints.lock().unwrap();
        created.is_empty()
    };
    let no_deleted = {
        let deleted = self.deleted_persistent_savepoints.lock().unwrap();
        deleted.is_empty()
    };
    assert!(no_created && no_deleted);
    self.durability = durability;
}
/// Opens the table described by `definition` for reading and writing.
///
/// # Errors
///
/// Returns `TableError::TableAlreadyOpen` if the table is currently open in this transaction,
/// or whatever error getting or creating the table produces.
#[track_caller]
pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
    &'txn self,
    definition: TableDefinition<K, V>,
) -> Result<Table<'db, 'txn, K, V>, TableError> {
    let mut tables = self.tables.lock().unwrap();
    tables.open_table(self, definition)
}
/// Opens the multimap table described by `definition` for reading and writing.
///
/// # Errors
///
/// Returns `TableError::TableAlreadyOpen` if the table is currently open in this transaction,
/// or whatever error getting or creating the table produces.
#[track_caller]
pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
    &'txn self,
    definition: MultimapTableDefinition<K, V>,
) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
    let mut tables = self.tables.lock().unwrap();
    tables.open_multimap_table(self, definition)
}
/// Marks the table `name` as closed and stages the current root of `table` for it.
///
/// # Panics
///
/// Panics if `name` is not recorded as open.
pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
    &self,
    name: &str,
    table: &BtreeMut<K, V>,
) {
    let mut tables = self.tables.lock().unwrap();
    tables.close_table(name, table);
}
/// Deletes the table named by `definition`, forwarding the table tree's result.
///
/// # Errors
///
/// Returns `TableError::TableAlreadyOpen` if the table is still open in this transaction.
pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
    let table_name = definition.name().to_string();
    // The handle is dropped before the open-table check runs.
    // NOTE(review): presumably so that passing an open table as the handle closes it first
    // instead of failing with `TableAlreadyOpen` -- confirm.
    drop(definition);
    let mut tables = self.tables.lock().unwrap();
    tables.delete_table(self, &table_name)
}
/// Deletes the multimap table named by `definition`, forwarding the table tree's result.
///
/// # Errors
///
/// Returns `TableError::TableAlreadyOpen` if the table is still open in this transaction.
pub fn delete_multimap_table(
    &self,
    definition: impl MultimapTableHandle,
) -> Result<bool, TableError> {
    let table_name = definition.name().to_string();
    // The handle is dropped before the open-table check runs.
    // NOTE(review): presumably so that passing an open table as the handle closes it first
    // instead of failing with `TableAlreadyOpen` -- confirm.
    drop(definition);
    let mut tables = self.tables.lock().unwrap();
    tables.delete_multimap_table(self, &table_name)
}
/// Lists the normal (non-multimap) tables known to this transaction as untyped handles.
pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
    let names = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .list_tables(TableType::Normal)?;
    Ok(names.into_iter().map(UntypedTableHandle::new))
}
/// Lists the multimap tables known to this transaction as untyped handles.
pub fn list_multimap_tables(
    &self,
) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
    let names = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .list_tables(TableType::Multimap)?;
    Ok(names.into_iter().map(UntypedMultimapTableHandle::new))
}
/// Commits the transaction using its configured durability level, consuming it.
///
/// # Errors
///
/// Propagates any error from the commit itself.
pub fn commit(mut self) -> Result<(), CommitError> {
// Set before committing, so `Drop` never attempts an abort, even when the commit fails
self.completed = true;
self.commit_inner()
}
/// Performs the commit that matches the configured durability and then deallocates, in the
/// transaction tracker, every persistent savepoint deleted during this transaction.
fn commit_inner(&mut self) -> Result<(), CommitError> {
    #[cfg(feature = "logging")]
    info!(
        "Committing transaction id={:?} with durability={:?}",
        self.transaction_id, self.durability
    );

    let outcome = match self.durability {
        Durability::None => self.non_durable_commit(),
        Durability::Eventual => self.durable_commit(true, false),
        Durability::Immediate => self.durable_commit(false, false),
        Durability::Paranoid => self.durable_commit(false, true),
    };
    outcome?;

    {
        // Only reached after a successful commit
        let deleted = self.deleted_persistent_savepoints.lock().unwrap();
        for &(savepoint_id, transaction_id) in deleted.iter() {
            self.transaction_tracker
                .lock()
                .unwrap()
                .deallocate_savepoint(savepoint_id, transaction_id);
        }
    }

    #[cfg(feature = "logging")]
    info!(
        "Finished commit of transaction id={:?}",
        self.transaction_id
    );

    Ok(())
}
/// Aborts the transaction, consuming it and rolling back its uncommitted writes.
///
/// # Errors
///
/// Propagates storage errors raised while rolling back.
pub fn abort(mut self) -> Result {
// Set before aborting, so `Drop` does not attempt a second abort
self.completed = true;
self.abort_inner()
}
/// Deletes the persistent savepoints created in this transaction, discards staged table root
/// updates, and rolls back uncommitted writes.
fn abort_inner(&mut self) -> Result {
    #[cfg(feature = "logging")]
    info!("Aborting transaction id={:?}", self.transaction_id);

    for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
        match self.delete_persistent_savepoint(savepoint.0) {
            Ok(_) => {}
            // `InvalidSavepoint` is only returned for unsupported durability levels, and
            // `set_durability` panics once a persistent savepoint has been created.
            Err(SavepointError::InvalidSavepoint) => unreachable!(),
            Err(SavepointError::Storage(storage_err)) => return Err(storage_err),
        }
    }

    self.tables
        .lock()
        .unwrap()
        .table_tree
        .clear_table_root_updates();
    self.mem.rollback_uncommitted_writes()?;

    #[cfg(feature = "logging")]
    info!("Finished abort of transaction id={:?}", self.transaction_id);

    Ok(())
}
/// Flushes both table trees, processes and stores freed pages, and commits through
/// `TransactionalMemory::commit`, forwarding `eventual` and `two_phase` unchanged.
///
/// After a successful commit the pending non-durable commits are cleared in the tracker and
/// everything left in `post_commit_frees` is freed.
pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result {
// Falls back to this transaction's own id when there is no live read transaction
let oldest_live_read = self
.transaction_tracker
.lock()
.unwrap()
.oldest_live_read_transaction()
.unwrap_or(self.transaction_id);
let user_root = self
.tables
.lock()
.unwrap()
.table_tree
.flush_table_root_updates()?;
let system_root = self
.system_tables
.lock()
.unwrap()
.table_tree
.flush_table_root_updates()?;
// Free pages recorded by transactions older than the oldest live read
self.process_freed_pages(oldest_live_read)?;
// When any savepoint exists, pages freed by the freed tree itself are stored in the freed
// tree as well, instead of being freed directly at the end of this function.
// NOTE(review): presumably because a savepoint may still reference the old freed-tree
// root -- confirm.
let savepoint_exists = self
.transaction_tracker
.lock()
.unwrap()
.any_savepoint_exists();
self.store_freed_pages(savepoint_exists)?;
// The freed tree's checksums are finalized here, before its root is read for the commit
self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
let freed_root = self.freed_tree.lock().unwrap().get_root();
self.mem.commit(
user_root,
system_root,
freed_root,
self.transaction_id,
eventual,
two_phase,
)?;
self.transaction_tracker
.lock()
.unwrap()
.clear_pending_non_durable_commits();
// Whatever remains in `post_commit_frees` is released now that the commit succeeded
for page in self.post_commit_frees.lock().unwrap().drain(..) {
self.mem.free(page);
}
Ok(())
}
/// Flushes both table trees, stores all freed pages, and commits through
/// `TransactionalMemory::non_durable_commit`, then registers the commit with the tracker.
///
/// Unlike `durable_commit`, no previously freed pages are processed and nothing is freed
/// directly.
pub(crate) fn non_durable_commit(&mut self) -> Result {
let user_root = self
.tables
.lock()
.unwrap()
.table_tree
.flush_table_root_updates()?;
let system_root = self
.system_tables
.lock()
.unwrap()
.table_tree
.flush_table_root_updates()?;
// Always include the pages freed by the freed tree itself
self.store_freed_pages(true)?;
// The freed tree's checksums are finalized here, before its root is read for the commit
self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;
let freed_root = self.freed_tree.lock().unwrap().get_root();
self.mem
.non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
// `durable_commit` later clears the tracker's pending non-durable commits
self.transaction_tracker
.lock()
.unwrap()
.register_non_durable_commit(self.transaction_id);
Ok(())
}
/// Runs one compaction pass: relocates the region tracker and compacts the user tables.
///
/// Returns `true` if either step reports progress. Both steps run unless the first fails.
pub(crate) fn compact_pages(&mut self) -> Result<bool> {
    let tracker_relocated = self.mem.relocate_region_tracker()?;
    let tables_compacted = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .compact_tables()?;
    Ok(tracker_relocated || tables_compacted)
}
/// Frees every page listed in the freed tree under a transaction id lower than
/// `oldest_live_read`, then removes those entries from the tree.
///
/// # Panics
///
/// Panics if `PageNumber::serialized_size()` is not 8.
fn process_freed_pages(&mut self, oldest_live_read: TransactionId) -> Result {
assert_eq!(PageNumber::serialized_size(), 8);
// Exclusive upper bound: the first possible key of `oldest_live_read` itself
let lookup_key = FreedTableKey {
transaction_id: oldest_live_read.raw_id(),
pagination_id: 0,
};
// Keys are collected first and removed only after the range iteration has finished
let mut to_remove = vec![];
let mut freed_tree = self.freed_tree.lock().unwrap();
for entry in freed_tree.range(&(..lookup_key))? {
let entry = entry?;
to_remove.push(entry.key());
let value = entry.value();
for i in 0..value.len() {
self.mem.free(value.get(i));
}
}
for key in to_remove {
freed_tree.remove(&key)?;
}
Ok(())
}
/// Moves the pending `freed_pages` into the freed tree, in chunks of up to 100 pages, keyed
/// by this transaction's id and an increasing pagination counter.
///
/// When `include_post_commit_free` is set, the contents of `post_commit_frees` are moved
/// into `freed_pages` before the loop and after every chunk, so they are stored as well.
///
/// # Panics
///
/// Panics if `PageNumber::serialized_size()` is not 8.
fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result {
assert_eq!(PageNumber::serialized_size(), 8); let mut pagination_counter = 0u64;
let mut freed_tree = self.freed_tree.lock().unwrap();
if include_post_commit_free {
self.freed_pages
.lock()
.unwrap()
.extend(self.post_commit_frees.lock().unwrap().drain(..));
}
// The emptiness check takes the `freed_pages` lock only briefly; it is taken again below,
// after space has been reserved in the freed tree
while !self.freed_pages.lock().unwrap().is_empty() {
let chunk_size = 100;
let buffer_size = FreedPageList::required_bytes(chunk_size);
let key = FreedTableKey {
transaction_id: self.transaction_id.raw_id(),
pagination_id: pagination_counter,
};
// Reserve space for a full chunk, then fill it in place
let mut access_guard =
freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;
let mut freed_pages = self.freed_pages.lock().unwrap();
let len = freed_pages.len();
access_guard.as_mut().clear();
// Takes at most `chunk_size` pages from the end of the pending list
for page in freed_pages.drain(len - min(len, chunk_size)..) {
access_guard.as_mut().push_back(page);
}
drop(access_guard);
pagination_counter += 1;
// NOTE(review): presumably the insertion above can itself add pages to
// `post_commit_frees` (the freed tree is constructed with that list), which would be
// why they are collected again on every iteration -- confirm.
if include_post_commit_free {
freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
}
}
Ok(())
}
/// Gathers storage statistics for the database as seen by this transaction.
///
/// The freed tree's metadata bytes and stored leaf bytes are both counted as metadata, and
/// its fragmented bytes are added to the data tree's.
pub fn stats(&self) -> Result<DatabaseStats> {
    let tables = self.tables.lock().unwrap();
    let data_stats = tables.table_tree.stats()?;
    let freed_stats = self.freed_tree.lock().unwrap().stats()?;

    let metadata_bytes = data_stats.metadata_bytes()
        + freed_stats.metadata_bytes
        + freed_stats.stored_leaf_bytes;
    let fragmented_bytes = data_stats.fragmented_bytes() + freed_stats.fragmented_bytes;
    let allocated_pages = self.mem.count_allocated_pages()?;

    Ok(DatabaseStats {
        tree_height: data_stats.tree_height(),
        allocated_pages,
        leaf_pages: data_stats.leaf_pages(),
        branch_pages: data_stats.branch_pages(),
        stored_leaf_bytes: data_stats.stored_bytes(),
        metadata_bytes,
        fragmented_bytes,
        page_size: self.mem.get_page_size(),
    })
}
/// Flushes the staged table root updates and, if the user table tree has a root, prints the
/// master tree to stderr.
///
/// # Errors
///
/// Propagates storage errors from flushing the table roots, opening the master tree, or
/// printing it. (The flush error used to cause a panic via `unwrap`; it is now returned like
/// the others.)
// Debugging aid; allowed to be unused.
#[allow(dead_code)]
pub(crate) fn print_debug(&self) -> Result {
    if let Some(page) = self
        .tables
        .lock()
        .unwrap()
        .table_tree
        .flush_table_root_updates()?
    {
        eprintln!("Master tree:");
        let master_tree: Btree<&str, InternalTableDefinition> =
            Btree::new(Some(page), PageHint::None, self.mem)?;
        master_tree.print_debug(true)?;
    }
    Ok(())
}
}
impl<'a> Drop for WriteTransaction<'a> {
    /// Ends the write transaction on the database and, if it was neither committed nor
    /// aborted, attempts an automatic abort.
    ///
    /// The automatic abort is skipped while the thread is panicking or after the storage has
    /// reported a failure. A failed abort is only logged (with the `logging` feature enabled).
    fn drop(&mut self) {
        self.db.end_write_transaction(self.transaction_id);
        if self.completed || thread::panicking() || self.mem.storage_failure() {
            return;
        }
        #[allow(unused_variables)]
        if let Err(error) = self.abort_inner() {
            #[cfg(feature = "logging")]
            warn!("Failure automatically aborting transaction: {}", error);
        }
    }
}
/// A read-only transaction. Dropping it deallocates the read transaction in the transaction
/// tracker.
pub struct ReadTransaction<'a> {
// Tracker notified when the transaction is dropped
transaction_tracker: Arc<Mutex<TransactionTracker>>,
// Memory that opened tables read from
mem: &'a TransactionalMemory,
// Table tree rooted at the data root that was current when the transaction was created
tree: TableTree<'a>,
// Id passed back to the tracker on drop
transaction_id: TransactionId,
}
impl<'db> ReadTransaction<'db> {
    /// Creates a read transaction over the data root currently reported by `mem`.
    pub(crate) fn new(
        mem: &'db TransactionalMemory,
        transaction_tracker: Arc<Mutex<TransactionTracker>>,
        transaction_id: TransactionId,
    ) -> Self {
        let tree = TableTree::new(mem.get_data_root(), mem, Default::default());
        Self {
            transaction_tracker,
            mem,
            tree,
            transaction_id,
        }
    }

    /// Opens the table described by `definition` for reading.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableDoesNotExist` if there is no such table.
    pub fn open_table<K: RedbKey + 'static, V: RedbValue + 'static>(
        &self,
        definition: TableDefinition<K, V>,
    ) -> Result<ReadOnlyTable<K, V>, TableError> {
        let name = definition.name();
        let Some(header) = self.tree.get_table::<K, V>(name, TableType::Normal)? else {
            return Err(TableError::TableDoesNotExist(name.to_string()));
        };
        let table = ReadOnlyTable::new(
            name.to_string(),
            header.get_root(),
            PageHint::Clean,
            self.mem,
        )?;
        Ok(table)
    }

    /// Opens the table named by `handle` for reading, without key or value types.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableDoesNotExist` if there is no such table.
    pub fn open_untyped_table(
        &self,
        handle: impl TableHandle,
    ) -> Result<ReadOnlyUntypedTable, TableError> {
        let name = handle.name();
        let Some(header) = self.tree.get_table_untyped(name, TableType::Normal)? else {
            return Err(TableError::TableDoesNotExist(name.to_string()));
        };
        Ok(ReadOnlyUntypedTable::new(
            header.get_root(),
            header.get_fixed_key_size(),
            header.get_fixed_value_size(),
            self.mem,
        ))
    }

    /// Opens the multimap table described by `definition` for reading.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableDoesNotExist` if there is no such table.
    pub fn open_multimap_table<K: RedbKey + 'static, V: RedbKey + 'static>(
        &self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
        let name = definition.name();
        let Some(header) = self.tree.get_table::<K, V>(name, TableType::Multimap)? else {
            return Err(TableError::TableDoesNotExist(name.to_string()));
        };
        let table = ReadOnlyMultimapTable::new(header.get_root(), PageHint::Clean, self.mem)?;
        Ok(table)
    }

    /// Opens the multimap table named by `handle` for reading, without key or value types.
    ///
    /// # Errors
    ///
    /// Returns `TableError::TableDoesNotExist` if there is no such table.
    pub fn open_untyped_multimap_table(
        &self,
        handle: impl MultimapTableHandle,
    ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
        let name = handle.name();
        let Some(header) = self.tree.get_table_untyped(name, TableType::Multimap)? else {
            return Err(TableError::TableDoesNotExist(name.to_string()));
        };
        Ok(ReadOnlyUntypedMultimapTable::new(
            header.get_root(),
            header.get_fixed_key_size(),
            header.get_fixed_value_size(),
            self.mem,
        ))
    }

    /// Lists the normal (non-multimap) tables as untyped handles.
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
        let names = self.tree.list_tables(TableType::Normal)?;
        Ok(names.into_iter().map(UntypedTableHandle::new))
    }

    /// Lists the multimap tables as untyped handles.
    pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
        let names = self.tree.list_tables(TableType::Multimap)?;
        Ok(names.into_iter().map(UntypedMultimapTableHandle::new))
    }
}
impl<'a> Drop for ReadTransaction<'a> {
/// Tells the transaction tracker that this read transaction has ended.
///
/// # Panics
///
/// Panics if the tracker mutex is poisoned.
fn drop(&mut self) {
self.transaction_tracker
.lock()
.unwrap()
.deallocate_read_transaction(self.transaction_id);
}
}
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// A write transaction started after closing and reopening the database file gets a
    /// larger transaction id than one committed before the close.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let write_txn = db.begin_write().unwrap();
            {
                let mut table = write_txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = write_txn.transaction_id;
            write_txn.commit().unwrap();
            id
            // `db` is dropped here, before the file is opened again
        };

        let reopened = Database::create(tmpfile.path()).unwrap();
        let write_txn = reopened.begin_write().unwrap();
        assert!(write_txn.transaction_id > first_txn_id);
    }
}