ckb_rocksdb/
db_with_ttl.rs

1use crate::ffi;
2use crate::ffi_util::to_cstring;
3use crate::ops::GetColumnFamilys;
4use crate::{
5    db_iterator::DBRawIterator,
6    db_options::{OptionsMustOutliveDB, ReadOptions},
7    handle::Handle,
8    open_raw::{OpenRaw, OpenRawFFI},
9    ops, ColumnFamily, Error, Options,
10};
11use std::collections::BTreeMap;
12use std::fmt;
13use std::marker::PhantomData;
14use std::path::{Path, PathBuf};
15
16pub struct DBWithTTL {
17    pub(crate) inner: *mut ffi::rocksdb_t,
18    cfs: BTreeMap<String, ColumnFamily>,
19    path: PathBuf,
20    _outlive: Vec<OptionsMustOutliveDB>,
21}
22
23impl DBWithTTL {
24    pub fn path(&self) -> &Path {
25        self.path.as_path()
26    }
27
28    pub fn create_cf_with_ttl<N: AsRef<str>>(
29        &mut self,
30        name: N,
31        opts: &Options,
32        ttl: i32,
33    ) -> Result<(), Error> {
34        let cname = to_cstring(
35            name.as_ref(),
36            "Failed to convert path to CString when opening rocksdb",
37        )?;
38        unsafe {
39            let cf_handle = ffi_try!(ffi::rocksdb_create_column_family_with_ttl(
40                self.handle(),
41                opts.inner,
42                cname.as_ptr(),
43                ttl as libc::c_int,
44            ));
45
46            self.get_mut_cfs()
47                .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
48        };
49        Ok(())
50    }
51}
52
/// TTL selection used when opening a [`DBWithTTL`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TTLs {
    /// One TTL applied to the database, or repeated for every column family
    /// when column families are opened.
    Default(i32),
    /// One TTL per column family; the length has to equal the number of
    /// column families being opened.
    Columns(Vec<i32>),
}

/// Describes which TTLs to use when opening a database with TTL support.
///
/// Notes on TTL behaviour:
///
/// - TTL is accepted in seconds.
/// - If TTL is non-positive or not provided, the behaviour is TTL = infinity.
/// - `(int32_t)Timestamp(creation)` is suffixed to values in `Put` internally.
/// - Expired TTL values are deleted in compaction only
///   (`Timestamp + ttl < time_now`).
/// - `Get`/`Iterator` may return expired entries (compaction has not run on
///   them yet).
/// - A different TTL may be used during different opens. Example: Open1 at
///   t=0 with ttl=4 inserts k1, k2 and closes at t=2. Open2 at t=3 uses
///   ttl=5. Now k1, k2 should be deleted at t>=5.
/// - `read_only=true` opens in the usual read-only mode. Compactions will not
///   be triggered (neither manual nor automatic), so no expired entries are
///   removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TTLOpenDescriptor {
    ttls: TTLs,
}

impl TTLOpenDescriptor {
    /// Builds a descriptor carrying one TTL per column family.
    pub fn by_columns(ttls: Vec<i32>) -> Self {
        Self {
            ttls: TTLs::Columns(ttls),
        }
    }

    /// Builds a descriptor carrying a single TTL.
    pub fn by_default(ttl: i32) -> Self {
        Self {
            ttls: TTLs::Default(ttl),
        }
    }
}

