ckb_rocksdb/
transaction.rs

1use crate::ffi;
2use crate::{
3    ffi_util,
4    handle::{ConstHandle, Handle},
5    ops::*,
6    ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions,
7};
8use libc::{c_char, c_uchar, c_void, size_t};
9use std::marker::PhantomData;
10use std::ptr;
11
12pub struct Transaction<'a, T> {
13    inner: *mut ffi::rocksdb_transaction_t,
14    db: PhantomData<&'a T>,
15}
16
17impl<'a, T> Transaction<'a, T> {
18    pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> Transaction<'a, T> {
19        Transaction {
20            inner,
21            db: PhantomData,
22        }
23    }
24
25    /// commits a transaction
26    pub fn commit(&self) -> Result<(), Error> {
27        unsafe {
28            ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
29        }
30        Ok(())
31    }
32
33    /// Transaction rollback
34    pub fn rollback(&self) -> Result<(), Error> {
35        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
36        Ok(())
37    }
38
39    /// Transaction rollback to savepoint
40    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
41        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
42        Ok(())
43    }
44
45    /// Set savepoint for transaction
46    pub fn set_savepoint(&self) {
47        unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
48    }
49
50    /// Get Snapshot
51    pub fn snapshot(&'a self) -> TransactionSnapshot<'a, T> {
52        unsafe {
53            let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
54            TransactionSnapshot {
55                inner: snapshot,
56                db: self,
57            }
58        }
59    }
60
61    /// Get For Update
62    /// ReadOptions: Default
63    /// exclusive: true
64    pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
65        let opt = ReadOptions::default();
66        self.get_for_update_opt(key, &opt, true)
67    }
68
69    /// Get For Update with custom ReadOptions and exclusive
70    pub fn get_for_update_opt<K: AsRef<[u8]>>(
71        &self,
72        key: K,
73        readopts: &ReadOptions,
74        exclusive: bool,
75    ) -> Result<Option<DBVector>, Error> {
76        let key = key.as_ref();
77        let key_ptr = key.as_ptr() as *const c_char;
78        let key_len = key.len() as size_t;
79        unsafe {
80            let mut val_len: size_t = 0;
81            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
82                self.handle(),
83                readopts.handle(),
84                key_ptr,
85                key_len,
86                &mut val_len,
87                exclusive as c_uchar,
88            )) as *mut u8;
89
90            if val.is_null() {
91                Ok(None)
92            } else {
93                Ok(Some(DBVector::from_c(val, val_len)))
94            }
95        }
96    }
97
98    pub fn get_for_update_cf<K: AsRef<[u8]>>(
99        &self,
100        cf: &ColumnFamily,
101        key: K,
102    ) -> Result<Option<DBVector>, Error> {
103        let opt = ReadOptions::default();
104        self.get_for_update_cf_opt(cf, key, &opt, true)
105    }
106
107    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
108        &self,
109        cf: &ColumnFamily,
110        key: K,
111        readopts: &ReadOptions,
112        exclusive: bool,
113    ) -> Result<Option<DBVector>, Error> {
114        let key = key.as_ref();
115        let key_ptr = key.as_ptr() as *const c_char;
116        let key_len = key.len() as size_t;
117        unsafe {
118            let mut val_len: size_t = 0;
119            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
120                self.handle(),
121                readopts.handle(),
122                cf.handle(),
123                key_ptr,
124                key_len,
125                &mut val_len,
126                exclusive as c_uchar,
127            )) as *mut u8;
128
129            if val.is_null() {
130                Ok(None)
131            } else {
132                Ok(Some(DBVector::from_c(val, val_len)))
133            }
134        }
135    }
136}
137
138impl<'a, T> Drop for Transaction<'a, T> {
139    fn drop(&mut self) {
140        unsafe {
141            ffi::rocksdb_transaction_destroy(self.inner);
142        }
143    }
144}
145
146impl<'a, T> Handle<ffi::rocksdb_transaction_t> for Transaction<'a, T> {
147    fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
148        self.inner
149    }
150}
151
152impl<'a, T> Read for Transaction<'a, T> {}
153
154impl<'a, T> GetCF<ReadOptions> for Transaction<'a, T>
155where
156    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
157{
158    fn get_cf_full<K: AsRef<[u8]>>(
159        &self,
160        cf: Option<&ColumnFamily>,
161        key: K,
162        readopts: Option<&ReadOptions>,
163    ) -> Result<Option<DBVector>, Error> {
164        let mut default_readopts = None;
165
166        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
167
168        let key = key.as_ref();
169        let key_ptr = key.as_ptr() as *const c_char;
170        let key_len = key.len() as size_t;
171
172        unsafe {
173            let mut val_len: size_t = 0;
174
175            let val = match cf {
176                Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
177                    self.handle(),
178                    ro_handle,
179                    cf.inner,
180                    key_ptr,
181                    key_len,
182                    &mut val_len,
183                )),
184                None => ffi_try!(ffi::rocksdb_transaction_get(
185                    self.handle(),
186                    ro_handle,
187                    key_ptr,
188                    key_len,
189                    &mut val_len,
190                )),
191            } as *mut u8;
192
193            if val.is_null() {
194                Ok(None)
195            } else {
196                Ok(Some(DBVector::from_c(val, val_len)))
197            }
198        }
199    }
200}
201
202impl<'a, T> MultiGet<ReadOptions> for Transaction<'a, T>
203where
204    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
205{
206    fn multi_get_full<K, I>(
207        &self,
208        keys: I,
209        readopts: Option<&ReadOptions>,
210    ) -> Vec<Result<Option<DBVector>, Error>>
211    where
212        K: AsRef<[u8]>,
213        I: IntoIterator<Item = K>,
214    {
215        let mut default_readopts = None;
216        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
217            Ok(ro) => ro,
218            Err(e) => {
219                let key_count = keys.into_iter().count();
220
221                return vec![e; key_count]
222                    .iter()
223                    .map(|e| Err(e.to_owned()))
224                    .collect();
225            }
226        };
227
228        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
229            .into_iter()
230            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
231            .unzip();
232        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
233
234        let mut values = vec![ptr::null_mut(); keys.len()];
235        let mut values_sizes = vec![0_usize; keys.len()];
236        let mut errors = vec![ptr::null_mut(); keys.len()];
237        unsafe {
238            ffi::rocksdb_transaction_multi_get(
239                self.inner,
240                ro_handle,
241                ptr_keys.len(),
242                ptr_keys.as_ptr(),
243                keys_sizes.as_ptr(),
244                values.as_mut_ptr(),
245                values_sizes.as_mut_ptr(),
246                errors.as_mut_ptr(),
247            );
248        }
249
250        convert_values(values, values_sizes, errors)
251    }
252}
253
254impl<'a, T> MultiGetCF<ReadOptions> for Transaction<'a, T>
255where
256    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
257{
258    fn multi_get_cf_full<'m, K, I>(
259        &self,
260        keys: I,
261        readopts: Option<&ReadOptions>,
262    ) -> Vec<Result<Option<DBVector>, Error>>
263    where
264        K: AsRef<[u8]>,
265        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
266    {
267        let mut default_readopts = None;
268        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
269            Ok(ro) => ro,
270            Err(e) => {
271                let key_count = keys.into_iter().count();
272
273                return vec![e; key_count]
274                    .iter()
275                    .map(|e| Err(e.to_owned()))
276                    .collect();
277            }
278        };
279        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
280            .into_iter()
281            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
282            .unzip();
283        let ptr_keys: Vec<_> = cfs_and_keys
284            .iter()
285            .map(|(_, k)| k.as_ptr() as *const c_char)
286            .collect();
287        let ptr_cfs: Vec<_> = cfs_and_keys
288            .iter()
289            .map(|(c, _)| c.inner as *const _)
290            .collect();
291
292        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
293        let mut values_sizes = vec![0_usize; ptr_keys.len()];
294        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
295        unsafe {
296            ffi::rocksdb_transaction_multi_get_cf(
297                self.inner,
298                ro_handle,
299                ptr_cfs.as_ptr(),
300                ptr_keys.len(),
301                ptr_keys.as_ptr(),
302                keys_sizes.as_ptr(),
303                values.as_mut_ptr(),
304                values_sizes.as_mut_ptr(),
305                errors.as_mut_ptr(),
306            );
307        }
308
309        convert_values(values, values_sizes, errors)
310    }
311}
312
313impl<T> Iterate for Transaction<'_, T> {
314    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
315        unsafe {
316            DBRawIterator {
317                inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
318                db: PhantomData,
319            }
320        }
321    }
322}
323
324impl<T> IterateCF for Transaction<'_, T> {
325    fn get_raw_iter_cf<'a: 'b, 'b>(
326        &'a self,
327        cf_handle: &ColumnFamily,
328        readopts: &ReadOptions,
329    ) -> Result<DBRawIterator<'b>, Error> {
330        unsafe {
331            Ok(DBRawIterator {
332                inner: ffi::rocksdb_transaction_create_iterator_cf(
333                    self.inner,
334                    readopts.handle(),
335                    cf_handle.inner,
336                ),
337                db: PhantomData,
338            })
339        }
340    }
341}
342
343pub struct TransactionSnapshot<'a, T> {
344    db: &'a Transaction<'a, T>,
345    inner: *const ffi::rocksdb_snapshot_t,
346}
347
348impl<'a, T> ConstHandle<ffi::rocksdb_snapshot_t> for TransactionSnapshot<'a, T> {
349    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
350        self.inner
351    }
352}
353
354impl<'a, T> Read for TransactionSnapshot<'a, T> {}
355
356impl<'a, T> GetCF<ReadOptions> for TransactionSnapshot<'a, T>
357where
358    Transaction<'a, T>: GetCF<ReadOptions>,
359{
360    fn get_cf_full<K: AsRef<[u8]>>(
361        &self,
362        cf: Option<&ColumnFamily>,
363        key: K,
364        readopts: Option<&ReadOptions>,
365    ) -> Result<Option<DBVector>, Error> {
366        let mut ro = readopts.cloned().unwrap_or_default();
367        ro.set_snapshot(self);
368        self.db.get_cf_full(cf, key, Some(&ro))
369    }
370}
371
372impl<'a, T> MultiGet<ReadOptions> for TransactionSnapshot<'a, T>
373where
374    Transaction<'a, T>: MultiGet<ReadOptions>,
375{
376    fn multi_get_full<K, I>(
377        &self,
378        keys: I,
379        readopts: Option<&ReadOptions>,
380    ) -> Vec<Result<Option<DBVector>, Error>>
381    where
382        K: AsRef<[u8]>,
383        I: IntoIterator<Item = K>,
384    {
385        let mut ro = readopts.cloned().unwrap_or_default();
386        ro.set_snapshot(self);
387        self.db.multi_get_full(keys, Some(&ro))
388    }
389}
390
391impl<'a, T> MultiGetCF<ReadOptions> for TransactionSnapshot<'a, T>
392where
393    Transaction<'a, T>: MultiGet<ReadOptions>,
394{
395    fn multi_get_cf_full<'m, K, I>(
396        &self,
397        keys: I,
398        readopts: Option<&ReadOptions>,
399    ) -> Vec<Result<Option<DBVector>, Error>>
400    where
401        K: AsRef<[u8]>,
402        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
403    {
404        let mut ro = readopts.cloned().unwrap_or_default();
405        ro.set_snapshot(self);
406        self.db.multi_get_cf_full(keys, Some(&ro))
407    }
408}
409
410impl<'a, T> PutCF<()> for Transaction<'a, T> {
411    fn put_cf_full<K, V>(
412        &self,
413        cf: Option<&ColumnFamily>,
414        key: K,
415        value: V,
416        _: Option<&()>,
417    ) -> Result<(), Error>
418    where
419        K: AsRef<[u8]>,
420        V: AsRef<[u8]>,
421    {
422        let key = key.as_ref();
423        let value = value.as_ref();
424        let key_ptr = key.as_ptr() as *const c_char;
425        let key_len = key.len() as size_t;
426        let val_ptr = value.as_ptr() as *const c_char;
427        let val_len = value.len() as size_t;
428
429        unsafe {
430            match cf {
431                Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
432                    self.handle(),
433                    cf.handle(),
434                    key_ptr,
435                    key_len,
436                    val_ptr,
437                    val_len,
438                )),
439                None => ffi_try!(ffi::rocksdb_transaction_put(
440                    self.handle(),
441                    key_ptr,
442                    key_len,
443                    val_ptr,
444                    val_len,
445                )),
446            }
447
448            Ok(())
449        }
450    }
451}
452
453impl<'a, T> MergeCF<()> for Transaction<'a, T> {
454    fn merge_cf_full<K, V>(
455        &self,
456        cf: Option<&ColumnFamily>,
457        key: K,
458        value: V,
459        _: Option<&()>,
460    ) -> Result<(), Error>
461    where
462        K: AsRef<[u8]>,
463        V: AsRef<[u8]>,
464    {
465        let key = key.as_ref();
466        let value = value.as_ref();
467        let key_ptr = key.as_ptr() as *const c_char;
468        let key_len = key.len() as size_t;
469        let val_ptr = value.as_ptr() as *const c_char;
470        let val_len = value.len() as size_t;
471
472        unsafe {
473            match cf {
474                Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
475                    self.handle(),
476                    cf.handle(),
477                    key_ptr,
478                    key_len,
479                    val_ptr,
480                    val_len,
481                )),
482                None => ffi_try!(ffi::rocksdb_transaction_merge(
483                    self.handle(),
484                    key_ptr,
485                    key_len,
486                    val_ptr,
487                    val_len,
488                )),
489            }
490
491            Ok(())
492        }
493    }
494}
495
496impl<'a, T> DeleteCF<()> for Transaction<'a, T> {
497    fn delete_cf_full<K>(
498        &self,
499        cf: Option<&ColumnFamily>,
500        key: K,
501        _: Option<&()>,
502    ) -> Result<(), Error>
503    where
504        K: AsRef<[u8]>,
505    {
506        let key = key.as_ref();
507        let key_ptr = key.as_ptr() as *const c_char;
508        let key_len = key.len() as size_t;
509
510        unsafe {
511            match cf {
512                Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
513                    self.handle(),
514                    cf.inner,
515                    key_ptr,
516                    key_len,
517                )),
518                None => ffi_try!(ffi::rocksdb_transaction_delete(
519                    self.handle(),
520                    key_ptr,
521                    key_len,
522                )),
523            }
524
525            Ok(())
526        }
527    }
528}
529
530impl<'a, T> Drop for TransactionSnapshot<'a, T> {
531    fn drop(&mut self) {
532        unsafe {
533            ffi::rocksdb_free(self.inner as *mut c_void);
534        }
535    }
536}
537
538impl<T: Iterate> Iterate for TransactionSnapshot<'_, T> {
539    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
540        let mut readopts = readopts.to_owned();
541        readopts.set_snapshot(self);
542        self.db.get_raw_iter(&readopts)
543    }
544}
545
546impl<T: IterateCF> IterateCF for TransactionSnapshot<'_, T> {
547    fn get_raw_iter_cf<'a: 'b, 'b>(
548        &'a self,
549        cf_handle: &ColumnFamily,
550        readopts: &ReadOptions,
551    ) -> Result<DBRawIterator<'b>, Error> {
552        let mut readopts = readopts.to_owned();
553        readopts.set_snapshot(self);
554        self.db.get_raw_iter_cf(cf_handle, &readopts)
555    }
556}
557
558impl<'a, T> GetPinnedCF<'a> for Transaction<'a, T> {
559    type ColumnFamily = &'a ColumnFamily;
560    type ReadOptions = &'a ReadOptions;
561
562    fn get_pinned_cf_full<K: AsRef<[u8]>>(
563        &'a self,
564        cf: Option<Self::ColumnFamily>,
565        key: K,
566        readopts: Option<Self::ReadOptions>,
567    ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
568        let mut default_readopts = None;
569
570        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
571
572        let key = key.as_ref();
573        let key_ptr = key.as_ptr() as *const c_char;
574        let key_len = key.len() as size_t;
575
576        unsafe {
577            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
578            let val = match cf {
579                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
580                    self.handle(),
581                    ro_handle,
582                    cf.handle(),
583                    key_ptr,
584                    key_len,
585                    &mut err,
586                ),
587                None => ffi::rocksdb_transaction_get_pinned(
588                    self.handle(),
589                    ro_handle,
590                    key_ptr,
591                    key_len,
592                    &mut err,
593                ),
594            };
595
596            if !err.is_null() {
597                return Err(Error::new(ffi_util::error_message(err)));
598            }
599
600            if val.is_null() {
601                Ok(None)
602            } else {
603                Ok(Some(DBPinnableSlice::from_c(val)))
604            }
605        }
606    }
607}
608
609impl<'a, T> GetPinnedCF<'a> for TransactionSnapshot<'a, T> {
610    type ColumnFamily = &'a ColumnFamily;
611    type ReadOptions = &'a ReadOptions;
612
613    fn get_pinned_cf_full<K: AsRef<[u8]>>(
614        &'a self,
615        cf: Option<Self::ColumnFamily>,
616        key: K,
617        readopts: Option<Self::ReadOptions>,
618    ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
619        let mut ro = readopts.cloned().unwrap_or_default();
620        ro.set_snapshot(self);
621
622        let key = key.as_ref();
623        let key_ptr = key.as_ptr() as *const c_char;
624        let key_len = key.len() as size_t;
625
626        unsafe {
627            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
628            let val = match cf {
629                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
630                    self.db.handle(),
631                    ro.handle(),
632                    cf.handle(),
633                    key_ptr,
634                    key_len,
635                    &mut err,
636                ),
637                None => ffi::rocksdb_transaction_get_pinned(
638                    self.db.handle(),
639                    ro.handle(),
640                    key_ptr,
641                    key_len,
642                    &mut err,
643                ),
644            };
645
646            if !err.is_null() {
647                return Err(Error::new(ffi_util::error_message(err)));
648            }
649
650            if val.is_null() {
651                Ok(None)
652            } else {
653                Ok(Some(DBPinnableSlice::from_c(val)))
654            }
655        }
656    }
657}