1use crate::{
2 db_options::OptionsMustOutliveDB,
3 db_vector::DBVector,
4 ffi_util::to_cstring,
5 handle::{ConstHandle, Handle},
6 open_raw::{OpenRaw, OpenRawFFI},
7 ops::*,
8 write_batch::WriteBatch,
9 ColumnFamily, DBRawIterator, Error, Options, ReadOptions, Transaction, WriteOptions,
10};
11
12use crate::ffi;
13use libc::{c_char, c_uchar, size_t};
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::Path;
17use std::path::PathBuf;
18use std::ptr;
19
20pub struct TransactionDB {
22 inner: *mut ffi::rocksdb_transactiondb_t,
23 path: PathBuf,
24 cfs: BTreeMap<String, ColumnFamily>,
25 _outlive: Vec<OptionsMustOutliveDB>,
26}
27
28impl TransactionDB {
29 pub fn path(&self) -> &Path {
30 self.path.as_path()
31 }
32}
33
34impl Handle<ffi::rocksdb_transactiondb_t> for TransactionDB {
35 fn handle(&self) -> *mut ffi::rocksdb_transactiondb_t {
36 self.inner
37 }
38}
39
40impl Open for TransactionDB {}
41
42impl OpenCF for TransactionDB {}
43
44impl OpenRaw for TransactionDB {
45 type Pointer = ffi::rocksdb_transactiondb_t;
46 type Descriptor = TransactionDBOptions;
47
48 fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
49 let pointer = unsafe {
50 if input.num_column_families <= 0 {
51 ffi_try!(ffi::rocksdb_transactiondb_open(
52 input.options,
53 input.open_descriptor.inner,
54 input.path,
55 ))
56 } else {
57 ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
58 input.options,
59 input.open_descriptor.inner,
60 input.path,
61 input.num_column_families,
62 input.column_family_names,
63 input.column_family_options,
64 input.column_family_handles,
65 ))
66 }
67 };
68
69 Ok(pointer)
70 }
71
72 fn build<I>(
73 path: PathBuf,
74 _open_descriptor: Self::Descriptor,
75 pointer: *mut Self::Pointer,
76 column_families: I,
77 outlive: Vec<OptionsMustOutliveDB>,
78 ) -> Result<Self, Error>
79 where
80 I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
81 {
82 let cfs: BTreeMap<_, _> = column_families
83 .into_iter()
84 .map(|(k, h)| (k, ColumnFamily::new(h)))
85 .collect();
86 Ok(TransactionDB {
87 inner: pointer,
88 path,
89 cfs,
90 _outlive: outlive,
91 })
92 }
93}
94
95impl GetColumnFamilys for TransactionDB {
96 fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
97 &self.cfs
98 }
99 fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
100 &mut self.cfs
101 }
102}
103
104impl Read for TransactionDB {}
105impl Write for TransactionDB {}
106
107unsafe impl Send for TransactionDB {}
108unsafe impl Sync for TransactionDB {}
109
110impl TransactionBegin for TransactionDB {
111 type WriteOptions = WriteOptions;
112 type TransactionOptions = TransactionOptions;
113 fn transaction(
114 &self,
115 write_options: &WriteOptions,
116 tx_options: &TransactionOptions,
117 ) -> Transaction<'_, TransactionDB> {
118 unsafe {
119 let inner = ffi::rocksdb_transaction_begin(
120 self.inner,
121 write_options.handle(),
122 tx_options.inner,
123 ptr::null_mut(),
124 );
125 Transaction::new(inner)
126 }
127 }
128}
129
130impl Iterate for TransactionDB {
131 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
132 unsafe {
133 DBRawIterator {
134 inner: ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.handle()),
135 db: PhantomData,
136 }
137 }
138 }
139}
140
141impl IterateCF for TransactionDB {
142 fn get_raw_iter_cf<'a: 'b, 'b>(
143 &'a self,
144 cf_handle: &ColumnFamily,
145 readopts: &ReadOptions,
146 ) -> Result<DBRawIterator<'b>, Error> {
147 unsafe {
148 Ok(DBRawIterator {
149 inner: ffi::rocksdb_transactiondb_create_iterator_cf(
150 self.inner,
151 readopts.handle(),
152 cf_handle.handle(),
153 ),
154 db: PhantomData,
155 })
156 }
157 }
158}
159
160impl Drop for TransactionDB {
161 fn drop(&mut self) {
162 unsafe {
163 ffi::rocksdb_transactiondb_close(self.inner);
164 }
165 }
166}
167
168pub struct TransactionDBOptions {
169 inner: *mut ffi::rocksdb_transactiondb_options_t,
170}
171
172impl TransactionDBOptions {
173 pub fn new() -> TransactionDBOptions {
175 unsafe {
176 let inner = ffi::rocksdb_transactiondb_options_create();
177 TransactionDBOptions { inner }
178 }
179 }
180
181 pub fn set_default_lock_timeout(&self, default_lock_timeout: i64) {
182 unsafe {
183 ffi::rocksdb_transactiondb_options_set_default_lock_timeout(
184 self.inner,
185 default_lock_timeout,
186 )
187 }
188 }
189
190 pub fn set_max_num_locks(&self, max_num_locks: i64) {
191 unsafe { ffi::rocksdb_transactiondb_options_set_max_num_locks(self.inner, max_num_locks) }
192 }
193
194 pub fn set_num_stripes(&self, num_stripes: usize) {
195 unsafe { ffi::rocksdb_transactiondb_options_set_num_stripes(self.inner, num_stripes) }
196 }
197
198 pub fn set_transaction_lock_timeout(&self, txn_lock_timeout: i64) {
199 unsafe {
200 ffi::rocksdb_transactiondb_options_set_transaction_lock_timeout(
201 self.inner,
202 txn_lock_timeout,
203 )
204 }
205 }
206}
207
208impl Drop for TransactionDBOptions {
209 fn drop(&mut self) {
210 unsafe {
211 ffi::rocksdb_transactiondb_options_destroy(self.inner);
212 }
213 }
214}
215
216impl Default for TransactionDBOptions {
217 fn default() -> TransactionDBOptions {
218 TransactionDBOptions::new()
219 }
220}
221
222pub struct TransactionOptions {
223 inner: *mut ffi::rocksdb_transaction_options_t,
224}
225
226impl TransactionOptions {
227 pub fn new() -> TransactionOptions {
229 unsafe {
230 let inner = ffi::rocksdb_transaction_options_create();
231 TransactionOptions { inner }
232 }
233 }
234
235 pub fn set_deadlock_detect(&self, deadlock_detect: bool) {
236 unsafe {
237 ffi::rocksdb_transaction_options_set_deadlock_detect(
238 self.inner,
239 deadlock_detect as c_uchar,
240 )
241 }
242 }
243
244 pub fn set_deadlock_detect_depth(&self, depth: i64) {
245 unsafe { ffi::rocksdb_transaction_options_set_deadlock_detect_depth(self.inner, depth) }
246 }
247
248 pub fn set_expiration(&self, expiration: i64) {
249 unsafe { ffi::rocksdb_transaction_options_set_expiration(self.inner, expiration) }
250 }
251
252 pub fn set_lock_timeout(&self, lock_timeout: i64) {
253 unsafe { ffi::rocksdb_transaction_options_set_lock_timeout(self.inner, lock_timeout) }
254 }
255
256 pub fn set_max_write_batch_size(&self, size: usize) {
257 unsafe { ffi::rocksdb_transaction_options_set_max_write_batch_size(self.inner, size) }
258 }
259
260 pub fn set_snapshot(&mut self, set_snapshot: bool) {
261 unsafe {
262 ffi::rocksdb_transaction_options_set_set_snapshot(self.inner, set_snapshot as c_uchar);
263 }
264 }
265}
266
267impl Drop for TransactionOptions {
268 fn drop(&mut self) {
269 unsafe {
270 ffi::rocksdb_transaction_options_destroy(self.inner);
271 }
272 }
273}
274
275impl Default for TransactionOptions {
276 fn default() -> TransactionOptions {
277 TransactionOptions::new()
278 }
279}
280
281impl CreateCheckpointObject for TransactionDB {
282 unsafe fn create_checkpoint_object_raw(&self) -> Result<*mut ffi::rocksdb_checkpoint_t, Error> {
283 Ok(ffi_try!(
284 ffi::rocksdb_transactiondb_checkpoint_object_create(self.inner,)
285 ))
286 }
287}
288
289impl GetCF<ReadOptions> for TransactionDB {
290 fn get_cf_full<K: AsRef<[u8]>>(
291 &self,
292 cf: Option<&ColumnFamily>,
293 key: K,
294 readopts: Option<&ReadOptions>,
295 ) -> Result<Option<DBVector>, Error> {
296 let mut default_readopts = None;
297
298 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
299
300 let key = key.as_ref();
301 let key_ptr = key.as_ptr() as *const c_char;
302 let key_len = key.len() as size_t;
303
304 unsafe {
305 let mut val_len: size_t = 0;
306
307 let val = match cf {
308 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_get_cf(
309 self.handle(),
310 ro_handle,
311 cf.handle(),
312 key_ptr,
313 key_len,
314 &mut val_len,
315 )),
316 None => ffi_try!(ffi::rocksdb_transactiondb_get(
317 self.handle(),
318 ro_handle,
319 key_ptr,
320 key_len,
321 &mut val_len,
322 )),
323 } as *mut u8;
324
325 if val.is_null() {
326 Ok(None)
327 } else {
328 Ok(Some(DBVector::from_c(val, val_len)))
329 }
330 }
331 }
332}
333
334impl MultiGet<ReadOptions> for TransactionDB {
335 fn multi_get_full<K, I>(
336 &self,
337 keys: I,
338 readopts: Option<&ReadOptions>,
339 ) -> Vec<Result<Option<DBVector>, Error>>
340 where
341 K: AsRef<[u8]>,
342 I: IntoIterator<Item = K>,
343 {
344 let mut default_readopts = None;
345 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
346 Ok(ro) => ro,
347 Err(e) => {
348 let key_count = keys.into_iter().count();
349
350 return vec![e; key_count]
351 .iter()
352 .map(|e| Err(e.to_owned()))
353 .collect();
354 }
355 };
356
357 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
358 .into_iter()
359 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
360 .unzip();
361 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
362
363 let mut values = vec![ptr::null_mut(); keys.len()];
364 let mut values_sizes = vec![0_usize; keys.len()];
365 let mut errors = vec![ptr::null_mut(); keys.len()];
366 unsafe {
367 ffi::rocksdb_transactiondb_multi_get(
368 self.inner,
369 ro_handle,
370 ptr_keys.len(),
371 ptr_keys.as_ptr(),
372 keys_sizes.as_ptr(),
373 values.as_mut_ptr(),
374 values_sizes.as_mut_ptr(),
375 errors.as_mut_ptr(),
376 );
377 }
378
379 convert_values(values, values_sizes, errors)
380 }
381}
382
383impl MultiGetCF<ReadOptions> for TransactionDB {
384 fn multi_get_cf_full<'a, K, I>(
385 &self,
386 keys: I,
387 readopts: Option<&ReadOptions>,
388 ) -> Vec<Result<Option<DBVector>, Error>>
389 where
390 K: AsRef<[u8]>,
391 I: IntoIterator<Item = (&'a ColumnFamily, K)>,
392 {
393 let mut default_readopts = None;
394 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
395 Ok(ro) => ro,
396 Err(e) => {
397 let key_count = keys.into_iter().count();
398
399 return vec![e; key_count]
400 .iter()
401 .map(|e| Err(e.to_owned()))
402 .collect();
403 }
404 };
405 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
406 .into_iter()
407 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
408 .unzip();
409 let ptr_keys: Vec<_> = cfs_and_keys
410 .iter()
411 .map(|(_, k)| k.as_ptr() as *const c_char)
412 .collect();
413 let ptr_cfs: Vec<_> = cfs_and_keys
414 .iter()
415 .map(|(c, _)| c.inner as *const _)
416 .collect();
417
418 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
419 let mut values_sizes = vec![0_usize; ptr_keys.len()];
420 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
421 unsafe {
422 ffi::rocksdb_transactiondb_multi_get_cf(
423 self.inner,
424 ro_handle,
425 ptr_cfs.as_ptr(),
426 ptr_keys.len(),
427 ptr_keys.as_ptr(),
428 keys_sizes.as_ptr(),
429 values.as_mut_ptr(),
430 values_sizes.as_mut_ptr(),
431 errors.as_mut_ptr(),
432 );
433 }
434
435 convert_values(values, values_sizes, errors)
436 }
437}
438
439impl PutCF<WriteOptions> for TransactionDB {
440 fn put_cf_full<K, V>(
441 &self,
442 cf: Option<&ColumnFamily>,
443 key: K,
444 value: V,
445 writeopts: Option<&WriteOptions>,
446 ) -> Result<(), Error>
447 where
448 K: AsRef<[u8]>,
449 V: AsRef<[u8]>,
450 {
451 let mut default_writeopts = None;
452
453 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
454
455 let key = key.as_ref();
456 let value = value.as_ref();
457 let key_ptr = key.as_ptr() as *const c_char;
458 let key_len = key.len() as size_t;
459 let val_ptr = value.as_ptr() as *const c_char;
460 let val_len = value.len() as size_t;
461
462 unsafe {
463 match cf {
464 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_put_cf(
465 self.handle(),
466 wo_handle,
467 cf.handle(),
468 key_ptr,
469 key_len,
470 val_ptr,
471 val_len,
472 )),
473 None => ffi_try!(ffi::rocksdb_transactiondb_put(
474 self.handle(),
475 wo_handle,
476 key_ptr,
477 key_len,
478 val_ptr,
479 val_len,
480 )),
481 }
482
483 Ok(())
484 }
485 }
486}
487
488impl DeleteCF<WriteOptions> for TransactionDB {
489 fn delete_cf_full<K>(
490 &self,
491 cf: Option<&ColumnFamily>,
492 key: K,
493 writeopts: Option<&WriteOptions>,
494 ) -> Result<(), Error>
495 where
496 K: AsRef<[u8]>,
497 {
498 let mut default_writeopts = None;
499
500 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
501
502 let key = key.as_ref();
503 let key_ptr = key.as_ptr() as *const c_char;
504 let key_len = key.len() as size_t;
505
506 unsafe {
507 match cf {
508 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
509 self.handle(),
510 wo_handle,
511 cf.handle(),
512 key_ptr,
513 key_len,
514 )),
515 None => ffi_try!(ffi::rocksdb_transactiondb_delete(
516 self.handle(),
517 wo_handle,
518 key_ptr,
519 key_len,
520 )),
521 }
522
523 Ok(())
524 }
525 }
526}
527
528impl MergeCF<WriteOptions> for TransactionDB {
529 fn merge_cf_full<K, V>(
530 &self,
531 cf: Option<&ColumnFamily>,
532 key: K,
533 value: V,
534 writeopts: Option<&WriteOptions>,
535 ) -> Result<(), Error>
536 where
537 K: AsRef<[u8]>,
538 V: AsRef<[u8]>,
539 {
540 let mut default_writeopts = None;
541
542 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
543
544 let key = key.as_ref();
545 let value = value.as_ref();
546 let key_ptr = key.as_ptr() as *const c_char;
547 let key_len = key.len() as size_t;
548 let val_ptr = value.as_ptr() as *const c_char;
549 let val_len = value.len() as size_t;
550
551 unsafe {
552 match cf {
553 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
554 self.handle(),
555 wo_handle,
556 cf.handle(),
557 key_ptr,
558 key_len,
559 val_ptr,
560 val_len,
561 )),
562 None => ffi_try!(ffi::rocksdb_transactiondb_merge(
563 self.handle(),
564 wo_handle,
565 key_ptr,
566 key_len,
567 val_ptr,
568 val_len,
569 )),
570 }
571
572 Ok(())
573 }
574 }
575}
576
577impl CreateCF for TransactionDB {
578 fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
579 let cname = to_cstring(
580 name.as_ref(),
581 "Failed to convert path to CString when opening rocksdb",
582 )?;
583 unsafe {
584 let cf_handle = ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
585 self.handle(),
586 opts.const_handle(),
587 cname.as_ptr(),
588 ));
589
590 self.get_mut_cfs()
591 .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
592 };
593 Ok(())
594 }
595}
596
597impl TransactionDB {
598 pub fn snapshot(&self) -> Snapshot<'_> {
599 let snapshot = unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) };
600 Snapshot {
601 db: self,
602 inner: snapshot,
603 }
604 }
605}
606
607pub struct Snapshot<'a> {
608 db: &'a TransactionDB,
609 inner: *const ffi::rocksdb_snapshot_t,
610}
611
612impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'a> {
613 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
614 self.inner
615 }
616}
617
618impl<'a> Read for Snapshot<'a> {}
619
620impl<'a> GetCF<ReadOptions> for Snapshot<'a> {
621 fn get_cf_full<K: AsRef<[u8]>>(
622 &self,
623 cf: Option<&ColumnFamily>,
624 key: K,
625 readopts: Option<&ReadOptions>,
626 ) -> Result<Option<DBVector>, Error> {
627 let mut ro = readopts.cloned().unwrap_or_default();
628 ro.set_snapshot(self);
629
630 self.db.get_cf_full(cf, key, Some(&ro))
631 }
632}
633
634impl<'a> MultiGet<ReadOptions> for Snapshot<'a> {
635 fn multi_get_full<K, I>(
636 &self,
637 keys: I,
638 readopts: Option<&ReadOptions>,
639 ) -> Vec<Result<Option<DBVector>, Error>>
640 where
641 K: AsRef<[u8]>,
642 I: IntoIterator<Item = K>,
643 {
644 let mut ro = readopts.cloned().unwrap_or_default();
645 ro.set_snapshot(self);
646
647 self.db.multi_get_full(keys, Some(&ro))
648 }
649}
650
651impl<'a> MultiGetCF<ReadOptions> for Snapshot<'a> {
652 fn multi_get_cf_full<'m, K, I>(
653 &self,
654 keys: I,
655 readopts: Option<&ReadOptions>,
656 ) -> Vec<Result<Option<DBVector>, Error>>
657 where
658 K: AsRef<[u8]>,
659 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
660 {
661 let mut ro = readopts.cloned().unwrap_or_default();
662 ro.set_snapshot(self);
663
664 self.db.multi_get_cf_full(keys, Some(&ro))
665 }
666}
667
668impl<'a> Drop for Snapshot<'a> {
669 fn drop(&mut self) {
670 unsafe {
671 ffi::rocksdb_transactiondb_release_snapshot(self.db.inner, self.inner);
672 }
673 }
674}
675
676impl Iterate for Snapshot<'_> {
677 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
678 let mut ro = readopts.to_owned();
679 ro.set_snapshot(self);
680 self.db.get_raw_iter(&ro)
681 }
682}
683
684impl IterateCF for Snapshot<'_> {
685 fn get_raw_iter_cf<'a: 'b, 'b>(
686 &'a self,
687 cf_handle: &ColumnFamily,
688 readopts: &ReadOptions,
689 ) -> Result<DBRawIterator<'b>, Error> {
690 let mut ro = readopts.to_owned();
691 ro.set_snapshot(self);
692 self.db.get_raw_iter_cf(cf_handle, &ro)
693 }
694}
695
696impl WriteOps for TransactionDB {
697 fn write_full(
698 &self,
699 batch: &WriteBatch,
700 writeopts: Option<&WriteOptions>,
701 ) -> Result<(), Error> {
702 let mut default_writeopts = None;
703
704 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
705
706 unsafe {
707 ffi_try!(ffi::rocksdb_transactiondb_write(
708 self.handle(),
709 wo_handle,
710 batch.handle(),
711 ));
712 Ok(())
713 }
714 }
715}