ckb_rocksdb/
optimistic_transaction_db.rs

1use crate::{
2    db_iterator::DBRawIterator,
3    db_options::{OptionsMustOutliveDB, ReadOptions},
4    db_vector::DBVector,
5    handle::{ConstHandle, Handle},
6    open_raw::{OpenRaw, OpenRawFFI},
7    ops::*,
8    ColumnFamily, Error, OptimisticTransaction, Options, WriteOptions,
9};
10
11use crate::ffi;
12use crate::ffi_util::to_cpath;
13use libc::c_uchar;
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::{Path, PathBuf};
17use std::ptr;
18
19/// A optimistic transaction database.
20pub struct OptimisticTransactionDB {
21    inner: *mut ffi::rocksdb_optimistictransactiondb_t,
22    path: PathBuf,
23    cfs: BTreeMap<String, ColumnFamily>,
24    base_db: *mut ffi::rocksdb_t,
25    _outlive: Vec<OptionsMustOutliveDB>,
26}
27
28impl Handle<ffi::rocksdb_optimistictransactiondb_t> for OptimisticTransactionDB {
29    fn handle(&self) -> *mut ffi::rocksdb_optimistictransactiondb_t {
30        self.inner
31    }
32}
33
34impl Open for OptimisticTransactionDB {}
35impl OpenCF for OptimisticTransactionDB {}
36
37impl OpenRaw for OptimisticTransactionDB {
38    type Pointer = ffi::rocksdb_optimistictransactiondb_t;
39    type Descriptor = ();
40
41    fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
42        let pointer = unsafe {
43            if input.num_column_families <= 0 {
44                ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
45                    input.options,
46                    input.path,
47                ))
48            } else {
49                ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
50                    input.options,
51                    input.path,
52                    input.num_column_families,
53                    input.column_family_names,
54                    input.column_family_options,
55                    input.column_family_handles,
56                ))
57            }
58        };
59
60        Ok(pointer)
61    }
62
63    fn build<I>(
64        path: PathBuf,
65        _open_descriptor: Self::Descriptor,
66        pointer: *mut Self::Pointer,
67        column_families: I,
68        outlive: Vec<OptionsMustOutliveDB>,
69    ) -> Result<Self, Error>
70    where
71        I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
72    {
73        let cfs: BTreeMap<_, _> = column_families
74            .into_iter()
75            .map(|(k, h)| (k, ColumnFamily::new(h)))
76            .collect();
77        let base_db = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(pointer) };
78        Ok(OptimisticTransactionDB {
79            inner: pointer,
80            cfs,
81            path,
82            base_db,
83            _outlive: outlive,
84        })
85    }
86}
87
88impl Read for OptimisticTransactionDB {}
89impl Write for OptimisticTransactionDB {}
90
91unsafe impl Send for OptimisticTransactionDB {}
92unsafe impl Sync for OptimisticTransactionDB {}
93
94impl GetColumnFamilys for OptimisticTransactionDB {
95    fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
96        &self.cfs
97    }
98    fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
99        &mut self.cfs
100    }
101}
102
103impl OptimisticTransactionDB {
104    pub fn path(&self) -> &Path {
105        self.path.as_path()
106    }
107
108    pub fn base_db_ptr(&self) -> *mut ffi::rocksdb_t {
109        self.base_db
110    }
111
112    pub fn repair<P: AsRef<Path>>(opts: Options, path: P) -> Result<(), Error> {
113        let cpath = to_cpath(
114            path,
115            "Failed to convert path to CString when opening database.",
116        )?;
117        unsafe {
118            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr(),));
119        }
120        Ok(())
121    }
122
123    pub fn transaction(
124        &self,
125        write_options: &WriteOptions,
126        tx_options: &OptimisticTransactionOptions,
127    ) -> OptimisticTransaction {
128        unsafe {
129            let inner = ffi::rocksdb_optimistictransaction_begin(
130                self.inner,
131                write_options.handle(),
132                tx_options.inner,
133                ptr::null_mut(),
134            );
135            OptimisticTransaction::new(inner)
136        }
137    }
138
139    pub fn transaction_default(&self) -> OptimisticTransaction {
140        let write_options = WriteOptions::default();
141        let transaction_options = OptimisticTransactionOptions::default();
142        self.transaction(&write_options, &transaction_options)
143    }
144}
145
146impl Drop for OptimisticTransactionDB {
147    fn drop(&mut self) {
148        unsafe {
149            for cf in self.cfs.values() {
150                ffi::rocksdb_column_family_handle_destroy(cf.inner);
151            }
152            ffi::rocksdb_optimistictransactiondb_close_base_db(self.base_db);
153            ffi::rocksdb_optimistictransactiondb_close(self.inner);
154        }
155    }
156}
157
158// impl TransactionBegin for OptimisticTransactionDB {
159//     type WriteOptions = WriteOptions;
160//     type TransactionOptions = OptimisticTransactionOptions;
161//     fn transaction(
162//         &self,
163//         write_options: &WriteOptions,
164//         tx_options: &OptimisticTransactionOptions,
165//     ) -> Transaction<OptimisticTransactionDB> {
166//         unsafe {
167//             let inner = ffi::rocksdb_optimistictransaction_begin(
168//                 self.inner,
169//                 write_options.handle(),
170//                 tx_options.inner,
171//                 ptr::null_mut(),
172//             );
173//             Transaction::new(inner)
174//         }
175//     }
176// }
177
178pub struct OptimisticTransactionOptions {
179    pub inner: *mut ffi::rocksdb_optimistictransaction_options_t,
180}
181
182impl OptimisticTransactionOptions {
183    /// Create new optimistic transaction options
184    pub fn new() -> OptimisticTransactionOptions {
185        unsafe {
186            let inner = ffi::rocksdb_optimistictransaction_options_create();
187            OptimisticTransactionOptions { inner }
188        }
189    }
190
191    /// Set a snapshot at start of transaction by setting set_snapshot=true
192    /// Default: false
193    pub fn set_snapshot(&mut self, set_snapshot: bool) {
194        unsafe {
195            ffi::rocksdb_optimistictransaction_options_set_set_snapshot(
196                self.inner,
197                set_snapshot as c_uchar,
198            );
199        }
200    }
201}
202
203impl Drop for OptimisticTransactionOptions {
204    fn drop(&mut self) {
205        unsafe {
206            ffi::rocksdb_optimistictransaction_options_destroy(self.inner);
207        }
208    }
209}
210
211impl Default for OptimisticTransactionOptions {
212    fn default() -> OptimisticTransactionOptions {
213        OptimisticTransactionOptions::new()
214    }
215}
216
217impl Handle<ffi::rocksdb_t> for OptimisticTransactionDB {
218    fn handle(&self) -> *mut ffi::rocksdb_t {
219        self.base_db
220    }
221}
222
223impl Iterate for OptimisticTransactionDB {
224    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
225        unsafe {
226            DBRawIterator {
227                inner: ffi::rocksdb_create_iterator(self.base_db, readopts.handle()),
228                db: PhantomData,
229            }
230        }
231    }
232}
233
234impl IterateCF for OptimisticTransactionDB {
235    fn get_raw_iter_cf<'a: 'b, 'b>(
236        &'a self,
237        cf_handle: &ColumnFamily,
238        readopts: &ReadOptions,
239    ) -> Result<DBRawIterator<'b>, Error> {
240        unsafe {
241            Ok(DBRawIterator {
242                inner: ffi::rocksdb_create_iterator_cf(
243                    self.base_db,
244                    readopts.handle(),
245                    cf_handle.inner,
246                ),
247                db: PhantomData,
248            })
249        }
250    }
251}
252
253impl OptimisticTransactionDB {
254    pub fn snapshot(&self) -> Snapshot<'_> {
255        let snapshot = unsafe { ffi::rocksdb_create_snapshot(self.base_db) };
256        Snapshot {
257            db: self,
258            inner: snapshot,
259        }
260    }
261}
262
263pub struct Snapshot<'a> {
264    db: &'a OptimisticTransactionDB,
265    inner: *const ffi::rocksdb_snapshot_t,
266}
267
268impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'a> {
269    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
270        self.inner
271    }
272}
273
274impl<'a> Read for Snapshot<'a> {}
275
276impl<'a> GetCF<ReadOptions> for Snapshot<'a> {
277    fn get_cf_full<K: AsRef<[u8]>>(
278        &self,
279        cf: Option<&ColumnFamily>,
280        key: K,
281        readopts: Option<&ReadOptions>,
282    ) -> Result<Option<DBVector>, Error> {
283        let mut ro = readopts.cloned().unwrap_or_default();
284        ro.set_snapshot(self);
285
286        self.db.get_cf_full(cf, key, Some(&ro))
287    }
288}
289
290impl<'a> MultiGet<ReadOptions> for Snapshot<'a> {
291    fn multi_get_full<K, I>(
292        &self,
293        keys: I,
294        readopts: Option<&ReadOptions>,
295    ) -> Vec<Result<Option<DBVector>, Error>>
296    where
297        K: AsRef<[u8]>,
298        I: IntoIterator<Item = K>,
299    {
300        let mut ro = readopts.cloned().unwrap_or_default();
301        ro.set_snapshot(self);
302
303        self.db.multi_get_full(keys, Some(&ro))
304    }
305}
306
307impl<'a> MultiGetCF<ReadOptions> for Snapshot<'a> {
308    fn multi_get_cf_full<'m, K, I>(
309        &self,
310        keys: I,
311        readopts: Option<&ReadOptions>,
312    ) -> Vec<Result<Option<DBVector>, Error>>
313    where
314        K: AsRef<[u8]>,
315        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
316    {
317        let mut ro = readopts.cloned().unwrap_or_default();
318        ro.set_snapshot(self);
319
320        self.db.multi_get_cf_full(keys, Some(&ro))
321    }
322}
323
324impl<'a> Drop for Snapshot<'a> {
325    fn drop(&mut self) {
326        unsafe {
327            ffi::rocksdb_release_snapshot(self.db.base_db, self.inner);
328        }
329    }
330}
331
332impl Iterate for Snapshot<'_> {
333    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
334        let mut ro = readopts.to_owned();
335        ro.set_snapshot(self);
336        self.db.get_raw_iter(&ro)
337    }
338}
339
340impl IterateCF for Snapshot<'_> {
341    fn get_raw_iter_cf<'a: 'b, 'b>(
342        &'a self,
343        cf_handle: &ColumnFamily,
344        readopts: &ReadOptions,
345    ) -> Result<DBRawIterator<'b>, Error> {
346        let mut ro = readopts.to_owned();
347        ro.set_snapshot(self);
348        self.db.get_raw_iter_cf(cf_handle, &ro)
349    }
350}