use crate::{
database::{
Column,
Error as DatabaseError,
Result as DatabaseResult,
},
state::{
BatchOperations,
IterDirection,
KVItem,
KeyValueStore,
TransactableStorage,
Value,
WriteOperation,
},
};
#[cfg(feature = "metrics")]
use fuel_core_metrics::core_metrics::DATABASE_METRICS;
use fuel_core_storage::iter::{
BoxedIter,
IntoBoxedIter,
};
use rocksdb::{
BoundColumnFamily,
Cache,
ColumnFamilyDescriptor,
DBCompressionType,
DBWithThreadMode,
IteratorMode,
MultiThreaded,
Options,
ReadOptions,
SliceTransform,
WriteBatch,
};
use std::{
iter,
path::Path,
sync::Arc,
};
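/// Multi-threaded RocksDB handle; column families can be created and dropped
/// concurrently with reads and writes.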
type DB = DBWithThreadMode<MultiThreaded>;
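/// [`KeyValueStore`] backed by RocksDB, with one column family per [`Column`].
///
/// Typical usage (illustrative path; mirrors the unit tests at the bottom of
/// this file):
///
/// ```ignore
/// let db = RocksDb::default_open("/tmp/fuel-db", None)?;
/// db.put(b"key", Column::Metadata, Arc::new(vec![1, 2, 3]))?;
/// assert!(db.exists(b"key", Column::Metadata)?);
/// ```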
#[derive(Debug)]
pub struct RocksDb {
db: DB,
}
impl RocksDb {
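    /// Opens (or creates) the database at `path` with every known [`Column`],
    /// optionally capping the RocksDB row cache at `capacity` bytes.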
pub fn default_open<P: AsRef<Path>>(
path: P,
capacity: Option<usize>,
) -> DatabaseResult<RocksDb> {
Self::open(
path,
enum_iterator::all::<Column>().collect::<Vec<_>>(),
capacity,
)
}
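    /// Opens (or creates) the database at `path` restricted to the given
    /// `columns`, creating any column families that do not exist yet.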
pub fn open<P: AsRef<Path>>(
path: P,
columns: Vec<Column>,
capacity: Option<usize>,
) -> DatabaseResult<RocksDb> {
        let cf_descriptors: Vec<_> = columns
            .iter()
            .copied()
            .map(|i| ColumnFamilyDescriptor::new(RocksDb::col_name(i), Self::cf_opts(i)))
            .collect();
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compression_type(DBCompressionType::Lz4);
        if let Some(capacity) = capacity {
            // Propagate cache-creation failures instead of panicking.
            let cache = Cache::new_lru_cache(capacity)
                .map_err(|e| DatabaseError::Other(e.into()))?;
            opts.set_row_cache(&cache);
        }
let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors) {
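            // Opening fails when the database exists but is missing some of the
            // requested column families. Fall back to opening without
            // descriptors and create the missing column families explicitly.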
Err(_) => {
match DB::open_cf(&opts, &path, &[] as &[&str]) {
Ok(db) => {
for i in columns {
let opts = Self::cf_opts(i);
db.create_cf(RocksDb::col_name(i), &opts)
.map_err(|e| DatabaseError::Other(e.into()))?;
}
Ok(db)
}
err => err,
}
}
ok => ok,
}
.map_err(|e| DatabaseError::Other(e.into()))?;
let rocks_db = RocksDb { db };
Ok(rocks_db)
}
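    /// Looks up the column family handle backing `column`.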
    fn cf(&self, column: Column) -> Arc<BoundColumnFamily> {
        self.db
            .cf_handle(&RocksDb::col_name(column))
            .expect("column family must be registered when the database is opened")
    }
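    /// Derives the column family name for a [`Column`] from its index.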
fn col_name(column: Column) -> String {
format!("column-{}", column.as_usize())
}
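    /// Builds the per-column options. Columns keyed by 32-byte identifiers get
    /// a fixed prefix extractor so that prefix seeks stay efficient.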
fn cf_opts(column: Column) -> Options {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compression_type(DBCompressionType::Lz4);
match column {
Column::OwnedCoins
| Column::TransactionsByOwnerBlockIdx
| Column::OwnedMessageIds
| Column::ContractsAssets
| Column::ContractsState => {
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(32))
}
_ => {}
};
opts
}
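    /// Raw iteration helper shared by the [`KeyValueStore::iter_all`] arms
    /// below; copies each key/value pair into owned `Vec`s.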
fn _iter_all(
&self,
column: Column,
opts: ReadOptions,
iter_mode: IteratorMode,
) -> impl Iterator<Item = KVItem> + '_ {
self.db
.iterator_cf_opt(&self.cf(column), opts, iter_mode)
.map(|item| {
item.map(|(key, value)| {
let value_as_vec = Vec::from(value);
let key_as_vec = Vec::from(key);
#[cfg(feature = "metrics")]
{
DATABASE_METRICS.read_meter.inc();
DATABASE_METRICS
.bytes_read
.observe((key_as_vec.len() + value_as_vec.len()) as f64);
}
(key_as_vec, Arc::new(value_as_vec))
})
.map_err(|e| DatabaseError::Other(e.into()))
})
}
}
impl KeyValueStore for RocksDb {
fn get(&self, key: &[u8], column: Column) -> DatabaseResult<Option<Value>> {
#[cfg(feature = "metrics")]
DATABASE_METRICS.read_meter.inc();
let value = self
.db
.get_cf(&self.cf(column), key)
.map_err(|e| DatabaseError::Other(e.into()));
#[cfg(feature = "metrics")]
{
if let Ok(Some(value)) = &value {
DATABASE_METRICS.bytes_read.observe(value.len() as f64);
}
}
value.map(|value| value.map(Arc::new))
}
fn put(
&self,
key: &[u8],
column: Column,
value: Value,
) -> DatabaseResult<Option<Value>> {
#[cfg(feature = "metrics")]
{
DATABASE_METRICS.write_meter.inc();
DATABASE_METRICS.bytes_written.observe(value.len() as f64);
}
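        // Read the existing entry first so the previous value can be returned.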
let prev = self.get(key, column)?;
self.db
.put_cf(&self.cf(column), key, value.as_ref())
.map_err(|e| DatabaseError::Other(e.into()))
.map(|_| prev)
}
fn delete(&self, key: &[u8], column: Column) -> DatabaseResult<Option<Value>> {
let prev = self.get(key, column)?;
self.db
.delete_cf(&self.cf(column), key)
.map_err(|e| DatabaseError::Other(e.into()))
.map(|_| prev)
}
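    // `get_pinned_cf` checks for the key without copying the value out.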
fn exists(&self, key: &[u8], column: Column) -> DatabaseResult<bool> {
self.db
.get_pinned_cf(&self.cf(column), key)
.map_err(|e| DatabaseError::Other(e.into()))
.map(|v| v.is_some())
}
fn iter_all(
&self,
column: Column,
prefix: Option<&[u8]>,
start: Option<&[u8]>,
direction: IterDirection,
) -> BoxedIter<KVItem> {
match (prefix, start) {
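            // No constraints: scan the entire column.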
(None, None) => {
let iter_mode =
match direction {
IterDirection::Forward => IteratorMode::Start,
IterDirection::Reverse => IteratorMode::End,
};
self._iter_all(column, ReadOptions::default(), iter_mode)
.into_boxed()
}
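            // Prefix only: seed the iterator at the prefix and let RocksDB
            // keep the scan within it.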
(Some(prefix), None) => {
let iter_mode = IteratorMode::From(prefix, direction.into());
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
self._iter_all(column, opts, iter_mode).into_boxed()
}
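            // Start key only: unbounded scan beginning at `start`.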
(None, Some(start)) => {
let iter_mode = IteratorMode::From(start, direction.into());
self._iter_all(column, ReadOptions::default(), iter_mode)
.into_boxed()
}
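            // Prefix and start key together.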
(Some(prefix), Some(start)) => {
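                // A start key outside the prefix can never produce a match.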
if !start.starts_with(prefix) {
return iter::empty().into_boxed()
}
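                // The iterator is only seeded with `start`, so stop manually
                // once keys no longer share the prefix.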
let prefix = prefix.to_vec();
let iter_mode = IteratorMode::From(start, direction.into());
self._iter_all(column, ReadOptions::default(), iter_mode)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
}
}
}
}
impl BatchOperations for RocksDb {
fn batch_write(
&self,
entries: &mut dyn Iterator<Item = (Vec<u8>, Column, WriteOperation)>,
) -> DatabaseResult<()> {
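        // Collect every operation into a single batch so the writes land
        // atomically.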
let mut batch = WriteBatch::default();
for (key, column, op) in entries {
match op {
WriteOperation::Insert(value) => {
batch.put_cf(&self.cf(column), key, value.as_ref());
}
WriteOperation::Remove => {
batch.delete_cf(&self.cf(column), key);
}
}
}
#[cfg(feature = "metrics")]
{
DATABASE_METRICS.write_meter.inc();
DATABASE_METRICS
.bytes_written
.observe(batch.size_in_bytes() as f64);
}
self.db
.write(batch)
.map_err(|e| DatabaseError::Other(e.into()))
}
}
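// `TransactableStorage` adds no methods of its own, so the impl is empty.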
impl TransactableStorage for RocksDb {}
impl From<IterDirection> for rocksdb::Direction {
fn from(d: IterDirection) -> Self {
match d {
IterDirection::Forward => rocksdb::Direction::Forward,
IterDirection::Reverse => rocksdb::Direction::Reverse,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
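    // Opens a fresh database in a temporary directory; the `TempDir` guard is
    // returned so the directory outlives each test.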
fn create_db() -> (RocksDb, TempDir) {
let tmp_dir = TempDir::new().unwrap();
(
RocksDb::default_open(tmp_dir.path(), None).unwrap(),
tmp_dir,
)
}
#[test]
fn can_put_and_read() {
let key = vec![0xA, 0xB, 0xC];
let (db, _tmp) = create_db();
let expected = Arc::new(vec![1, 2, 3]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
}
#[test]
fn put_returns_previous_value() {
let key = vec![0xA, 0xB, 0xC];
let (db, _tmp) = create_db();
let expected = Arc::new(vec![1, 2, 3]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
let prev = db
.put(&key, Column::Metadata, Arc::new(vec![2, 4, 6]))
.unwrap();
assert_eq!(prev, Some(expected));
}
#[test]
fn delete_and_get() {
let key = vec![0xA, 0xB, 0xC];
let (db, _tmp) = create_db();
let expected = Arc::new(vec![1, 2, 3]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
db.delete(&key, Column::Metadata).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
}
#[test]
fn key_exists() {
let key = vec![0xA, 0xB, 0xC];
let (db, _tmp) = create_db();
let expected = Arc::new(vec![1, 2, 3]);
db.put(&key, Column::Metadata, expected).unwrap();
assert!(db.exists(&key, Column::Metadata).unwrap());
}
#[test]
fn batch_write_inserts() {
let key = vec![0xA, 0xB, 0xC];
let value = Arc::new(vec![1, 2, 3]);
let (db, _tmp) = create_db();
let ops = vec![(
key.clone(),
Column::Metadata,
WriteOperation::Insert(value.clone()),
)];
db.batch_write(&mut ops.into_iter()).unwrap();
        assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value);
}
#[test]
fn batch_write_removes() {
let key = vec![0xA, 0xB, 0xC];
let value = Arc::new(vec![1, 2, 3]);
let (db, _tmp) = create_db();
db.put(&key, Column::Metadata, value).unwrap();
let ops = vec![(key.clone(), Column::Metadata, WriteOperation::Remove)];
db.batch_write(&mut ops.into_iter()).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap(), None);
}
#[test]
fn can_use_unit_value() {
let key = vec![0x00];
let (db, _tmp) = create_db();
let expected = Arc::new(vec![]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
assert!(db.exists(&key, Column::Metadata).unwrap());
assert_eq!(
db.iter_all(Column::Metadata, None, None, IterDirection::Forward)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0],
(key.clone(), expected.clone())
);
assert_eq!(
db.delete(&key, Column::Metadata).unwrap().unwrap(),
expected
);
assert!(!db.exists(&key, Column::Metadata).unwrap());
}
#[test]
fn can_use_unit_key() {
        let key: Vec<u8> = Vec::new();
let (db, _tmp) = create_db();
let expected = Arc::new(vec![1, 2, 3]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
assert!(db.exists(&key, Column::Metadata).unwrap());
assert_eq!(
db.iter_all(Column::Metadata, None, None, IterDirection::Forward)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0],
(key.clone(), expected.clone())
);
assert_eq!(
db.delete(&key, Column::Metadata).unwrap().unwrap(),
expected
);
assert!(!db.exists(&key, Column::Metadata).unwrap());
}
#[test]
fn can_use_unit_key_and_value() {
        let key: Vec<u8> = Vec::new();
let (db, _tmp) = create_db();
let expected = Arc::new(vec![]);
db.put(&key, Column::Metadata, expected.clone()).unwrap();
assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), expected);
assert!(db.exists(&key, Column::Metadata).unwrap());
assert_eq!(
db.iter_all(Column::Metadata, None, None, IterDirection::Forward)
.collect::<Result<Vec<_>, _>>()
.unwrap()[0],
(key.clone(), expected.clone())
);
assert_eq!(
db.delete(&key, Column::Metadata).unwrap().unwrap(),
expected
);
assert!(!db.exists(&key, Column::Metadata).unwrap());
}
}