1use crate::ffi;
2use crate::{
3 ffi_util,
4 handle::{ConstHandle, Handle},
5 ops::*,
6 ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions,
7};
8use libc::{c_char, c_uchar, c_void, size_t};
9use std::marker::PhantomData;
10use std::ptr;
11
12pub struct Transaction<'a, T> {
13 inner: *mut ffi::rocksdb_transaction_t,
14 db: PhantomData<&'a T>,
15}
16
17impl<'a, T> Transaction<'a, T> {
18 pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> Transaction<'a, T> {
19 Transaction {
20 inner,
21 db: PhantomData,
22 }
23 }
24
25 pub fn commit(&self) -> Result<(), Error> {
27 unsafe {
28 ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
29 }
30 Ok(())
31 }
32
33 pub fn rollback(&self) -> Result<(), Error> {
35 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
36 Ok(())
37 }
38
39 pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
41 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
42 Ok(())
43 }
44
45 pub fn set_savepoint(&self) {
47 unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
48 }
49
50 pub fn snapshot(&'a self) -> TransactionSnapshot<'a, T> {
52 unsafe {
53 let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
54 TransactionSnapshot {
55 inner: snapshot,
56 db: self,
57 }
58 }
59 }
60
61 pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
65 let opt = ReadOptions::default();
66 self.get_for_update_opt(key, &opt, true)
67 }
68
69 pub fn get_for_update_opt<K: AsRef<[u8]>>(
71 &self,
72 key: K,
73 readopts: &ReadOptions,
74 exclusive: bool,
75 ) -> Result<Option<DBVector>, Error> {
76 let key = key.as_ref();
77 let key_ptr = key.as_ptr() as *const c_char;
78 let key_len = key.len() as size_t;
79 unsafe {
80 let mut val_len: size_t = 0;
81 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
82 self.handle(),
83 readopts.handle(),
84 key_ptr,
85 key_len,
86 &mut val_len,
87 exclusive as c_uchar,
88 )) as *mut u8;
89
90 if val.is_null() {
91 Ok(None)
92 } else {
93 Ok(Some(DBVector::from_c(val, val_len)))
94 }
95 }
96 }
97
98 pub fn get_for_update_cf<K: AsRef<[u8]>>(
99 &self,
100 cf: &ColumnFamily,
101 key: K,
102 ) -> Result<Option<DBVector>, Error> {
103 let opt = ReadOptions::default();
104 self.get_for_update_cf_opt(cf, key, &opt, true)
105 }
106
107 pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
108 &self,
109 cf: &ColumnFamily,
110 key: K,
111 readopts: &ReadOptions,
112 exclusive: bool,
113 ) -> Result<Option<DBVector>, Error> {
114 let key = key.as_ref();
115 let key_ptr = key.as_ptr() as *const c_char;
116 let key_len = key.len() as size_t;
117 unsafe {
118 let mut val_len: size_t = 0;
119 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
120 self.handle(),
121 readopts.handle(),
122 cf.handle(),
123 key_ptr,
124 key_len,
125 &mut val_len,
126 exclusive as c_uchar,
127 )) as *mut u8;
128
129 if val.is_null() {
130 Ok(None)
131 } else {
132 Ok(Some(DBVector::from_c(val, val_len)))
133 }
134 }
135 }
136}
137
138impl<'a, T> Drop for Transaction<'a, T> {
139 fn drop(&mut self) {
140 unsafe {
141 ffi::rocksdb_transaction_destroy(self.inner);
142 }
143 }
144}
145
146impl<'a, T> Handle<ffi::rocksdb_transaction_t> for Transaction<'a, T> {
147 fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
148 self.inner
149 }
150}
151
152impl<'a, T> Read for Transaction<'a, T> {}
153
154impl<'a, T> GetCF<ReadOptions> for Transaction<'a, T>
155where
156 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
157{
158 fn get_cf_full<K: AsRef<[u8]>>(
159 &self,
160 cf: Option<&ColumnFamily>,
161 key: K,
162 readopts: Option<&ReadOptions>,
163 ) -> Result<Option<DBVector>, Error> {
164 let mut default_readopts = None;
165
166 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
167
168 let key = key.as_ref();
169 let key_ptr = key.as_ptr() as *const c_char;
170 let key_len = key.len() as size_t;
171
172 unsafe {
173 let mut val_len: size_t = 0;
174
175 let val = match cf {
176 Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
177 self.handle(),
178 ro_handle,
179 cf.inner,
180 key_ptr,
181 key_len,
182 &mut val_len,
183 )),
184 None => ffi_try!(ffi::rocksdb_transaction_get(
185 self.handle(),
186 ro_handle,
187 key_ptr,
188 key_len,
189 &mut val_len,
190 )),
191 } as *mut u8;
192
193 if val.is_null() {
194 Ok(None)
195 } else {
196 Ok(Some(DBVector::from_c(val, val_len)))
197 }
198 }
199 }
200}
201
202impl<'a, T> MultiGet<ReadOptions> for Transaction<'a, T>
203where
204 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
205{
206 fn multi_get_full<K, I>(
207 &self,
208 keys: I,
209 readopts: Option<&ReadOptions>,
210 ) -> Vec<Result<Option<DBVector>, Error>>
211 where
212 K: AsRef<[u8]>,
213 I: IntoIterator<Item = K>,
214 {
215 let mut default_readopts = None;
216 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
217 Ok(ro) => ro,
218 Err(e) => {
219 let key_count = keys.into_iter().count();
220
221 return vec![e; key_count]
222 .iter()
223 .map(|e| Err(e.to_owned()))
224 .collect();
225 }
226 };
227
228 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
229 .into_iter()
230 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
231 .unzip();
232 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
233
234 let mut values = vec![ptr::null_mut(); keys.len()];
235 let mut values_sizes = vec![0_usize; keys.len()];
236 let mut errors = vec![ptr::null_mut(); keys.len()];
237 unsafe {
238 ffi::rocksdb_transaction_multi_get(
239 self.inner,
240 ro_handle,
241 ptr_keys.len(),
242 ptr_keys.as_ptr(),
243 keys_sizes.as_ptr(),
244 values.as_mut_ptr(),
245 values_sizes.as_mut_ptr(),
246 errors.as_mut_ptr(),
247 );
248 }
249
250 convert_values(values, values_sizes, errors)
251 }
252}
253
254impl<'a, T> MultiGetCF<ReadOptions> for Transaction<'a, T>
255where
256 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
257{
258 fn multi_get_cf_full<'m, K, I>(
259 &self,
260 keys: I,
261 readopts: Option<&ReadOptions>,
262 ) -> Vec<Result<Option<DBVector>, Error>>
263 where
264 K: AsRef<[u8]>,
265 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
266 {
267 let mut default_readopts = None;
268 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
269 Ok(ro) => ro,
270 Err(e) => {
271 let key_count = keys.into_iter().count();
272
273 return vec![e; key_count]
274 .iter()
275 .map(|e| Err(e.to_owned()))
276 .collect();
277 }
278 };
279 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
280 .into_iter()
281 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
282 .unzip();
283 let ptr_keys: Vec<_> = cfs_and_keys
284 .iter()
285 .map(|(_, k)| k.as_ptr() as *const c_char)
286 .collect();
287 let ptr_cfs: Vec<_> = cfs_and_keys
288 .iter()
289 .map(|(c, _)| c.inner as *const _)
290 .collect();
291
292 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
293 let mut values_sizes = vec![0_usize; ptr_keys.len()];
294 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
295 unsafe {
296 ffi::rocksdb_transaction_multi_get_cf(
297 self.inner,
298 ro_handle,
299 ptr_cfs.as_ptr(),
300 ptr_keys.len(),
301 ptr_keys.as_ptr(),
302 keys_sizes.as_ptr(),
303 values.as_mut_ptr(),
304 values_sizes.as_mut_ptr(),
305 errors.as_mut_ptr(),
306 );
307 }
308
309 convert_values(values, values_sizes, errors)
310 }
311}
312
313impl<T> Iterate for Transaction<'_, T> {
314 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
315 unsafe {
316 DBRawIterator {
317 inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
318 db: PhantomData,
319 }
320 }
321 }
322}
323
324impl<T> IterateCF for Transaction<'_, T> {
325 fn get_raw_iter_cf<'a: 'b, 'b>(
326 &'a self,
327 cf_handle: &ColumnFamily,
328 readopts: &ReadOptions,
329 ) -> Result<DBRawIterator<'b>, Error> {
330 unsafe {
331 Ok(DBRawIterator {
332 inner: ffi::rocksdb_transaction_create_iterator_cf(
333 self.inner,
334 readopts.handle(),
335 cf_handle.inner,
336 ),
337 db: PhantomData,
338 })
339 }
340 }
341}
342
343pub struct TransactionSnapshot<'a, T> {
344 db: &'a Transaction<'a, T>,
345 inner: *const ffi::rocksdb_snapshot_t,
346}
347
348impl<'a, T> ConstHandle<ffi::rocksdb_snapshot_t> for TransactionSnapshot<'a, T> {
349 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
350 self.inner
351 }
352}
353
354impl<'a, T> Read for TransactionSnapshot<'a, T> {}
355
356impl<'a, T> GetCF<ReadOptions> for TransactionSnapshot<'a, T>
357where
358 Transaction<'a, T>: GetCF<ReadOptions>,
359{
360 fn get_cf_full<K: AsRef<[u8]>>(
361 &self,
362 cf: Option<&ColumnFamily>,
363 key: K,
364 readopts: Option<&ReadOptions>,
365 ) -> Result<Option<DBVector>, Error> {
366 let mut ro = readopts.cloned().unwrap_or_default();
367 ro.set_snapshot(self);
368 self.db.get_cf_full(cf, key, Some(&ro))
369 }
370}
371
372impl<'a, T> MultiGet<ReadOptions> for TransactionSnapshot<'a, T>
373where
374 Transaction<'a, T>: MultiGet<ReadOptions>,
375{
376 fn multi_get_full<K, I>(
377 &self,
378 keys: I,
379 readopts: Option<&ReadOptions>,
380 ) -> Vec<Result<Option<DBVector>, Error>>
381 where
382 K: AsRef<[u8]>,
383 I: IntoIterator<Item = K>,
384 {
385 let mut ro = readopts.cloned().unwrap_or_default();
386 ro.set_snapshot(self);
387 self.db.multi_get_full(keys, Some(&ro))
388 }
389}
390
391impl<'a, T> MultiGetCF<ReadOptions> for TransactionSnapshot<'a, T>
392where
393 Transaction<'a, T>: MultiGet<ReadOptions>,
394{
395 fn multi_get_cf_full<'m, K, I>(
396 &self,
397 keys: I,
398 readopts: Option<&ReadOptions>,
399 ) -> Vec<Result<Option<DBVector>, Error>>
400 where
401 K: AsRef<[u8]>,
402 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
403 {
404 let mut ro = readopts.cloned().unwrap_or_default();
405 ro.set_snapshot(self);
406 self.db.multi_get_cf_full(keys, Some(&ro))
407 }
408}
409
410impl<'a, T> PutCF<()> for Transaction<'a, T> {
411 fn put_cf_full<K, V>(
412 &self,
413 cf: Option<&ColumnFamily>,
414 key: K,
415 value: V,
416 _: Option<&()>,
417 ) -> Result<(), Error>
418 where
419 K: AsRef<[u8]>,
420 V: AsRef<[u8]>,
421 {
422 let key = key.as_ref();
423 let value = value.as_ref();
424 let key_ptr = key.as_ptr() as *const c_char;
425 let key_len = key.len() as size_t;
426 let val_ptr = value.as_ptr() as *const c_char;
427 let val_len = value.len() as size_t;
428
429 unsafe {
430 match cf {
431 Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
432 self.handle(),
433 cf.handle(),
434 key_ptr,
435 key_len,
436 val_ptr,
437 val_len,
438 )),
439 None => ffi_try!(ffi::rocksdb_transaction_put(
440 self.handle(),
441 key_ptr,
442 key_len,
443 val_ptr,
444 val_len,
445 )),
446 }
447
448 Ok(())
449 }
450 }
451}
452
453impl<'a, T> MergeCF<()> for Transaction<'a, T> {
454 fn merge_cf_full<K, V>(
455 &self,
456 cf: Option<&ColumnFamily>,
457 key: K,
458 value: V,
459 _: Option<&()>,
460 ) -> Result<(), Error>
461 where
462 K: AsRef<[u8]>,
463 V: AsRef<[u8]>,
464 {
465 let key = key.as_ref();
466 let value = value.as_ref();
467 let key_ptr = key.as_ptr() as *const c_char;
468 let key_len = key.len() as size_t;
469 let val_ptr = value.as_ptr() as *const c_char;
470 let val_len = value.len() as size_t;
471
472 unsafe {
473 match cf {
474 Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
475 self.handle(),
476 cf.handle(),
477 key_ptr,
478 key_len,
479 val_ptr,
480 val_len,
481 )),
482 None => ffi_try!(ffi::rocksdb_transaction_merge(
483 self.handle(),
484 key_ptr,
485 key_len,
486 val_ptr,
487 val_len,
488 )),
489 }
490
491 Ok(())
492 }
493 }
494}
495
496impl<'a, T> DeleteCF<()> for Transaction<'a, T> {
497 fn delete_cf_full<K>(
498 &self,
499 cf: Option<&ColumnFamily>,
500 key: K,
501 _: Option<&()>,
502 ) -> Result<(), Error>
503 where
504 K: AsRef<[u8]>,
505 {
506 let key = key.as_ref();
507 let key_ptr = key.as_ptr() as *const c_char;
508 let key_len = key.len() as size_t;
509
510 unsafe {
511 match cf {
512 Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
513 self.handle(),
514 cf.inner,
515 key_ptr,
516 key_len,
517 )),
518 None => ffi_try!(ffi::rocksdb_transaction_delete(
519 self.handle(),
520 key_ptr,
521 key_len,
522 )),
523 }
524
525 Ok(())
526 }
527 }
528}
529
530impl<'a, T> Drop for TransactionSnapshot<'a, T> {
531 fn drop(&mut self) {
532 unsafe {
533 ffi::rocksdb_free(self.inner as *mut c_void);
534 }
535 }
536}
537
538impl<T: Iterate> Iterate for TransactionSnapshot<'_, T> {
539 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
540 let mut readopts = readopts.to_owned();
541 readopts.set_snapshot(self);
542 self.db.get_raw_iter(&readopts)
543 }
544}
545
546impl<T: IterateCF> IterateCF for TransactionSnapshot<'_, T> {
547 fn get_raw_iter_cf<'a: 'b, 'b>(
548 &'a self,
549 cf_handle: &ColumnFamily,
550 readopts: &ReadOptions,
551 ) -> Result<DBRawIterator<'b>, Error> {
552 let mut readopts = readopts.to_owned();
553 readopts.set_snapshot(self);
554 self.db.get_raw_iter_cf(cf_handle, &readopts)
555 }
556}
557
558impl<'a, T> GetPinnedCF<'a> for Transaction<'a, T> {
559 type ColumnFamily = &'a ColumnFamily;
560 type ReadOptions = &'a ReadOptions;
561
562 fn get_pinned_cf_full<K: AsRef<[u8]>>(
563 &'a self,
564 cf: Option<Self::ColumnFamily>,
565 key: K,
566 readopts: Option<Self::ReadOptions>,
567 ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
568 let mut default_readopts = None;
569
570 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
571
572 let key = key.as_ref();
573 let key_ptr = key.as_ptr() as *const c_char;
574 let key_len = key.len() as size_t;
575
576 unsafe {
577 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
578 let val = match cf {
579 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
580 self.handle(),
581 ro_handle,
582 cf.handle(),
583 key_ptr,
584 key_len,
585 &mut err,
586 ),
587 None => ffi::rocksdb_transaction_get_pinned(
588 self.handle(),
589 ro_handle,
590 key_ptr,
591 key_len,
592 &mut err,
593 ),
594 };
595
596 if !err.is_null() {
597 return Err(Error::new(ffi_util::error_message(err)));
598 }
599
600 if val.is_null() {
601 Ok(None)
602 } else {
603 Ok(Some(DBPinnableSlice::from_c(val)))
604 }
605 }
606 }
607}
608
609impl<'a, T> GetPinnedCF<'a> for TransactionSnapshot<'a, T> {
610 type ColumnFamily = &'a ColumnFamily;
611 type ReadOptions = &'a ReadOptions;
612
613 fn get_pinned_cf_full<K: AsRef<[u8]>>(
614 &'a self,
615 cf: Option<Self::ColumnFamily>,
616 key: K,
617 readopts: Option<Self::ReadOptions>,
618 ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
619 let mut ro = readopts.cloned().unwrap_or_default();
620 ro.set_snapshot(self);
621
622 let key = key.as_ref();
623 let key_ptr = key.as_ptr() as *const c_char;
624 let key_len = key.len() as size_t;
625
626 unsafe {
627 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
628 let val = match cf {
629 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
630 self.db.handle(),
631 ro.handle(),
632 cf.handle(),
633 key_ptr,
634 key_len,
635 &mut err,
636 ),
637 None => ffi::rocksdb_transaction_get_pinned(
638 self.db.handle(),
639 ro.handle(),
640 key_ptr,
641 key_len,
642 &mut err,
643 ),
644 };
645
646 if !err.is_null() {
647 return Err(Error::new(ffi_util::error_message(err)));
648 }
649
650 if val.is_null() {
651 Ok(None)
652 } else {
653 Ok(Some(DBPinnableSlice::from_c(val)))
654 }
655 }
656 }
657}