ckb_indexer_sync/store.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
use crate::error::Error;
use ckb_db_schema::Col;
use ckb_store::{ChainStore, Freezer, StoreCache};
use rocksdb::{
ops::OpenCF, prelude::*, ColumnFamilyDescriptor, DBIterator, DBPinnableSlice, IteratorMode,
SecondaryDB as SecondaryRocksDB, SecondaryOpenDescriptor,
};
use std::path::Path;
use std::sync::Arc;
/// Open DB as secondary instance with specified column families
//
// When opening DB in secondary mode, you can specify only a subset of column
// families in the database that should be opened. However, you always need
// to specify default column family. The default column family name is
// 'default' and it's stored in ROCKSDB_NAMESPACE::kDefaultColumnFamilyName
//
// Column families created by the primary after the secondary instance starts
// are currently ignored by the secondary instance. Column families opened
// by secondary and dropped by the primary will be dropped by secondary as
// well (on next invocation of TryCatchUpWithPrimary()). However the user
// of the secondary instance can still access the data of such dropped column
// family as long as they do not destroy the corresponding column family
// handle.
//
// The options argument specifies the options to open the secondary instance.
// Options.max_open_files should be set to -1.
// The name argument specifies the name of the primary db that you have used
// to open the primary instance.
// The secondary_path argument points to a directory where the secondary
// instance stores its info log.
// The column_families argument specifies a list of column families to open.
// If default column family is not specified or if any specified column
// families does not exist, the function returns non-OK status.
// Notice: rust-rocksdb `OpenRaw` handle 'default' column automatically
#[derive(Clone)]
pub struct SecondaryDB {
inner: Arc<SecondaryRocksDB>,
}
impl SecondaryDB {
/// Open a SecondaryDB
pub fn open_cf<P, I, N>(opts: &Options, path: P, cf_names: I, secondary_path: String) -> Self
where
P: AsRef<Path>,
I: IntoIterator<Item = N>,
N: Into<String>,
{
let cf_descriptors: Vec<_> = cf_names
.into_iter()
.map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
.collect();
let descriptor = SecondaryOpenDescriptor::new(secondary_path);
let inner = SecondaryRocksDB::open_cf_descriptors_with_descriptor(
opts,
path,
cf_descriptors,
descriptor,
)
.expect("Failed to open SecondaryDB");
SecondaryDB {
inner: Arc::new(inner),
}
}
/// Return the value associated with a key using RocksDB's PinnableSlice from the given column
/// so as to avoid unnecessary memory copy.
pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>, Error> {
let cf = self
.inner
.cf_handle(col)
.ok_or_else(|| Error::DB(format!("column {col} not found")))?;
self.inner.get_pinned_cf(cf, key).map_err(Into::into)
}
/// Make the secondary instance catch up with the primary by tailing and
/// replaying the MANIFEST and WAL of the primary.
// Column families created by the primary after the secondary instance starts
// will be ignored unless the secondary instance closes and restarts with the
// newly created column families.
// Column families that exist before secondary instance starts and dropped by
// the primary afterwards will be marked as dropped. However, as long as the
// secondary instance does not delete the corresponding column family
// handles, the data of the column family is still accessible to the
// secondary.
pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
self.inner.try_catch_up_with_primary().map_err(Into::into)
}
/// This is used when you want to iterate over a specific ColumnFamily
fn iter(&self, col: Col, mode: IteratorMode) -> Result<DBIterator, Error> {
let opts = ReadOptions::default();
let cf = self
.inner
.cf_handle(col)
.ok_or_else(|| Error::DB(format!("column {col} not found")))?;
self.inner
.iterator_cf_opt(cf, mode, &opts)
.map_err(Into::into)
}
}
impl ChainStore for SecondaryDB {
fn cache(&self) -> Option<&StoreCache> {
None
}
fn freezer(&self) -> Option<&Freezer> {
None
}
fn get(&self, col: Col, key: &[u8]) -> Option<DBPinnableSlice> {
self.get_pinned(col, key)
.expect("db operation should be ok")
}
fn get_iter(&self, col: Col, mode: IteratorMode) -> DBIterator {
self.iter(col, mode).expect("db operation should be ok")
}
}