use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
use crate::tree_store::{
AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey,
InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree,
TableType, TransactionalMemory, PAGE_SIZE,
};
use crate::types::{RedbKey, RedbValue};
use crate::{
CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError,
StorageError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Debug, Display, Formatter};
use std::fs::{File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
use std::path::Path;
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::{AtomicU64, Ordering};
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use crate::error::TransactionError;
use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
use crate::sealed::Sealed;
use crate::transactions::SAVEPOINT_TABLE;
use crate::tree_store::file_backend::FileBackend;
#[cfg(feature = "logging")]
use log::{info, warn};
/// Raw storage abstraction that the database performs all I/O through:
/// a flat, byte-addressable, resizable blob of bytes (typically a file).
///
/// Implementations must be safe to call from multiple threads (`Send + Sync`).
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage, in bytes.
    fn len(&self) -> std::result::Result<u64, io::Error>;
    /// Reads `len` bytes starting at byte `offset`.
    fn read(&self, offset: u64, len: usize) -> std::result::Result<Vec<u8>, io::Error>;
    /// Grows or truncates the storage to exactly `len` bytes.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;
    /// Flushes previously written data to durable storage.
    /// `eventual` signals that a weaker, asynchronous flush is acceptable
    /// where the platform offers one — NOTE(review): exact durability
    /// semantics are defined by the implementation; confirm per backend.
    fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>;
    /// Writes `data` starting at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
}
/// Lock-free, monotonically increasing generator of write-transaction ids.
struct AtomicTransactionId {
    counter: AtomicU64,
}

impl AtomicTransactionId {
    /// Seed the generator; the first call to [`Self::next`] returns `last_id`.
    fn new(last_id: TransactionId) -> Self {
        let counter = AtomicU64::new(last_id.raw_id());
        Self { counter }
    }

    /// Reserve and return the next transaction id.
    fn next(&self) -> TransactionId {
        let raw = self.counter.fetch_add(1, Ordering::AcqRel);
        TransactionId::new(raw)
    }
}
/// Types that name a (non-multimap) table.
pub trait TableHandle: Sealed {
    /// Returns the name of the table.
    fn name(&self) -> &str;
}

/// Handle to a table whose key and value types are not statically known.
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedTableHandle {}
/// Types that name a multimap table.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table.
    fn name(&self) -> &str;
}

/// Handle to a multimap table whose key and value types are not statically known.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }
}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        &self.name
    }
}

impl Sealed for UntypedMultimapTableHandle {}
/// Compile-time definition of a table: its name plus its key and value types.
/// Carries no data; `PhantomData` records the types for the type checker.
pub struct TableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableDefinition<'a, K, V> {
    /// Construct a definition with the given table name.
    ///
    /// # Panics
    /// Panics if `name` is empty (checked at compile time in const contexts).
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for TableDefinition<'a, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: RedbKey, V: RedbValue> Sealed for TableDefinition<'_, K, V> {}

// Clone/Copy are written by hand (instead of derived) so that they do not
// require `K: Clone`/`V: Clone` bounds — the PhantomData fields carry no data.
impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for TableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for TableDefinition<'a, K, V> {}

impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for TableDefinition<'a, K, V> {
    /// Formats as `name<KeyType, ValueType>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}