impl Default for TTLOpenDescriptor {
    /// Uses a single TTL of `-1`, a non-positive value, i.e. "TTL = infinity"
    /// according to the notes on [`TTLOpenDescriptor`].
    fn default() -> Self {
        Self::by_default(-1)
    }
}
91
92impl ops::Open for DBWithTTL {}
93impl ops::OpenCF for DBWithTTL {}
94
95impl OpenRaw for DBWithTTL {
96    type Pointer = ffi::rocksdb_t;
97    type Descriptor = TTLOpenDescriptor;
98
99    fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
100        let pointer = unsafe {
101            if input.num_column_families <= 0 {
102                let ttl = match input.open_descriptor.ttls {
103                    TTLs::Default(ttl) => ttl as libc::c_int,
104                    TTLs::Columns(_) => {
105                        return Err(Error::new(
106                            "Ttls size has to be the same as number of column families".to_owned(),
107                        ));
108                    }
109                };
110
111                ffi_try!(ffi::rocksdb_open_with_ttl(input.options, input.path, ttl,))
112            } else {
113                let ttls = match input.open_descriptor.ttls {
114                    TTLs::Default(ttl) => (0..input.num_column_families)
115                        .map(|_| ttl as libc::c_int)
116                        .collect::<Vec<_>>(),
117                    TTLs::Columns(ref ttls) => {
118                        let ttls: Vec<_> = ttls.iter().map(|t| *t as libc::c_int).collect();
119
120                        let is_ttls_match = if input.num_column_families <= 0 {
121                            ttls.len() as i32 == 1
122                        } else {
123                            ttls.len() as i32 == input.num_column_families
124                        };
125
126                        if !is_ttls_match {
127                            return Err(Error::new(
128                                "Ttls size has to be the same as number of column families"
129                                    .to_owned(),
130                            ));
131                        }
132
133                        ttls
134                    }
135                };
136
137                ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
138                    input.options,
139                    input.path,
140                    input.num_column_families,
141                    input.column_family_names,
142                    input.column_family_options,
143                    input.column_family_handles,
144                    ttls.as_ptr(),
145                ))
146            }
147        };
148
149        Ok(pointer)
150    }
151
152    fn build<I>(
153        path: PathBuf,
154        _open_descriptor: Self::Descriptor,
155        pointer: *mut Self::Pointer,
156        column_families: I,
157        outlive: Vec<OptionsMustOutliveDB>,
158    ) -> Result<Self, Error>
159    where
160        I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
161    {
162        let cfs: BTreeMap<_, _> = column_families
163            .into_iter()
164            .map(|(k, h)| (k, ColumnFamily::new(h)))
165            .collect();
166        Ok(DBWithTTL {
167            inner: pointer,
168            cfs,
169            path,
170            _outlive: outlive,
171        })
172    }
173}
174
175impl Handle<ffi::rocksdb_t> for DBWithTTL {
176    fn handle(&self) -> *mut ffi::rocksdb_t {
177        self.inner
178    }
179}
180
181impl ops::Iterate for DBWithTTL {
182    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
183        unsafe {
184            DBRawIterator {
185                inner: ffi::rocksdb_create_iterator(self.inner, readopts.handle()),
186                db: PhantomData,
187            }
188        }
189    }
190}
191
192impl ops::IterateCF for DBWithTTL {
193    fn get_raw_iter_cf<'a: 'b, 'b>(
194        &'a self,
195        cf_handle: &ColumnFamily,
196        readopts: &ReadOptions,
197    ) -> Result<DBRawIterator<'b>, Error> {
198        unsafe {
199            Ok(DBRawIterator {
200                inner: ffi::rocksdb_create_iterator_cf(
201                    self.inner,
202                    readopts.handle(),
203                    cf_handle.inner,
204                ),
205                db: PhantomData,
206            })
207        }
208    }
209}
210
211impl ops::GetColumnFamilys for DBWithTTL {
212    fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
213        &self.cfs
214    }
215    fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
216        &mut self.cfs
217    }
218}
219
220impl ops::Read for DBWithTTL {}
221impl ops::Write for DBWithTTL {}
222
223unsafe impl Send for DBWithTTL {}
224unsafe impl Sync for DBWithTTL {}
225
226impl Drop for DBWithTTL {
227    fn drop(&mut self) {
228        unsafe {
229            for cf in self.cfs.values() {
230                ffi::rocksdb_column_family_handle_destroy(cf.inner);
231            }
232            ffi::rocksdb_close(self.inner);
233        }
234    }
235}
236
237impl fmt::Debug for DBWithTTL {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        write!(f, "Read-only RocksDB {{ path: {:?} }}", self.path())
240    }
241}