// ckb_db/db_with_ttl.rs
//! DB with ttl support wrapper

use crate::{internal_error, Result};
use rocksdb::ops::{DropCF, GetColumnFamilys, GetPinnedCF, GetPropertyCF, OpenCF, PutCF};
use rocksdb::{
    ColumnFamilyDescriptor, DBPinnableSlice, DBWithTTL as RawDBWithTTL, Options, TTLOpenDescriptor,
};
use std::path::Path;

/// RocksDB property name queried by `DBWithTTL::estimate_num_keys_cf`.
const PROPERTY_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
/// Value handed to `Options::set_keep_log_file_num` when `DBWithTTL::open_cf` builds its
/// database options.
const DB_LOG_KEEP_NUM: usize = 10;

/// DB with ttl support wrapper
///
/// Wraps `rocksdb::DBWithTTL`. The notes below describe the TTL behaviour of the wrapped
/// database:
///
/// - TTL is accepted in seconds.
/// - If TTL is non-positive or not provided, the behaviour is TTL = infinity.
/// - `(int32_t)Timestamp(creation)` is suffixed to values in `Put` internally.
/// - Expired TTL values are deleted in compaction only (`Timestamp + ttl < time_now`).
/// - `Get`/`Iterator` may return expired entries (compaction has not run on them yet).
/// - Different TTLs may be used during different opens.
///   Example: Open1 at t=0 with ttl=4 and insert k1, k2, close at t=2. Open2 at t=3 with
///   ttl=5. Now k1, k2 should be deleted at t>=5.
/// - `read_only=true` opens in the usual read-only mode. Compactions will not be triggered
///   (neither manual nor automatic), so no expired entries are removed.
///   NOTE(review): `open_cf` on this wrapper takes no read-only flag; this remark appears to
///   be carried over from the RocksDB documentation.
#[derive(Debug)]
pub struct DBWithTTL {
    /// The wrapped RocksDB handle that the methods of this type delegate to.
    pub(crate) inner: RawDBWithTTL,
}

impl DBWithTTL {
    /// Open a database with ttl support.
    ///
    /// The database options enable `create_if_missing` and
    /// `create_missing_column_families` and set `keep_log_file_num` to `DB_LOG_KEEP_NUM`.
    /// Every name in `cf_names` is opened as a column family with default [`Options`], and
    /// `ttl` is forwarded through `TTLOpenDescriptor::by_default`.
    ///
    /// # Errors
    ///
    /// Returns an internal error whose message starts with `failed to open database: ` when
    /// the underlying open call fails.
    pub fn open_cf<P, I, N>(path: P, cf_names: I, ttl: i32) -> Result<Self>
    where
        P: AsRef<Path>,
        I: IntoIterator<Item = N>,
        N: Into<String>,
    {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.set_keep_log_file_num(DB_LOG_KEEP_NUM);

        let cf_descriptors: Vec<_> = cf_names
            .into_iter()
            .map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
            .collect();

        let descriptor = TTLOpenDescriptor::by_default(ttl);
        let inner = RawDBWithTTL::open_cf_descriptors_with_descriptor(
            &opts,
            path,
            cf_descriptors,
            descriptor,
        )
        .map_err(|err| internal_error(format!("failed to open database: {err}")))?;
        Ok(DBWithTTL { inner })
    }

    /// Look up the handle of column family `col`.
    ///
    /// Shared by every accessor that addresses a column by name, so that a missing column
    /// is reported with one and the same `column {col} not found` internal error.
    fn cf_handle(&self, col: &str) -> Result<&rocksdb::ColumnFamily> {
        self.inner
            .cf_handle(col)
            .ok_or_else(|| internal_error(format!("column {col} not found")))
    }

    /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
    /// so as to avoid unnecessary memory copy.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `col` is not a known column family or when the
    /// underlying read fails.
    pub fn get_pinned(&self, col: &str, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        let cf = self.cf_handle(col)?;
        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
    }

    /// Insert a value into the database under the given key.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `col` is not a known column family or when the
    /// underlying write fails.
    pub fn put<K, V>(&self, col: &str, key: K, value: V) -> Result<()>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let cf = self.cf_handle(col)?;
        self.inner.put_cf(cf, key, value).map_err(internal_error)
    }

    /// Create a new column family for the database.
    ///
    /// The column family is created with default [`Options`] and the given `ttl`.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the underlying create call fails.
    pub fn create_cf_with_ttl(&mut self, col: &str, ttl: i32) -> Result<()> {
        let opts = Options::default();
        self.inner
            .create_cf_with_ttl(col, &opts, ttl)
            .map_err(internal_error)
    }

    /// Delete column family.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the underlying drop call fails.
    pub fn drop_cf(&mut self, col: &str) -> Result<()> {
        self.inner.drop_cf(col).map_err(internal_error)
    }

    /// "rocksdb.estimate-num-keys" - returns estimated number of total keys in
    /// the active and unflushed immutable memtables and storage.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `col` is not a known column family or when the
    /// property query fails.
    pub fn estimate_num_keys_cf(&self, col: &str) -> Result<Option<u64>> {
        let cf = self.cf_handle(col)?;
        self.inner
            .property_int_value_cf(cf, PROPERTY_NUM_KEYS)
            .map_err(internal_error)
    }
}