ckb_rocksdb/
optimistic_transaction.rs

1use crate::ffi;
2use crate::{
3    ffi_util,
4    handle::{ConstHandle, Handle},
5    ops::*,
6    ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions,
7};
8use libc::{c_char, c_uchar, c_void, size_t};
9use std::marker::PhantomData;
10use std::ptr;
11
12pub struct OptimisticTransaction {
13    inner: *mut ffi::rocksdb_transaction_t,
14}
15
16unsafe impl Send for OptimisticTransaction {}
17unsafe impl Sync for OptimisticTransaction {}
18
19impl OptimisticTransaction {
20    pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> OptimisticTransaction {
21        OptimisticTransaction { inner }
22    }
23
24    /// commits a transaction
25    pub fn commit(&self) -> Result<(), Error> {
26        unsafe {
27            ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
28        }
29        Ok(())
30    }
31
32    /// Transaction rollback
33    pub fn rollback(&self) -> Result<(), Error> {
34        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
35        Ok(())
36    }
37
38    /// Transaction rollback to savepoint
39    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
40        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
41        Ok(())
42    }
43
44    /// Set savepoint for transaction
45    pub fn set_savepoint(&self) {
46        unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
47    }
48
49    /// Get Snapshot
50    pub fn snapshot(&self) -> OptimisticTransactionSnapshot<'_> {
51        unsafe {
52            let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
53            OptimisticTransactionSnapshot {
54                txn: self,
55                inner: snapshot,
56            }
57        }
58    }
59
60    /// Get For Update
61    /// ReadOptions: Default
62    /// exclusive: true
63    pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
64        let opt = ReadOptions::default();
65        self.get_for_update_opt(key, &opt, true)
66    }
67
68    /// Get For Update with custom ReadOptions and exclusive
69    pub fn get_for_update_opt<K: AsRef<[u8]>>(
70        &self,
71        key: K,
72        readopts: &ReadOptions,
73        exclusive: bool,
74    ) -> Result<Option<DBVector>, Error> {
75        let key = key.as_ref();
76        let key_ptr = key.as_ptr() as *const c_char;
77        let key_len = key.len() as size_t;
78        unsafe {
79            let mut val_len: size_t = 0;
80            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
81                self.handle(),
82                readopts.handle(),
83                key_ptr,
84                key_len,
85                &mut val_len,
86                exclusive as c_uchar,
87            )) as *mut u8;
88
89            if val.is_null() {
90                Ok(None)
91            } else {
92                Ok(Some(DBVector::from_c(val, val_len)))
93            }
94        }
95    }
96
97    pub fn get_for_update_cf<K: AsRef<[u8]>>(
98        &self,
99        cf: &ColumnFamily,
100        key: K,
101    ) -> Result<Option<DBVector>, Error> {
102        let opt = ReadOptions::default();
103        self.get_for_update_cf_opt(cf, key, &opt, true)
104    }
105
106    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
107        &self,
108        cf: &ColumnFamily,
109        key: K,
110        readopts: &ReadOptions,
111        exclusive: bool,
112    ) -> Result<Option<DBVector>, Error> {
113        let key = key.as_ref();
114        let key_ptr = key.as_ptr() as *const c_char;
115        let key_len = key.len() as size_t;
116        unsafe {
117            let mut val_len: size_t = 0;
118            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
119                self.handle(),
120                readopts.handle(),
121                cf.handle(),
122                key_ptr,
123                key_len,
124                &mut val_len,
125                exclusive as c_uchar,
126            )) as *mut u8;
127
128            if val.is_null() {
129                Ok(None)
130            } else {
131                Ok(Some(DBVector::from_c(val, val_len)))
132            }
133        }
134    }
135}
136
137impl Drop for OptimisticTransaction {
138    fn drop(&mut self) {
139        unsafe {
140            ffi::rocksdb_transaction_destroy(self.inner);
141        }
142    }
143}
144
145impl Handle<ffi::rocksdb_transaction_t> for OptimisticTransaction {
146    fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
147        self.inner
148    }
149}
150
151impl Read for OptimisticTransaction {}
152
153impl GetCF<ReadOptions> for OptimisticTransaction {
154    fn get_cf_full<K: AsRef<[u8]>>(
155        &self,
156        cf: Option<&ColumnFamily>,
157        key: K,
158        readopts: Option<&ReadOptions>,
159    ) -> Result<Option<DBVector>, Error> {
160        let mut default_readopts = None;
161
162        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
163
164        let key = key.as_ref();
165        let key_ptr = key.as_ptr() as *const c_char;
166        let key_len = key.len() as size_t;
167
168        unsafe {
169            let mut val_len: size_t = 0;
170
171            let val = match cf {
172                Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
173                    self.handle(),
174                    ro_handle,
175                    cf.inner,
176                    key_ptr,
177                    key_len,
178                    &mut val_len,
179                )),
180                None => ffi_try!(ffi::rocksdb_transaction_get(
181                    self.handle(),
182                    ro_handle,
183                    key_ptr,
184                    key_len,
185                    &mut val_len,
186                )),
187            } as *mut u8;
188
189            if val.is_null() {
190                Ok(None)
191            } else {
192                Ok(Some(DBVector::from_c(val, val_len)))
193            }
194        }
195    }
196}
197
198impl MultiGet<ReadOptions> for OptimisticTransaction {
199    fn multi_get_full<K, I>(
200        &self,
201        keys: I,
202        readopts: Option<&ReadOptions>,
203    ) -> Vec<Result<Option<DBVector>, Error>>
204    where
205        K: AsRef<[u8]>,
206        I: IntoIterator<Item = K>,
207    {
208        let mut default_readopts = None;
209        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
210            Ok(ro) => ro,
211            Err(e) => {
212                let key_count = keys.into_iter().count();
213
214                return vec![e; key_count]
215                    .iter()
216                    .map(|e| Err(e.to_owned()))
217                    .collect();
218            }
219        };
220
221        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
222            .into_iter()
223            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
224            .unzip();
225        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
226
227        let mut values = vec![ptr::null_mut(); keys.len()];
228        let mut values_sizes = vec![0_usize; keys.len()];
229        let mut errors = vec![ptr::null_mut(); keys.len()];
230        unsafe {
231            ffi::rocksdb_transaction_multi_get(
232                self.inner,
233                ro_handle,
234                ptr_keys.len(),
235                ptr_keys.as_ptr(),
236                keys_sizes.as_ptr(),
237                values.as_mut_ptr(),
238                values_sizes.as_mut_ptr(),
239                errors.as_mut_ptr(),
240            );
241        }
242
243        convert_values(values, values_sizes, errors)
244    }
245}
246
247impl MultiGetCF<ReadOptions> for OptimisticTransaction {
248    fn multi_get_cf_full<'a, K, I>(
249        &self,
250        keys: I,
251        readopts: Option<&ReadOptions>,
252    ) -> Vec<Result<Option<DBVector>, Error>>
253    where
254        K: AsRef<[u8]>,
255        I: IntoIterator<Item = (&'a ColumnFamily, K)>,
256    {
257        let mut default_readopts = None;
258        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
259            Ok(ro) => ro,
260            Err(e) => {
261                let key_count = keys.into_iter().count();
262
263                return vec![e; key_count]
264                    .iter()
265                    .map(|e| Err(e.to_owned()))
266                    .collect();
267            }
268        };
269
270        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
271            .into_iter()
272            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
273            .unzip();
274        let ptr_keys: Vec<_> = cfs_and_keys
275            .iter()
276            .map(|(_, k)| k.as_ptr() as *const c_char)
277            .collect();
278        let ptr_cfs: Vec<_> = cfs_and_keys
279            .iter()
280            .map(|(c, _)| c.inner as *const _)
281            .collect();
282
283        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
284        let mut values_sizes = vec![0_usize; ptr_keys.len()];
285        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
286        unsafe {
287            ffi::rocksdb_transaction_multi_get_cf(
288                self.inner,
289                ro_handle,
290                ptr_cfs.as_ptr(),
291                ptr_keys.len(),
292                ptr_keys.as_ptr(),
293                keys_sizes.as_ptr(),
294                values.as_mut_ptr(),
295                values_sizes.as_mut_ptr(),
296                errors.as_mut_ptr(),
297            );
298        }
299
300        convert_values(values, values_sizes, errors)
301    }
302}
303
304impl Iterate for OptimisticTransaction {
305    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
306        unsafe {
307            DBRawIterator {
308                inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
309                db: PhantomData,
310            }
311        }
312    }
313}
314
315impl IterateCF for OptimisticTransaction {
316    fn get_raw_iter_cf<'a: 'b, 'b>(
317        &'a self,
318        cf_handle: &ColumnFamily,
319        readopts: &ReadOptions,
320    ) -> Result<DBRawIterator<'b>, Error> {
321        unsafe {
322            Ok(DBRawIterator {
323                inner: ffi::rocksdb_transaction_create_iterator_cf(
324                    self.inner,
325                    readopts.handle(),
326                    cf_handle.inner,
327                ),
328                db: PhantomData,
329            })
330        }
331    }
332}
333
334impl PutCF<()> for OptimisticTransaction {
335    fn put_cf_full<K, V>(
336        &self,
337        cf: Option<&ColumnFamily>,
338        key: K,
339        value: V,
340        _: Option<&()>,
341    ) -> Result<(), Error>
342    where
343        K: AsRef<[u8]>,
344        V: AsRef<[u8]>,
345    {
346        let key = key.as_ref();
347        let value = value.as_ref();
348        let key_ptr = key.as_ptr() as *const c_char;
349        let key_len = key.len() as size_t;
350        let val_ptr = value.as_ptr() as *const c_char;
351        let val_len = value.len() as size_t;
352
353        unsafe {
354            match cf {
355                Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
356                    self.handle(),
357                    cf.handle(),
358                    key_ptr,
359                    key_len,
360                    val_ptr,
361                    val_len,
362                )),
363                None => ffi_try!(ffi::rocksdb_transaction_put(
364                    self.handle(),
365                    key_ptr,
366                    key_len,
367                    val_ptr,
368                    val_len,
369                )),
370            }
371
372            Ok(())
373        }
374    }
375}
376
377impl MergeCF<()> for OptimisticTransaction {
378    fn merge_cf_full<K, V>(
379        &self,
380        cf: Option<&ColumnFamily>,
381        key: K,
382        value: V,
383        _: Option<&()>,
384    ) -> Result<(), Error>
385    where
386        K: AsRef<[u8]>,
387        V: AsRef<[u8]>,
388    {
389        let key = key.as_ref();
390        let value = value.as_ref();
391        let key_ptr = key.as_ptr() as *const c_char;
392        let key_len = key.len() as size_t;
393        let val_ptr = value.as_ptr() as *const c_char;
394        let val_len = value.len() as size_t;
395
396        unsafe {
397            match cf {
398                Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
399                    self.handle(),
400                    cf.handle(),
401                    key_ptr,
402                    key_len,
403                    val_ptr,
404                    val_len,
405                )),
406                None => ffi_try!(ffi::rocksdb_transaction_merge(
407                    self.handle(),
408                    key_ptr,
409                    key_len,
410                    val_ptr,
411                    val_len,
412                )),
413            }
414
415            Ok(())
416        }
417    }
418}
419
420impl DeleteCF<()> for OptimisticTransaction {
421    fn delete_cf_full<K>(
422        &self,
423        cf: Option<&ColumnFamily>,
424        key: K,
425        _: Option<&()>,
426    ) -> Result<(), Error>
427    where
428        K: AsRef<[u8]>,
429    {
430        let key = key.as_ref();
431        let key_ptr = key.as_ptr() as *const c_char;
432        let key_len = key.len() as size_t;
433
434        unsafe {
435            match cf {
436                Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
437                    self.handle(),
438                    cf.inner,
439                    key_ptr,
440                    key_len,
441                )),
442                None => ffi_try!(ffi::rocksdb_transaction_delete(
443                    self.handle(),
444                    key_ptr,
445                    key_len,
446                )),
447            }
448
449            Ok(())
450        }
451    }
452}
453
454pub struct OptimisticTransactionSnapshot<'a> {
455    txn: &'a OptimisticTransaction,
456    inner: *const ffi::rocksdb_snapshot_t,
457}
458
459unsafe impl<'a> Send for OptimisticTransactionSnapshot<'a> {}
460unsafe impl<'a> Sync for OptimisticTransactionSnapshot<'a> {}
461
462impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for OptimisticTransactionSnapshot<'a> {
463    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
464        self.inner
465    }
466}
467
468impl<'a> Read for OptimisticTransactionSnapshot<'a> {}
469
470impl<'a> GetCF<ReadOptions> for OptimisticTransactionSnapshot<'a> {
471    fn get_cf_full<K: AsRef<[u8]>>(
472        &self,
473        cf: Option<&ColumnFamily>,
474        key: K,
475        readopts: Option<&ReadOptions>,
476    ) -> Result<Option<DBVector>, Error> {
477        let mut ro = readopts.cloned().unwrap_or_default();
478        ro.set_snapshot(self);
479        self.txn.get_cf_full(cf, key, Some(&ro))
480    }
481}
482
483impl<'a> MultiGet<ReadOptions> for OptimisticTransactionSnapshot<'a> {
484    fn multi_get_full<K, I>(
485        &self,
486        keys: I,
487        readopts: Option<&ReadOptions>,
488    ) -> Vec<Result<Option<DBVector>, Error>>
489    where
490        K: AsRef<[u8]>,
491        I: IntoIterator<Item = K>,
492    {
493        let mut ro = readopts.cloned().unwrap_or_default();
494        ro.set_snapshot(self);
495        self.txn.multi_get_full(keys, Some(&ro))
496    }
497}
498
499impl<'a> MultiGetCF<ReadOptions> for OptimisticTransactionSnapshot<'a> {
500    fn multi_get_cf_full<'m, K, I>(
501        &self,
502        keys: I,
503        readopts: Option<&ReadOptions>,
504    ) -> Vec<Result<Option<DBVector>, Error>>
505    where
506        K: AsRef<[u8]>,
507        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
508    {
509        let mut ro = readopts.cloned().unwrap_or_default();
510        ro.set_snapshot(self);
511        self.txn.multi_get_cf_full(keys, Some(&ro))
512    }
513}
514
515impl<'a> Drop for OptimisticTransactionSnapshot<'a> {
516    fn drop(&mut self) {
517        unsafe {
518            ffi::rocksdb_free(self.inner as *mut c_void);
519        }
520    }
521}
522
523impl Iterate for OptimisticTransactionSnapshot<'_> {
524    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
525        let mut readopts = readopts.to_owned();
526        readopts.set_snapshot(self);
527        self.txn.get_raw_iter(&readopts)
528    }
529}
530
531impl IterateCF for OptimisticTransactionSnapshot<'_> {
532    fn get_raw_iter_cf<'a: 'b, 'b>(
533        &'a self,
534        cf_handle: &ColumnFamily,
535        readopts: &ReadOptions,
536    ) -> Result<DBRawIterator<'b>, Error> {
537        let mut readopts = readopts.to_owned();
538        readopts.set_snapshot(self);
539        self.txn.get_raw_iter_cf(cf_handle, &readopts)
540    }
541}
542
543impl<'a> GetPinnedCF<'a> for OptimisticTransaction {
544    type ColumnFamily = &'a ColumnFamily;
545    type ReadOptions = &'a ReadOptions;
546
547    fn get_pinned_cf_full<K: AsRef<[u8]>>(
548        &'a self,
549        cf: Option<Self::ColumnFamily>,
550        key: K,
551        readopts: Option<Self::ReadOptions>,
552    ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
553        let mut default_readopts = None;
554
555        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
556
557        let key = key.as_ref();
558        let key_ptr = key.as_ptr() as *const c_char;
559        let key_len = key.len() as size_t;
560
561        unsafe {
562            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
563            let val = match cf {
564                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
565                    self.handle(),
566                    ro_handle,
567                    cf.handle(),
568                    key_ptr,
569                    key_len,
570                    &mut err,
571                ),
572                None => ffi::rocksdb_transaction_get_pinned(
573                    self.handle(),
574                    ro_handle,
575                    key_ptr,
576                    key_len,
577                    &mut err,
578                ),
579            };
580
581            if !err.is_null() {
582                return Err(Error::new(ffi_util::error_message(err)));
583            }
584
585            if val.is_null() {
586                Ok(None)
587            } else {
588                Ok(Some(DBPinnableSlice::from_c(val)))
589            }
590        }
591    }
592}
593
594impl<'a> GetPinnedCF<'a> for OptimisticTransactionSnapshot<'a> {
595    type ColumnFamily = &'a ColumnFamily;
596    type ReadOptions = &'a ReadOptions;
597
598    fn get_pinned_cf_full<K: AsRef<[u8]>>(
599        &'a self,
600        cf: Option<Self::ColumnFamily>,
601        key: K,
602        readopts: Option<Self::ReadOptions>,
603    ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
604        let mut ro = readopts.cloned().unwrap_or_default();
605        ro.set_snapshot(self);
606
607        let key = key.as_ref();
608        let key_ptr = key.as_ptr() as *const c_char;
609        let key_len = key.len() as size_t;
610
611        unsafe {
612            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
613            let val = match cf {
614                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
615                    self.txn.handle(),
616                    ro.handle(),
617                    cf.handle(),
618                    key_ptr,
619                    key_len,
620                    &mut err,
621                ),
622                None => ffi::rocksdb_transaction_get_pinned(
623                    self.txn.handle(),
624                    ro.handle(),
625                    key_ptr,
626                    key_len,
627                    &mut err,
628                ),
629            };
630
631            if !err.is_null() {
632                return Err(Error::new(ffi_util::error_message(err)));
633            }
634
635            if val.is_null() {
636                Ok(None)
637            } else {
638                Ok(Some(DBPinnableSlice::from_c(val)))
639            }
640        }
641    }
642}