use crate::snapshot::RocksDBSnapshot;
use crate::transaction::RocksDBTransaction;
use crate::write_batch::RocksDBWriteBatch;
use crate::{internal_error, Result};
use ckb_app_config::DBConfig;
use ckb_db_schema::Col;
use ckb_logger::info;
use rocksdb::ops::{
CompactRangeCF, CreateCF, DropCF, GetColumnFamilys, GetPinned, GetPinnedCF, IterateCF, OpenCF,
Put, SetOptions, WriteOps,
};
use rocksdb::{
ffi, BlockBasedIndexType, BlockBasedOptions, Cache, ColumnFamily, ColumnFamilyDescriptor,
DBPinnableSlice, FullOptions, IteratorMode, OptimisticTransactionDB,
OptimisticTransactionOptions, Options, SliceTransform, WriteBatch, WriteOptions,
};
use std::path::Path;
use std::sync::Arc;
/// Thread-safe handle to an open RocksDB database.
///
/// Cloning is cheap: all clones share the same underlying
/// `OptimisticTransactionDB` through the `Arc`.
#[derive(Clone)]
pub struct RocksDB {
    // Shared handle to the opened database; also cloned into
    // transactions, write batches and snapshots created from it.
    pub(crate) inner: Arc<OptimisticTransactionDB>,
}
/// Default block-cache capacity in bytes (256 MiB), used when an options
/// file is supplied but `DBConfig::cache_size` is `None`.
const DEFAULT_CACHE_SIZE: usize = 256 << 20;
/// Estimated per-entry charge in bytes handed to the hyper-clock cache
/// constructor (used for its internal sizing heuristics).
const DEFAULT_CACHE_ENTRY_CHARGE_SIZE: usize = 4096;
impl RocksDB {
    /// Opens (creating if missing) the database at `config.path` with
    /// `columns` column families named `"0"`..`"columns-1"`.
    ///
    /// When `config.options_file` is set, DB/CF options are loaded from that
    /// file and a hyper-clock block cache is sized from `config.cache_size`
    /// (`Some(0)` disables the cache, `None` falls back to
    /// `DEFAULT_CACHE_SIZE`).
    ///
    /// NOTE(review): on the no-options-file path below, `config.cache_size`
    /// is never consulted and the block cache is disabled — confirm that is
    /// intended rather than an oversight.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the options file cannot be loaded, the
    /// column families cannot be completed, the database cannot be opened,
    /// or `config.options` contains an option RocksDB rejects.
    pub(crate) fn open_with_check(config: &DBConfig, columns: u32) -> Result<Self> {
        // Column families are named by their numeric index.
        let cf_names: Vec<_> = (0..columns).map(|c| c.to_string()).collect();

        let mut cache = None;
        let (mut opts, mut cf_descriptors) = if let Some(ref file) = config.options_file {
            cache = match config.cache_size {
                Some(0) => None,
                Some(size) => Some(Cache::new_hyper_clock_cache(
                    size,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
                None => Some(Cache::new_hyper_clock_cache(
                    DEFAULT_CACHE_SIZE,
                    DEFAULT_CACHE_ENTRY_CHARGE_SIZE,
                )),
            };
            let mut full_opts = FullOptions::load_from_file_with_cache(file, cache.clone(), false)
                .map_err(|err| internal_error(format!("failed to load the options file: {err}")))?;
            let cf_names_str: Vec<&str> = cf_names.iter().map(|s| s.as_str()).collect();
            // Ensure every expected column family has a descriptor, even if
            // the options file omits some of them.
            full_opts
                .complete_column_families(&cf_names_str, false)
                .map_err(|err| {
                    internal_error(format!("failed to check all column families: {err}"))
                })?;
            let FullOptions {
                db_opts,
                cf_descriptors,
            } = full_opts;
            (db_opts, cf_descriptors)
        } else {
            // No options file: plain defaults for the DB and every CF.
            let opts = Options::default();
            let cf_descriptors: Vec<_> = cf_names
                .iter()
                .map(|c| ColumnFamilyDescriptor::new(c, Options::default()))
                .collect();
            (opts, cf_descriptors)
        };

        // Apply a shared block-based table configuration to every column
        // family: ribbon filter, two-level partitioned index/filter blocks,
        // and pinned top-level index/filter metadata.
        for cf in cf_descriptors.iter_mut() {
            let mut block_opts = BlockBasedOptions::default();
            block_opts.set_ribbon_filter(10.0);
            block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
            block_opts.set_partition_filters(true);
            block_opts.set_metadata_block_size(4096);
            block_opts.set_pin_top_level_index_and_filter(true);
            match cache {
                Some(ref cache) => {
                    // Route index/filter blocks through the shared cache and
                    // keep L0 blocks pinned so hot metadata is never evicted.
                    block_opts.set_block_cache(cache);
                    block_opts.set_cache_index_and_filter_blocks(true);
                    block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
                }
                None => block_opts.disable_cache(),
            }
            // Column family "2" gets a fixed 32-byte prefix extractor and
            // filters on the prefix rather than the whole key.
            // NOTE(review): assumes keys in CF "2" begin with a fixed
            // 32-byte prefix — confirm against ckb_db_schema.
            if cf.name() == "2" {
                block_opts.set_whole_key_filtering(false);
                cf.options
                    .set_prefix_extractor(SliceTransform::create_fixed_prefix(32));
            }
            cf.options.set_block_based_table_factory(&block_opts);
        }

        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.enable_statistics();
        let db = OptimisticTransactionDB::open_cf_descriptors(&opts, &config.path, cf_descriptors)
            .map_err(|err| internal_error(format!("failed to open database: {err}")))?;

        // Apply any extra string key/value options from the config on top of
        // whatever the options file / defaults established.
        if !config.options.is_empty() {
            let rocksdb_options: Vec<(&str, &str)> = config
                .options
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str()))
                .collect();
            db.set_options(&rocksdb_options)
                .map_err(|_| internal_error("failed to set database option"))?;
        }

        Ok(RocksDB {
            inner: Arc::new(db),
        })
    }