/// Compile-time definition of a multimap table: its name plus its key and
/// value types (values are keys too, since a multimap stores sets of values).
pub struct MultimapTableDefinition<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableDefinition<'a, K, V> {
    /// Construct a definition with the given table name.
    ///
    /// # Panics
    /// Panics if `name` is empty.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
    for MultimapTableDefinition<'a, K, V>
{
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: RedbKey, V: RedbKey> Sealed for MultimapTableDefinition<'_, K, V> {}

// Manual Clone/Copy avoid requiring `K: Clone`/`V: Clone` bounds; the
// PhantomData fields carry no data.
impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Clone for MultimapTableDefinition<'a, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Copy for MultimapTableDefinition<'a, K, V> {}

impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Display for MultimapTableDefinition<'a, K, V> {
    /// Formats as `name<KeyType, ValueType>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            K::type_name().name(),
            V::type_name().name()
        )
    }
}
/// An open database.
///
/// At most one write transaction may be live at a time; the
/// `live_write_transaction` slot plus the condvar enforce that exclusion.
pub struct Database {
    // Page-level storage, caching, and commit machinery.
    mem: TransactionalMemory,
    // Generator for write-transaction ids.
    next_transaction_id: AtomicTransactionId,
    // Tracks live read transactions and savepoints; shared with transactions.
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    // Id of the currently live write transaction, if any.
    live_write_transaction: Mutex<Option<TransactionId>>,
    // Signaled when the live write transaction ends.
    live_write_transaction_available: Condvar,
}
impl Database {
    /// Opens the file at `path` as a redb database with default [`Builder`]
    /// settings, creating it if necessary.
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }

    /// Opens an existing database file at `path` with default [`Builder`]
    /// settings.
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }

    /// Claims the single write-transaction slot, blocking until any live
    /// write transaction finishes, and returns the new transaction's id.
    pub(crate) fn start_write_transaction(&self) -> TransactionId {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        // Standard condvar loop: re-check the predicate after every wakeup
        // to tolerate spurious wakeups.
        while live_write_transaction.is_some() {
            live_write_transaction = self
                .live_write_transaction_available
                .wait(live_write_transaction)
                .unwrap();
        }
        assert!(live_write_transaction.is_none());
        let transaction_id = self.next_transaction_id.next();
        #[cfg(feature = "logging")]
        info!("Beginning write transaction id={:?}", transaction_id);
        *live_write_transaction = Some(transaction_id);
        transaction_id
    }

    /// Releases the write-transaction slot held by `id` and wakes one waiter.
    ///
    /// # Panics
    /// Panics if `id` is not the currently live write transaction.
    pub(crate) fn end_write_transaction(&self, id: TransactionId) {
        let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
        assert_eq!(live_write_transaction.unwrap(), id);
        *live_write_transaction = None;
        // Only one waiting writer can proceed at a time, so notify_one()
        // is sufficient.
        self.live_write_transaction_available.notify_one();
    }

    /// Accessor for the underlying storage layer.
    pub(crate) fn get_memory(&self) -> &TransactionalMemory {
        &self.mem
    }

    /// Verifies the checksums of the data tree, the system tree, and the
    /// freed tree. Returns `Ok(false)` on any mismatch.
    pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
        // Read-only traversal: nothing may be freed. Asserted below.
        let fake_freed_pages = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
        if !table_tree.verify_checksums()? {
            return Ok(false);
        }
        let system_table_tree =
            TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
        if !system_table_tree.verify_checksums()? {
            return Ok(false);
        }
        assert!(fake_freed_pages.lock().unwrap().is_empty());
        if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
            if !RawBtree::new(
                Some((freed_root, freed_checksum)),
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )
            .verify_checksum()?
            {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Checks the database for consistency and repairs it if necessary.
    /// Returns `Ok(true)` if the database was already clean (checksums valid
    /// and the page allocator state matched the repaired state).
    pub fn check_integrity(&mut self) -> Result<bool> {
        // Snapshot the allocator state so we can tell whether repair
        // changed it (i.e. the on-disk state was inconsistent).
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = self.mem.clear_cache_and_reload()?;
        if !Self::verify_primary_checksums(&self.mem)? {
            was_clean = false;
        }
        // do_repair only surfaces storage errors here; the callback is a
        // no-op, so RepairAborted is impossible.
        Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => unreachable!(),
        })?;
        if allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }
        self.mem.begin_writable()?;
        Ok(was_clean)
    }

    /// Compacts the database file, attempting to shrink it by relocating
    /// pages toward the start of the file. Returns `Ok(true)` if any
    /// compaction was performed.
    ///
    /// Fails if any savepoint (persistent or ephemeral) exists, since
    /// savepoints pin old page versions in place.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .lock()
            .unwrap()
            .any_savepoint_exists()
        {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        // Commit two empty Paranoid transactions so that all pending freed
        // pages are processed; the asserts below rely on the freed tree
        // being empty before and after each compaction pass.
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_durability(Durability::Paranoid);
        txn.commit().map_err(|e| e.into_storage_error())?;
        assert!(self.mem.get_freed_root().is_none());
        let mut compacted = false;
        // Repeat until a pass makes no further progress.
        loop {
            let mut progress = false;
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }
            // Flush the freed pages produced by the pass.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_durability(Durability::Paranoid);
            txn.commit().map_err(|e| e.into_storage_error())?;
            assert!(self.mem.get_freed_root().is_none());
            if !progress {
                break;
            } else {
                compacted = true;
            }
        }
        Ok(compacted)
    }

    /// Repair helper: walks every persistent savepoint stored in the system
    /// tree and marks all pages it references (user roots and freed trees)
    /// as allocated, so repair does not reclaim pages a savepoint still needs.
    fn mark_persistent_savepoints(
        system_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        let freed_list = Arc::new(Mutex::new(vec![]));
        let table_tree = TableTree::new(system_root, mem, freed_list);
        // Savepoints are deserialized only to read their roots; they must not
        // register with the database's real tracker, so use a throwaway one.
        let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
        if let Some(savepoint_table_def) = table_tree
            .get_table::<SavepointId, SerializedSavepoint>(
                SAVEPOINT_TABLE.name(),
                TableType::Normal,
            )
            .map_err(|e| {
                e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
            })?
        {
            let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
                ReadOnlyTable::new(
                    "internal savepoint table".to_string(),
                    savepoint_table_def.get_root(),
                    PageHint::None,
                    mem,
                )?;
            for result in savepoint_table.range::<SavepointId>(..)? {
                let (_, savepoint_data) = result?;
                let savepoint = savepoint_data
                    .value()
                    .to_savepoint(fake_transaction_tracker.clone());
                if let Some((root, _)) = savepoint.get_user_root() {
                    // allow_duplicates=true: savepoints may share pages with
                    // the live tree and with each other.
                    Self::mark_tables_recursive(root, mem, true)?;
                }
                Self::mark_freed_tree(
                    savepoint.get_freed_root(),
                    mem,
                    oldest_unprocessed_free_transaction,
                )?;
            }
        }
        Ok(())
    }

    /// Repair helper: marks the pages of the freed tree itself, plus every
    /// page listed in freed-table entries at or after
    /// `oldest_unprocessed_free_transaction` (those have not been reclaimed
    /// yet and must remain allocated).
    fn mark_freed_tree(
        freed_root: Option<(PageNumber, Checksum)>,
        mem: &TransactionalMemory,
        oldest_unprocessed_free_transaction: TransactionId,
    ) -> Result {
        if let Some((root, _)) = freed_root {
            let freed_pages_iter = AllPageNumbersBtreeIter::new(
                root,
                FreedTableKey::fixed_width(),
                FreedPageList::fixed_width(),
                mem,
            )?;
            mem.mark_pages_allocated(freed_pages_iter, true)?;
        }
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // Keys sort by (transaction_id, pagination_id), so this lower bound
        // selects all entries from the oldest unprocessed transaction onward.
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_free_transaction.raw_id(),
            pagination_id: 0,
        };
        for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
            let (_, freed_page_list) = result?;
            let mut freed_page_list_as_vec = vec![];
            // mark_pages_allocated consumes an iterator of Results, so wrap
            // each page number in Ok.
            for i in 0..freed_page_list.value().len() {
                freed_page_list_as_vec.push(Ok(freed_page_list.value().get(i)));
            }
            mem.mark_pages_allocated(freed_page_list_as_vec.into_iter(), true)?;
        }
        Ok(())
    }

    /// Repair helper: marks as allocated every page reachable from the table
    /// tree rooted at `root` — the master tree's own pages, each table's
    /// pages, and, for multimap tables, every subtree's pages.
    fn mark_tables_recursive(
        root: PageNumber,
        mem: &TransactionalMemory,
        allow_duplicates: bool,
    ) -> Result {
        // Pages of the master (name -> table definition) tree itself.
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem)?;
        mem.mark_pages_allocated(master_pages_iter, allow_duplicates)?;
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;
        for entry in iter {
            let definition = entry?.value();
            if let Some((table_root, _)) = definition.get_root() {
                match definition.get_type() {
                    TableType::Normal => {
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            definition.get_fixed_value_size(),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
                    }
                    TableType::Multimap => {
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
                        // Second, fresh iterator over the same pages: the
                        // first one was consumed by mark_pages_allocated.
                        // This pass descends into each stored collection.
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        for table_page in table_pages_iter {
                            let page = mem.get_page(table_page?)?;
                            let subtree_roots = parse_subtree_roots(
                                &page,
                                definition.get_fixed_key_size(),
                                definition.get_fixed_value_size(),
                            );
                            for (sub_root, _) in subtree_roots {
                                let sub_root_iter = AllPageNumbersBtreeIter::new(
                                    sub_root,
                                    definition.get_fixed_value_size(),
                                    <()>::fixed_width(),
                                    mem,
                                )?;
                                mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Repairs the database after an unclean shutdown: restores valid roots
    /// (falling back to the storage layer's recovery path if the primary is
    /// corrupted), rebuilds the page allocator state by re-marking every
    /// reachable page, and commits the result.
    ///
    /// `repair_callback` is invoked with progress estimates (0.3 / 0.6 / 0.9)
    /// and may abort the repair.
    fn do_repair(
        mem: &mut TransactionalMemory,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<(), DatabaseError> {
        if !Self::verify_primary_checksums(mem)? {
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            // Ask the storage layer to recover from the corrupted primary,
            // then drop any cached pages that may reflect the bad state.
            mem.repair_primary_corrupted();
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem)? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }
        // Rebuild allocator state: mark every page reachable from the data,
        // freed, and system roots (and persistent savepoints) as allocated.
        mem.begin_repair()?;
        let data_root = mem.get_data_root();
        if let Some((root, _)) = data_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        let freed_root = mem.get_freed_root();
        Self::mark_freed_tree(freed_root, mem, TransactionId::new(0))?;
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // The first key in the freed table (sorted by transaction id) is the
        // oldest transaction with unprocessed freed pages; if the table is
        // empty, everything up to the last commit has been processed.
        let oldest_unprocessed_transaction =
            if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
                TransactionId::new(entry?.0.value().transaction_id)
            } else {
                mem.get_last_committed_transaction_id()?
            };
        drop(freed_table);
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }
        let system_root = mem.get_system_root();
        if let Some((root, _)) = system_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        Self::mark_persistent_savepoints(system_root, mem, oldest_unprocessed_transaction)?;
        mem.end_repair()?;
        mem.clear_read_cache();
        // Commit the (unchanged) roots under a fresh transaction id to
        // persist the rebuilt allocator state.
        let transaction_id = mem.get_last_committed_transaction_id()?.next();
        mem.commit(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            false,
            true,
        )?;
        Ok(())
    }

    /// Core constructor: wraps `file` in the transactional storage layer,
    /// repairs the database if it was not shut down cleanly, and restores
    /// persistent-savepoint bookkeeping into the transaction tracker.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mut mem = TransactionalMemory::new(
            file,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        if mem.needs_repair()? {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            // Give the callback an initial progress report (and a chance to
            // abort) before any repair work starts.
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            Self::do_repair(&mut mem, repair_callback)?;
        }
        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
        let db = Database {
            mem,
            next_transaction_id: AtomicTransactionId::new(next_transaction_id),
            transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
            live_write_transaction: Mutex::new(None),
            live_write_transaction_available: Condvar::new(),
        };
        // Re-register persistent savepoints with the in-memory tracker so
        // their pinned state survives the reopen.
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .lock()
                .unwrap()
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // We are iterating ids that exist, so InvalidSavepoint
                    // cannot occur.
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .lock()
                .unwrap()
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;
        Ok(db)
    }

    /// Registers a new read transaction pinned to the last committed state.
    /// The tracker lock is held across the id read so registration is atomic
    /// with respect to concurrent commits.
    fn allocate_read_transaction(&self) -> Result<TransactionId> {
        let mut guard = self.transaction_tracker.lock().unwrap();
        let id = self.mem.get_last_committed_transaction_id()?;
        guard.register_read_transaction(id);
        Ok(id)
    }

    /// Allocates a savepoint id together with a read transaction pinning the
    /// state the savepoint captures.
    pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
        let id = self
            .transaction_tracker
            .lock()
            .unwrap()
            .allocate_savepoint();
        Ok((id, self.allocate_read_transaction()?))
    }

    /// Returns a [`Builder`] for configuring how a database is opened.
    pub fn builder() -> Builder {
        Builder::new()
    }

    /// Begins a write transaction.
    pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
        WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
    }

    /// Begins a read transaction, which sees a consistent snapshot of the
    /// last committed state.
    pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let id = self.allocate_read_transaction()?;
        #[cfg(feature = "logging")]
        info!("Beginning read transaction id={:?}", id);
        Ok(ReadTransaction::new(
            self.get_memory(),
            self.transaction_tracker.clone(),
            id,
        ))
    }
}
/// Progress/abort handle handed to the user's repair callback while a
/// damaged database is being repaired.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Create a session snapshot at the given progress estimate.
    pub(crate) fn new(progress: f64) -> Self {
        let aborted = false;
        Self { progress, aborted }
    }

    /// Whether [`Self::abort`] has been called on this session.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Abort the repair; the open/repair operation will then fail.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Estimated repair progress, in the range `0.0..=1.0`.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
