ckb_rocksdb/
transaction_db.rs

1use crate::{
2    db_options::OptionsMustOutliveDB,
3    db_vector::DBVector,
4    ffi_util::to_cstring,
5    handle::{ConstHandle, Handle},
6    open_raw::{OpenRaw, OpenRawFFI},
7    ops::*,
8    write_batch::WriteBatch,
9    ColumnFamily, DBRawIterator, Error, Options, ReadOptions, Transaction, WriteOptions,
10};
11
12use crate::ffi;
13use libc::{c_char, c_uchar, size_t};
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::Path;
17use std::path::PathBuf;
18use std::ptr;
19
20/// A transaction database.
21pub struct TransactionDB {
22    inner: *mut ffi::rocksdb_transactiondb_t,
23    path: PathBuf,
24    cfs: BTreeMap<String, ColumnFamily>,
25    _outlive: Vec<OptionsMustOutliveDB>,
26}
27
28impl TransactionDB {
29    pub fn path(&self) -> &Path {
30        self.path.as_path()
31    }
32}
33
34impl Handle<ffi::rocksdb_transactiondb_t> for TransactionDB {
35    fn handle(&self) -> *mut ffi::rocksdb_transactiondb_t {
36        self.inner
37    }
38}
39
40impl Open for TransactionDB {}
41
42impl OpenCF for TransactionDB {}
43
44impl OpenRaw for TransactionDB {
45    type Pointer = ffi::rocksdb_transactiondb_t;
46    type Descriptor = TransactionDBOptions;
47
48    fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
49        let pointer = unsafe {
50            if input.num_column_families <= 0 {
51                ffi_try!(ffi::rocksdb_transactiondb_open(
52                    input.options,
53                    input.open_descriptor.inner,
54                    input.path,
55                ))
56            } else {
57                ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
58                    input.options,
59                    input.open_descriptor.inner,
60                    input.path,
61                    input.num_column_families,
62                    input.column_family_names,
63                    input.column_family_options,
64                    input.column_family_handles,
65                ))
66            }
67        };
68
69        Ok(pointer)
70    }
71
72    fn build<I>(
73        path: PathBuf,
74        _open_descriptor: Self::Descriptor,
75        pointer: *mut Self::Pointer,
76        column_families: I,
77        outlive: Vec<OptionsMustOutliveDB>,
78    ) -> Result<Self, Error>
79    where
80        I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
81    {
82        let cfs: BTreeMap<_, _> = column_families
83            .into_iter()
84            .map(|(k, h)| (k, ColumnFamily::new(h)))
85            .collect();
86        Ok(TransactionDB {
87            inner: pointer,
88            path,
89            cfs,
90            _outlive: outlive,
91        })
92    }
93}
94
95impl GetColumnFamilys for TransactionDB {
96    fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
97        &self.cfs
98    }
99    fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
100        &mut self.cfs
101    }
102}
103
104impl Read for TransactionDB {}
105impl Write for TransactionDB {}
106
107unsafe impl Send for TransactionDB {}
108unsafe impl Sync for TransactionDB {}
109
110impl TransactionBegin for TransactionDB {
111    type WriteOptions = WriteOptions;
112    type TransactionOptions = TransactionOptions;
113    fn transaction(
114        &self,
115        write_options: &WriteOptions,
116        tx_options: &TransactionOptions,
117    ) -> Transaction<'_, TransactionDB> {
118        unsafe {
119            let inner = ffi::rocksdb_transaction_begin(
120                self.inner,
121                write_options.handle(),
122                tx_options.inner,
123                ptr::null_mut(),
124            );
125            Transaction::new(inner)
126        }
127    }
128}
129
130impl Iterate for TransactionDB {
131    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
132        unsafe {
133            DBRawIterator {
134                inner: ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.handle()),
135                db: PhantomData,
136            }
137        }
138    }
139}
140
141impl IterateCF for TransactionDB {
142    fn get_raw_iter_cf<'a: 'b, 'b>(
143        &'a self,
144        cf_handle: &ColumnFamily,
145        readopts: &ReadOptions,
146    ) -> Result<DBRawIterator<'b>, Error> {
147        unsafe {
148            Ok(DBRawIterator {
149                inner: ffi::rocksdb_transactiondb_create_iterator_cf(
150                    self.inner,
151                    readopts.handle(),
152                    cf_handle.handle(),
153                ),
154                db: PhantomData,
155            })
156        }
157    }
158}
159
160impl Drop for TransactionDB {
161    fn drop(&mut self) {
162        unsafe {
163            ffi::rocksdb_transactiondb_close(self.inner);
164        }
165    }
166}
167
168pub struct TransactionDBOptions {
169    inner: *mut ffi::rocksdb_transactiondb_options_t,
170}
171
172impl TransactionDBOptions {
173    /// Create new transaction options
174    pub fn new() -> TransactionDBOptions {
175        unsafe {
176            let inner = ffi::rocksdb_transactiondb_options_create();
177            TransactionDBOptions { inner }
178        }
179    }
180
181    pub fn set_default_lock_timeout(&self, default_lock_timeout: i64) {
182        unsafe {
183            ffi::rocksdb_transactiondb_options_set_default_lock_timeout(
184                self.inner,
185                default_lock_timeout,
186            )
187        }
188    }
189
190    pub fn set_max_num_locks(&self, max_num_locks: i64) {
191        unsafe { ffi::rocksdb_transactiondb_options_set_max_num_locks(self.inner, max_num_locks) }
192    }
193
194    pub fn set_num_stripes(&self, num_stripes: usize) {
195        unsafe { ffi::rocksdb_transactiondb_options_set_num_stripes(self.inner, num_stripes) }
196    }
197
198    pub fn set_transaction_lock_timeout(&self, txn_lock_timeout: i64) {
199        unsafe {
200            ffi::rocksdb_transactiondb_options_set_transaction_lock_timeout(
201                self.inner,
202                txn_lock_timeout,
203            )
204        }
205    }
206}
207
208impl Drop for TransactionDBOptions {
209    fn drop(&mut self) {
210        unsafe {
211            ffi::rocksdb_transactiondb_options_destroy(self.inner);
212        }
213    }
214}
215
216impl Default for TransactionDBOptions {
217    fn default() -> TransactionDBOptions {
218        TransactionDBOptions::new()
219    }
220}
221
222pub struct TransactionOptions {
223    inner: *mut ffi::rocksdb_transaction_options_t,
224}
225
226impl TransactionOptions {
227    /// Create new transaction options
228    pub fn new() -> TransactionOptions {
229        unsafe {
230            let inner = ffi::rocksdb_transaction_options_create();
231            TransactionOptions { inner }
232        }
233    }
234
235    pub fn set_deadlock_detect(&self, deadlock_detect: bool) {
236        unsafe {
237            ffi::rocksdb_transaction_options_set_deadlock_detect(
238                self.inner,
239                deadlock_detect as c_uchar,
240            )
241        }
242    }
243
244    pub fn set_deadlock_detect_depth(&self, depth: i64) {
245        unsafe { ffi::rocksdb_transaction_options_set_deadlock_detect_depth(self.inner, depth) }
246    }
247
248    pub fn set_expiration(&self, expiration: i64) {
249        unsafe { ffi::rocksdb_transaction_options_set_expiration(self.inner, expiration) }
250    }
251
252    pub fn set_lock_timeout(&self, lock_timeout: i64) {
253        unsafe { ffi::rocksdb_transaction_options_set_lock_timeout(self.inner, lock_timeout) }
254    }
255
256    pub fn set_max_write_batch_size(&self, size: usize) {
257        unsafe { ffi::rocksdb_transaction_options_set_max_write_batch_size(self.inner, size) }
258    }
259
260    pub fn set_snapshot(&mut self, set_snapshot: bool) {
261        unsafe {
262            ffi::rocksdb_transaction_options_set_set_snapshot(self.inner, set_snapshot as c_uchar);
263        }
264    }
265}
266
267impl Drop for TransactionOptions {
268    fn drop(&mut self) {
269        unsafe {
270            ffi::rocksdb_transaction_options_destroy(self.inner);
271        }
272    }
273}
274
275impl Default for TransactionOptions {
276    fn default() -> TransactionOptions {
277        TransactionOptions::new()
278    }
279}
280
281impl CreateCheckpointObject for TransactionDB {
282    unsafe fn create_checkpoint_object_raw(&self) -> Result<*mut ffi::rocksdb_checkpoint_t, Error> {
283        Ok(ffi_try!(
284            ffi::rocksdb_transactiondb_checkpoint_object_create(self.inner,)
285        ))
286    }
287}
288
289impl GetCF<ReadOptions> for TransactionDB {
290    fn get_cf_full<K: AsRef<[u8]>>(
291        &self,
292        cf: Option<&ColumnFamily>,
293        key: K,
294        readopts: Option<&ReadOptions>,
295    ) -> Result<Option<DBVector>, Error> {
296        let mut default_readopts = None;
297
298        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
299
300        let key = key.as_ref();
301        let key_ptr = key.as_ptr() as *const c_char;
302        let key_len = key.len() as size_t;
303
304        unsafe {
305            let mut val_len: size_t = 0;
306
307            let val = match cf {
308                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_get_cf(
309                    self.handle(),
310                    ro_handle,
311                    cf.handle(),
312                    key_ptr,
313                    key_len,
314                    &mut val_len,
315                )),
316                None => ffi_try!(ffi::rocksdb_transactiondb_get(
317                    self.handle(),
318                    ro_handle,
319                    key_ptr,
320                    key_len,
321                    &mut val_len,
322                )),
323            } as *mut u8;
324
325            if val.is_null() {
326                Ok(None)
327            } else {
328                Ok(Some(DBVector::from_c(val, val_len)))
329            }
330        }
331    }
332}
333
334impl MultiGet<ReadOptions> for TransactionDB {
335    fn multi_get_full<K, I>(
336        &self,
337        keys: I,
338        readopts: Option<&ReadOptions>,
339    ) -> Vec<Result<Option<DBVector>, Error>>
340    where
341        K: AsRef<[u8]>,
342        I: IntoIterator<Item = K>,
343    {
344        let mut default_readopts = None;
345        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
346            Ok(ro) => ro,
347            Err(e) => {
348                let key_count = keys.into_iter().count();
349
350                return vec![e; key_count]
351                    .iter()
352                    .map(|e| Err(e.to_owned()))
353                    .collect();
354            }
355        };
356
357        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
358            .into_iter()
359            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
360            .unzip();
361        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
362
363        let mut values = vec![ptr::null_mut(); keys.len()];
364        let mut values_sizes = vec![0_usize; keys.len()];
365        let mut errors = vec![ptr::null_mut(); keys.len()];
366        unsafe {
367            ffi::rocksdb_transactiondb_multi_get(
368                self.inner,
369                ro_handle,
370                ptr_keys.len(),
371                ptr_keys.as_ptr(),
372                keys_sizes.as_ptr(),
373                values.as_mut_ptr(),
374                values_sizes.as_mut_ptr(),
375                errors.as_mut_ptr(),
376            );
377        }
378
379        convert_values(values, values_sizes, errors)
380    }
381}
382
383impl MultiGetCF<ReadOptions> for TransactionDB {
384    fn multi_get_cf_full<'a, K, I>(
385        &self,
386        keys: I,
387        readopts: Option<&ReadOptions>,
388    ) -> Vec<Result<Option<DBVector>, Error>>
389    where
390        K: AsRef<[u8]>,
391        I: IntoIterator<Item = (&'a ColumnFamily, K)>,
392    {
393        let mut default_readopts = None;
394        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
395            Ok(ro) => ro,
396            Err(e) => {
397                let key_count = keys.into_iter().count();
398
399                return vec![e; key_count]
400                    .iter()
401                    .map(|e| Err(e.to_owned()))
402                    .collect();
403            }
404        };
405        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
406            .into_iter()
407            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
408            .unzip();
409        let ptr_keys: Vec<_> = cfs_and_keys
410            .iter()
411            .map(|(_, k)| k.as_ptr() as *const c_char)
412            .collect();
413        let ptr_cfs: Vec<_> = cfs_and_keys
414            .iter()
415            .map(|(c, _)| c.inner as *const _)
416            .collect();
417
418        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
419        let mut values_sizes = vec![0_usize; ptr_keys.len()];
420        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
421        unsafe {
422            ffi::rocksdb_transactiondb_multi_get_cf(
423                self.inner,
424                ro_handle,
425                ptr_cfs.as_ptr(),
426                ptr_keys.len(),
427                ptr_keys.as_ptr(),
428                keys_sizes.as_ptr(),
429                values.as_mut_ptr(),
430                values_sizes.as_mut_ptr(),
431                errors.as_mut_ptr(),
432            );
433        }
434
435        convert_values(values, values_sizes, errors)
436    }
437}
438
439impl PutCF<WriteOptions> for TransactionDB {
440    fn put_cf_full<K, V>(
441        &self,
442        cf: Option<&ColumnFamily>,
443        key: K,
444        value: V,
445        writeopts: Option<&WriteOptions>,
446    ) -> Result<(), Error>
447    where
448        K: AsRef<[u8]>,
449        V: AsRef<[u8]>,
450    {
451        let mut default_writeopts = None;
452
453        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
454
455        let key = key.as_ref();
456        let value = value.as_ref();
457        let key_ptr = key.as_ptr() as *const c_char;
458        let key_len = key.len() as size_t;
459        let val_ptr = value.as_ptr() as *const c_char;
460        let val_len = value.len() as size_t;
461
462        unsafe {
463            match cf {
464                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_put_cf(
465                    self.handle(),
466                    wo_handle,
467                    cf.handle(),
468                    key_ptr,
469                    key_len,
470                    val_ptr,
471                    val_len,
472                )),
473                None => ffi_try!(ffi::rocksdb_transactiondb_put(
474                    self.handle(),
475                    wo_handle,
476                    key_ptr,
477                    key_len,
478                    val_ptr,
479                    val_len,
480                )),
481            }
482
483            Ok(())
484        }
485    }
486}
487
488impl DeleteCF<WriteOptions> for TransactionDB {
489    fn delete_cf_full<K>(
490        &self,
491        cf: Option<&ColumnFamily>,
492        key: K,
493        writeopts: Option<&WriteOptions>,
494    ) -> Result<(), Error>
495    where
496        K: AsRef<[u8]>,
497    {
498        let mut default_writeopts = None;
499
500        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
501
502        let key = key.as_ref();
503        let key_ptr = key.as_ptr() as *const c_char;
504        let key_len = key.len() as size_t;
505
506        unsafe {
507            match cf {
508                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
509                    self.handle(),
510                    wo_handle,
511                    cf.handle(),
512                    key_ptr,
513                    key_len,
514                )),
515                None => ffi_try!(ffi::rocksdb_transactiondb_delete(
516                    self.handle(),
517                    wo_handle,
518                    key_ptr,
519                    key_len,
520                )),
521            }
522
523            Ok(())
524        }
525    }
526}
527
528impl MergeCF<WriteOptions> for TransactionDB {
529    fn merge_cf_full<K, V>(
530        &self,
531        cf: Option<&ColumnFamily>,
532        key: K,
533        value: V,
534        writeopts: Option<&WriteOptions>,
535    ) -> Result<(), Error>
536    where
537        K: AsRef<[u8]>,
538        V: AsRef<[u8]>,
539    {
540        let mut default_writeopts = None;
541
542        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
543
544        let key = key.as_ref();
545        let value = value.as_ref();
546        let key_ptr = key.as_ptr() as *const c_char;
547        let key_len = key.len() as size_t;
548        let val_ptr = value.as_ptr() as *const c_char;
549        let val_len = value.len() as size_t;
550
551        unsafe {
552            match cf {
553                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
554                    self.handle(),
555                    wo_handle,
556                    cf.handle(),
557                    key_ptr,
558                    key_len,
559                    val_ptr,
560                    val_len,
561                )),
562                None => ffi_try!(ffi::rocksdb_transactiondb_merge(
563                    self.handle(),
564                    wo_handle,
565                    key_ptr,
566                    key_len,
567                    val_ptr,
568                    val_len,
569                )),
570            }
571
572            Ok(())
573        }
574    }
575}
576
577impl CreateCF for TransactionDB {
578    fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
579        let cname = to_cstring(
580            name.as_ref(),
581            "Failed to convert path to CString when opening rocksdb",
582        )?;
583        unsafe {
584            let cf_handle = ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
585                self.handle(),
586                opts.const_handle(),
587                cname.as_ptr(),
588            ));
589
590            self.get_mut_cfs()
591                .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
592        };
593        Ok(())
594    }
595}
596
597impl TransactionDB {
598    pub fn snapshot(&self) -> Snapshot<'_> {
599        let snapshot = unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) };
600        Snapshot {
601            db: self,
602            inner: snapshot,
603        }
604    }
605}
606
607pub struct Snapshot<'a> {
608    db: &'a TransactionDB,
609    inner: *const ffi::rocksdb_snapshot_t,
610}
611
612impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'a> {
613    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
614        self.inner
615    }
616}
617
618impl<'a> Read for Snapshot<'a> {}
619
620impl<'a> GetCF<ReadOptions> for Snapshot<'a> {
621    fn get_cf_full<K: AsRef<[u8]>>(
622        &self,
623        cf: Option<&ColumnFamily>,
624        key: K,
625        readopts: Option<&ReadOptions>,
626    ) -> Result<Option<DBVector>, Error> {
627        let mut ro = readopts.cloned().unwrap_or_default();
628        ro.set_snapshot(self);
629
630        self.db.get_cf_full(cf, key, Some(&ro))
631    }
632}
633
634impl<'a> MultiGet<ReadOptions> for Snapshot<'a> {
635    fn multi_get_full<K, I>(
636        &self,
637        keys: I,
638        readopts: Option<&ReadOptions>,
639    ) -> Vec<Result<Option<DBVector>, Error>>
640    where
641        K: AsRef<[u8]>,
642        I: IntoIterator<Item = K>,
643    {
644        let mut ro = readopts.cloned().unwrap_or_default();
645        ro.set_snapshot(self);
646
647        self.db.multi_get_full(keys, Some(&ro))
648    }
649}
650
651impl<'a> MultiGetCF<ReadOptions> for Snapshot<'a> {
652    fn multi_get_cf_full<'m, K, I>(
653        &self,
654        keys: I,
655        readopts: Option<&ReadOptions>,
656    ) -> Vec<Result<Option<DBVector>, Error>>
657    where
658        K: AsRef<[u8]>,
659        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
660    {
661        let mut ro = readopts.cloned().unwrap_or_default();
662        ro.set_snapshot(self);
663
664        self.db.multi_get_cf_full(keys, Some(&ro))
665    }
666}
667
668impl<'a> Drop for Snapshot<'a> {
669    fn drop(&mut self) {
670        unsafe {
671            ffi::rocksdb_transactiondb_release_snapshot(self.db.inner, self.inner);
672        }
673    }
674}
675
676impl Iterate for Snapshot<'_> {
677    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
678        let mut ro = readopts.to_owned();
679        ro.set_snapshot(self);
680        self.db.get_raw_iter(&ro)
681    }
682}
683
684impl IterateCF for Snapshot<'_> {
685    fn get_raw_iter_cf<'a: 'b, 'b>(
686        &'a self,
687        cf_handle: &ColumnFamily,
688        readopts: &ReadOptions,
689    ) -> Result<DBRawIterator<'b>, Error> {
690        let mut ro = readopts.to_owned();
691        ro.set_snapshot(self);
692        self.db.get_raw_iter_cf(cf_handle, &ro)
693    }
694}
695
696impl WriteOps for TransactionDB {
697    fn write_full(
698        &self,
699        batch: &WriteBatch,
700        writeopts: Option<&WriteOptions>,
701    ) -> Result<(), Error> {
702        let mut default_writeopts = None;
703
704        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
705
706        unsafe {
707            ffi_try!(ffi::rocksdb_transactiondb_write(
708                self.handle(),
709                wo_handle,
710                batch.handle(),
711            ));
712            Ok(())
713        }
714    }
715}