    /// Opens the database described by `config`.
    ///
    /// # Panics
    ///
    /// Panics with the underlying error message if the open fails.
    pub fn open(config: &DBConfig, columns: u32) -> Self {
        Self::open_with_check(config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Opens a database at `path` with a default `DBConfig`.
    ///
    /// # Panics
    ///
    /// Panics with the underlying error message if the open fails.
    pub fn open_in<P: AsRef<Path>>(path: P, columns: u32) -> Self {
        let config = DBConfig {
            path: path.as_ref().to_path_buf(),
            ..Default::default()
        };
        Self::open_with_check(&config, columns).unwrap_or_else(|err| panic!("{err}"))
    }

    /// Opens an *existing* database tuned for bulk loading.
    ///
    /// `create_if_missing` is deliberately left off, so a non-existent
    /// database yields `Ok(None)` (detected from RocksDB's error text) —
    /// presumably so callers can skip bulk-load work when there is nothing
    /// to load; verify against call sites.
    ///
    /// # Errors
    ///
    /// Returns an internal error on corruption or any other open failure.
    pub fn prepare_for_bulk_load_open<P: AsRef<Path>>(
        path: P,
        columns: u32,
    ) -> Result<Option<Self>> {
        let mut opts = Options::default();
        opts.create_missing_column_families(true);
        opts.set_prepare_for_bulk_load();

        let cfnames: Vec<_> = (0..columns).map(|c| c.to_string()).collect();
        let cf_options: Vec<&str> = cfnames.iter().map(|n| n as &str).collect();
        OptimisticTransactionDB::open_cf(&opts, path, cf_options).map_or_else(
            |err| {
                let err_str = err.as_ref();
                // "does not exist" is the expected case for a fresh node,
                // not a failure; map it to Ok(None).
                if err_str.starts_with("Invalid argument:")
                    && err_str.ends_with("does not exist (create_if_missing is false)")
                {
                    Ok(None)
                } else if err_str.starts_with("Corruption:") {
                    info!("DB corrupted: {err_str}.");
                    Err(internal_error(err_str))
                } else {
                    Err(internal_error(format!(
                        "failed to open the database: {err}"
                    )))
                }
            },
            |db| {
                Ok(Some(RocksDB {
                    inner: Arc::new(db),
                }))
            },
        )
    }

    /// Reads `key` from column `col` without copying the value out of
    /// RocksDB's block cache (`DBPinnableSlice` borrows it).
    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
    }

    /// Reads `key` from the default column family without copying the value.
    pub fn get_pinned_default(&self, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        self.inner.get_pinned(key).map_err(internal_error)
    }

    /// Writes `key`/`value` into the default column family.
    pub fn put_default<K, V>(&self, key: K, value: V) -> Result<()>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        self.inner.put(key, value).map_err(internal_error)
    }