/// Configures how a [`Database`] file is created or opened.
pub struct Builder {
    // Page size in bytes; defaults to the platform page size (PAGE_SIZE).
    page_size: usize,
    // Region size override (tests/fuzzing only); None uses the default.
    region_size: Option<u64>,
    // Read cache budget in bytes (90% of the total cache by default).
    read_cache_size_bytes: usize,
    // Write cache budget in bytes (10% of the total cache by default).
    write_cache_size_bytes: usize,
    // Invoked with progress updates when a repair is required on open.
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
}
impl Builder {
    /// Construct a builder with default settings: platform page size and a
    /// 1 GiB cache split between reads and writes.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut result = Self {
            page_size: PAGE_SIZE,
            region_size: None,
            read_cache_size_bytes: 0,
            write_cache_size_bytes: 0,
            repair_callback: Box::new(|_| {}),
        };
        // Cache fields are filled in here so the 90/10 split lives in one
        // place (set_cache_size).
        result.set_cache_size(1024 * 1024 * 1024);
        result
    }

    /// Set the callback invoked with progress updates if the database needs
    /// repair when opened. The callback may abort the repair.
    pub fn set_repair_callback(
        &mut self,
        callback: impl Fn(&mut RepairSession) + 'static,
    ) -> &mut Self {
        self.repair_callback = Box::new(callback);
        self
    }

    /// Set the page size (test/fuzzing only). Must be a power of two; values
    /// below 512 are clamped up to 512.
    #[cfg(any(fuzzing, test))]
    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
        assert!(size.is_power_of_two());
        self.page_size = std::cmp::max(size, 512);
        self
    }

    /// Set the total cache budget in bytes: 90% is allotted to the read
    /// cache and 10% to the write cache.
    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
        self.read_cache_size_bytes = bytes / 10 * 9;
        self.write_cache_size_bytes = bytes / 10;
        self
    }

    /// Set the region size (test/fuzzing only). Must be a power of two.
    #[cfg(any(test, fuzzing))]
    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
        assert!(size.is_power_of_two());
        self.region_size = Some(size);
        self
    }

    /// Create (or open, if it exists) a database file at `path`.
    /// Note: deliberately does not truncate an existing file.
    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path)?;
        Database::new(
            Box::new(FileBackend::new(file)?),
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    /// Open an existing database file at `path`. Fails with `InvalidData`
    /// for an empty file (which cannot be a valid database).
    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;
        if file.metadata()?.len() == 0 {
            return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
        }
        Database::new(
            Box::new(FileBackend::new(file)?),
            self.page_size,
            // Unlike create(), no region-size override is passed here —
            // presumably the size is taken from the existing file; confirm
            // in TransactionalMemory::new.
            None,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    /// Create a database backed by an already-opened [`File`].
    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(FileBackend::new(file)?),
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }

    /// Create a database on a custom [`StorageBackend`] implementation.
    pub fn create_with_backend(
        &self,
        backend: impl StorageBackend,
    ) -> Result<Database, DatabaseError> {
        Database::new(
            Box::new(backend),
            self.page_size,
            self.region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
        )
    }
}
impl std::fmt::Debug for Database {
    /// Intentionally terse: none of the internal state is useful to expose,
    /// so only the type name is printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut output = f.debug_struct("Database");
        output.finish()
    }
}
#[cfg(test)]
mod test {
    use crate::backends::FileBackend;
    use crate::{
        Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
        TableDefinition,
    };
    use std::io::ErrorKind;
    #[cfg(not(target_has_atomic = "64"))]
    use portable_atomic::{AtomicU64, Ordering};
    #[cfg(target_has_atomic = "64")]
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Backend that delegates to a [`FileBackend`] but fails every read/sync
    /// once `countdown` writes have been performed — used to simulate a
    /// crash partway through a commit.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        countdown: AtomicU64,
    }

    impl FailingBackend {
        fn new(backend: FileBackend, countdown: u64) -> Self {
            Self {
                inner: backend,
                countdown: AtomicU64::new(countdown),
            }
        }

        // Fails (without decrementing) once the countdown has hit zero.
        fn check_countdown(&self) -> Result<(), std::io::Error> {
            if self.countdown.load(Ordering::SeqCst) == 0 {
                return Err(std::io::Error::from(ErrorKind::Other));
            }
            Ok(())
        }

        // Atomically decrements the countdown; fails once it reaches zero.
        fn decrement_countdown(&self) -> Result<(), std::io::Error> {
            if self
                .countdown
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
                    if x > 0 {
                        Some(x - 1)
                    } else {
                        None
                    }
                })
                .is_err()
            {
                return Err(std::io::Error::from(ErrorKind::Other));
            }
            Ok(())
        }
    }

    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, std::io::Error> {
            self.inner.len()
        }

        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
            self.check_countdown()?;
            self.inner.read(offset, len)
        }

        // set_len is intentionally never failed: only reads/syncs/writes
        // participate in the simulated crash.
        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
            self.inner.set_len(len)
        }

        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
            self.check_countdown()?;
            self.inner.sync_data(eventual)
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }

    /// Regression test: a database that crashed mid-commit (after exactly 23
    /// writes) with savepoints present must reopen/repair successfully.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let backend = FailingBackend::new(
            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
            23,
        );
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // The countdown expires during this commit, simulating a crash.
        let result = tx.commit();
        assert!(result.is_err());
        drop(db);
        // Reopening the same file must trigger a successful repair.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(tmpfile.path())
            .unwrap();
    }

    /// Smoke test with the minimum page size (512 bytes).
    #[test]
    fn small_pages() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let txn = db.begin_write().unwrap();
        {
            txn.open_table(table_definition).unwrap();
        }
        txn.commit().unwrap();
    }

    /// Regression test (originally fuzzer-derived): interleaved savepoint
    /// creation, restoration, and dropping with 512-byte pages.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // savepoint4 was taken in an aborted transaction's context; restoring
        // it must fail.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }

    /// Regression test: restoring a savepoint taken in the same transaction,
    /// with aborted writes, using 1 KiB pages.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }

    /// Regression test: multi-page values and reserved inserts in aborted
    /// transactions with 1 KiB pages and a small cache.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }

    /// Verifies the file shrinks again after growing: insert ~2 MiB, delete
    /// everything, and check the final file is smaller than the peak size.
    #[test]
    fn dynamic_shrink() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];
        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        let file_size = tmpfile.as_file().metadata().unwrap().len();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();
        // A few more small transactions so pending freed pages get processed
        // and the file can actually be truncated.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();
        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }

    /// An empty (zero-length) file is a valid target for create_file.
    #[test]
    fn create_new_db_in_empty_file() {
        let tmpfile = crate::create_tempfile();
        let _db = Database::builder()
            .create_file(tmpfile.into_file())
            .unwrap();
    }

    /// Opening a nonexistent path must surface an io::ErrorKind::NotFound.
    #[test]
    fn open_missing_file() {
        let tmpfile = crate::create_tempfile();
        let err = Database::builder()
            .open(tmpfile.path().with_extension("missing"))
            .unwrap_err();
        match err {
            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
            err => panic!("Unexpected error for empty file: {err}"),
        }
    }

    /// Opening an existing but empty file must fail with InvalidData
    /// (see Builder::open's explicit length check).
    #[test]
    fn open_empty_file() {
        let tmpfile = crate::create_tempfile();
        let err = Database::builder().open(tmpfile.path()).unwrap_err();
        match err {
            DatabaseError::Storage(StorageError::Io(err))
                if err.kind() == ErrorKind::InvalidData => {}
            err => panic!("Unexpected error for empty file: {err}"),
        }
    }
}