    /// Iterates every key/value pair in column `col` from the start,
    /// invoking `callback` for each; stops early on the first callback error.
    pub fn full_traverse<F>(&self, col: Col, callback: &mut F) -> Result<()>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, IteratorMode::Start)
            .map_err(internal_error)?;
        for (key, val) in iter {
            callback(&key, &val)?;
        }
        Ok(())
    }

    /// Iterates column `col` in `mode` order, invoking `callback` until
    /// roughly `limit` entries have been visited.
    ///
    /// Returns the number of entries processed and the key at which
    /// iteration stopped (empty if the column was exhausted), so callers can
    /// resume from `next_key`.
    ///
    /// NOTE(review): the `count > limit` guard below means up to
    /// `limit + 1` entries are passed to `callback` before stopping —
    /// confirm whether `>=` was intended or callers rely on this.
    pub fn traverse<F>(
        &self,
        col: Col,
        callback: &mut F,
        mode: IteratorMode,
        limit: usize,
    ) -> Result<(usize, Vec<u8>)>
    where
        F: FnMut(&[u8], &[u8]) -> Result<()>,
    {
        let mut count: usize = 0;
        let mut next_key: Vec<u8> = vec![];
        let cf = cf_handle(&self.inner, col)?;
        let iter = self
            .inner
            .full_iterator_cf(cf, mode)
            .map_err(internal_error)?;
        for (key, val) in iter {
            if count > limit {
                // Remember where we stopped so the caller can resume.
                next_key = key.to_vec();
                break;
            }
            callback(&key, &val)?;
            count += 1;
        }
        Ok((count, next_key))
    }

    /// Begins an optimistic transaction that takes a snapshot at creation,
    /// so reads inside the transaction see a consistent view.
    pub fn transaction(&self) -> RocksDBTransaction {
        let write_options = WriteOptions::default();
        let mut transaction_options = OptimisticTransactionOptions::new();
        transaction_options.set_snapshot(true);
        RocksDBTransaction {
            db: Arc::clone(&self.inner),
            inner: self.inner.transaction(&write_options, &transaction_options),
        }
    }

    /// Creates an empty write batch bound to this database.
    pub fn new_write_batch(&self) -> RocksDBWriteBatch {
        RocksDBWriteBatch {
            db: Arc::clone(&self.inner),
            inner: WriteBatch::default(),
        }
    }

    /// Applies `batch` atomically with default (non-synced) write options.
    pub fn write(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        self.inner.write(&batch.inner).map_err(internal_error)
    }

    /// Applies `batch` atomically with `sync = true`, forcing the WAL to be
    /// flushed to durable storage before returning.
    pub fn write_sync(&self, batch: &RocksDBWriteBatch) -> Result<()> {
        let mut wo = WriteOptions::new();
        wo.set_sync(true);
        self.inner
            .write_opt(&batch.inner, &wo)
            .map_err(internal_error)
    }

    /// Manually compacts the key range `[start, end]` of column `col`
    /// (`None` bounds mean the open end of the column).
    pub fn compact_range(&self, col: Col, start: Option<&[u8]>, end: Option<&[u8]>) -> Result<()> {
        let cf = cf_handle(&self.inner, col)?;
        self.inner.compact_range_cf(cf, start, end);
        Ok(())
    }

    /// Takes a point-in-time snapshot of the database.
    pub fn get_snapshot(&self) -> RocksDBSnapshot {
        // SAFETY: the raw snapshot is created against the live base DB
        // handle and immediately wrapped in `RocksDBSnapshot`, which is
        // presumably responsible for releasing it on drop — confirm in
        // crate::snapshot.
        unsafe {
            let snapshot = ffi::rocksdb_create_snapshot(self.inner.base_db_ptr());
            RocksDBSnapshot::new(&self.inner, snapshot)
        }
    }

    /// Returns a shared handle to the underlying database.
    pub fn inner(&self) -> Arc<OptimisticTransactionDB> {
        Arc::clone(&self.inner)
    }

    /// Creates column family `col` with default options.
    ///
    /// # Errors
    ///
    /// Fails if other clones/snapshots still share the `Arc` (exclusive
    /// access is required to mutate the DB handle), or if RocksDB rejects
    /// the creation.
    pub fn create_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("create_cf get_mut failed"))?;
        let opts = Options::default();
        inner.create_cf(col, &opts).map_err(internal_error)
    }

    /// Drops column family `col`.
    ///
    /// # Errors
    ///
    /// Fails if other clones/snapshots still share the `Arc`, or if RocksDB
    /// rejects the drop.
    pub fn drop_cf(&mut self, col: Col) -> Result<()> {
        let inner = Arc::get_mut(&mut self.inner)
            .ok_or_else(|| internal_error("drop_cf get_mut failed"))?;
        inner.drop_cf(col).map_err(internal_error)
    }
}
/// Resolves the column-family handle named `col` on `db`, turning a missing
/// family into an internal error instead of an `Option`.
#[inline]
pub(crate) fn cf_handle(db: &OptimisticTransactionDB, col: Col) -> Result<&ColumnFamily> {
    match db.cf_handle(col) {
        Some(handle) => Ok(handle),
        None => Err(internal_error(format!("column {col} not found"))),
    }
}