1use crate::ffi;
2use crate::{
3 ffi_util,
4 handle::{ConstHandle, Handle},
5 ops::*,
6 ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions,
7};
8use libc::{c_char, c_uchar, c_void, size_t};
9use std::marker::PhantomData;
10use std::ptr;
11
12pub struct OptimisticTransaction {
13 inner: *mut ffi::rocksdb_transaction_t,
14}
15
16unsafe impl Send for OptimisticTransaction {}
17unsafe impl Sync for OptimisticTransaction {}
18
19impl OptimisticTransaction {
20 pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> OptimisticTransaction {
21 OptimisticTransaction { inner }
22 }
23
24 pub fn commit(&self) -> Result<(), Error> {
26 unsafe {
27 ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
28 }
29 Ok(())
30 }
31
32 pub fn rollback(&self) -> Result<(), Error> {
34 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
35 Ok(())
36 }
37
38 pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
40 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
41 Ok(())
42 }
43
44 pub fn set_savepoint(&self) {
46 unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
47 }
48
49 pub fn snapshot(&self) -> OptimisticTransactionSnapshot<'_> {
51 unsafe {
52 let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
53 OptimisticTransactionSnapshot {
54 txn: self,
55 inner: snapshot,
56 }
57 }
58 }
59
60 pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
64 let opt = ReadOptions::default();
65 self.get_for_update_opt(key, &opt, true)
66 }
67
68 pub fn get_for_update_opt<K: AsRef<[u8]>>(
70 &self,
71 key: K,
72 readopts: &ReadOptions,
73 exclusive: bool,
74 ) -> Result<Option<DBVector>, Error> {
75 let key = key.as_ref();
76 let key_ptr = key.as_ptr() as *const c_char;
77 let key_len = key.len() as size_t;
78 unsafe {
79 let mut val_len: size_t = 0;
80 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
81 self.handle(),
82 readopts.handle(),
83 key_ptr,
84 key_len,
85 &mut val_len,
86 exclusive as c_uchar,
87 )) as *mut u8;
88
89 if val.is_null() {
90 Ok(None)
91 } else {
92 Ok(Some(DBVector::from_c(val, val_len)))
93 }
94 }
95 }
96
97 pub fn get_for_update_cf<K: AsRef<[u8]>>(
98 &self,
99 cf: &ColumnFamily,
100 key: K,
101 ) -> Result<Option<DBVector>, Error> {
102 let opt = ReadOptions::default();
103 self.get_for_update_cf_opt(cf, key, &opt, true)
104 }
105
106 pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
107 &self,
108 cf: &ColumnFamily,
109 key: K,
110 readopts: &ReadOptions,
111 exclusive: bool,
112 ) -> Result<Option<DBVector>, Error> {
113 let key = key.as_ref();
114 let key_ptr = key.as_ptr() as *const c_char;
115 let key_len = key.len() as size_t;
116 unsafe {
117 let mut val_len: size_t = 0;
118 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
119 self.handle(),
120 readopts.handle(),
121 cf.handle(),
122 key_ptr,
123 key_len,
124 &mut val_len,
125 exclusive as c_uchar,
126 )) as *mut u8;
127
128 if val.is_null() {
129 Ok(None)
130 } else {
131 Ok(Some(DBVector::from_c(val, val_len)))
132 }
133 }
134 }
135}
136
137impl Drop for OptimisticTransaction {
138 fn drop(&mut self) {
139 unsafe {
140 ffi::rocksdb_transaction_destroy(self.inner);
141 }
142 }
143}
144
145impl Handle<ffi::rocksdb_transaction_t> for OptimisticTransaction {
146 fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
147 self.inner
148 }
149}
150
151impl Read for OptimisticTransaction {}
152
153impl GetCF<ReadOptions> for OptimisticTransaction {
154 fn get_cf_full<K: AsRef<[u8]>>(
155 &self,
156 cf: Option<&ColumnFamily>,
157 key: K,
158 readopts: Option<&ReadOptions>,
159 ) -> Result<Option<DBVector>, Error> {
160 let mut default_readopts = None;
161
162 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
163
164 let key = key.as_ref();
165 let key_ptr = key.as_ptr() as *const c_char;
166 let key_len = key.len() as size_t;
167
168 unsafe {
169 let mut val_len: size_t = 0;
170
171 let val = match cf {
172 Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
173 self.handle(),
174 ro_handle,
175 cf.inner,
176 key_ptr,
177 key_len,
178 &mut val_len,
179 )),
180 None => ffi_try!(ffi::rocksdb_transaction_get(
181 self.handle(),
182 ro_handle,
183 key_ptr,
184 key_len,
185 &mut val_len,
186 )),
187 } as *mut u8;
188
189 if val.is_null() {
190 Ok(None)
191 } else {
192 Ok(Some(DBVector::from_c(val, val_len)))
193 }
194 }
195 }
196}
197
198impl MultiGet<ReadOptions> for OptimisticTransaction {
199 fn multi_get_full<K, I>(
200 &self,
201 keys: I,
202 readopts: Option<&ReadOptions>,
203 ) -> Vec<Result<Option<DBVector>, Error>>
204 where
205 K: AsRef<[u8]>,
206 I: IntoIterator<Item = K>,
207 {
208 let mut default_readopts = None;
209 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
210 Ok(ro) => ro,
211 Err(e) => {
212 let key_count = keys.into_iter().count();
213
214 return vec![e; key_count]
215 .iter()
216 .map(|e| Err(e.to_owned()))
217 .collect();
218 }
219 };
220
221 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
222 .into_iter()
223 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
224 .unzip();
225 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
226
227 let mut values = vec![ptr::null_mut(); keys.len()];
228 let mut values_sizes = vec![0_usize; keys.len()];
229 let mut errors = vec![ptr::null_mut(); keys.len()];
230 unsafe {
231 ffi::rocksdb_transaction_multi_get(
232 self.inner,
233 ro_handle,
234 ptr_keys.len(),
235 ptr_keys.as_ptr(),
236 keys_sizes.as_ptr(),
237 values.as_mut_ptr(),
238 values_sizes.as_mut_ptr(),
239 errors.as_mut_ptr(),
240 );
241 }
242
243 convert_values(values, values_sizes, errors)
244 }
245}
246
247impl MultiGetCF<ReadOptions> for OptimisticTransaction {
248 fn multi_get_cf_full<'a, K, I>(
249 &self,
250 keys: I,
251 readopts: Option<&ReadOptions>,
252 ) -> Vec<Result<Option<DBVector>, Error>>
253 where
254 K: AsRef<[u8]>,
255 I: IntoIterator<Item = (&'a ColumnFamily, K)>,
256 {
257 let mut default_readopts = None;
258 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
259 Ok(ro) => ro,
260 Err(e) => {
261 let key_count = keys.into_iter().count();
262
263 return vec![e; key_count]
264 .iter()
265 .map(|e| Err(e.to_owned()))
266 .collect();
267 }
268 };
269
270 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
271 .into_iter()
272 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
273 .unzip();
274 let ptr_keys: Vec<_> = cfs_and_keys
275 .iter()
276 .map(|(_, k)| k.as_ptr() as *const c_char)
277 .collect();
278 let ptr_cfs: Vec<_> = cfs_and_keys
279 .iter()
280 .map(|(c, _)| c.inner as *const _)
281 .collect();
282
283 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
284 let mut values_sizes = vec![0_usize; ptr_keys.len()];
285 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
286 unsafe {
287 ffi::rocksdb_transaction_multi_get_cf(
288 self.inner,
289 ro_handle,
290 ptr_cfs.as_ptr(),
291 ptr_keys.len(),
292 ptr_keys.as_ptr(),
293 keys_sizes.as_ptr(),
294 values.as_mut_ptr(),
295 values_sizes.as_mut_ptr(),
296 errors.as_mut_ptr(),
297 );
298 }
299
300 convert_values(values, values_sizes, errors)
301 }
302}
303
304impl Iterate for OptimisticTransaction {
305 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
306 unsafe {
307 DBRawIterator {
308 inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
309 db: PhantomData,
310 }
311 }
312 }
313}
314
315impl IterateCF for OptimisticTransaction {
316 fn get_raw_iter_cf<'a: 'b, 'b>(
317 &'a self,
318 cf_handle: &ColumnFamily,
319 readopts: &ReadOptions,
320 ) -> Result<DBRawIterator<'b>, Error> {
321 unsafe {
322 Ok(DBRawIterator {
323 inner: ffi::rocksdb_transaction_create_iterator_cf(
324 self.inner,
325 readopts.handle(),
326 cf_handle.inner,
327 ),
328 db: PhantomData,
329 })
330 }
331 }
332}
333
334impl PutCF<()> for OptimisticTransaction {
335 fn put_cf_full<K, V>(
336 &self,
337 cf: Option<&ColumnFamily>,
338 key: K,
339 value: V,
340 _: Option<&()>,
341 ) -> Result<(), Error>
342 where
343 K: AsRef<[u8]>,
344 V: AsRef<[u8]>,
345 {
346 let key = key.as_ref();
347 let value = value.as_ref();
348 let key_ptr = key.as_ptr() as *const c_char;
349 let key_len = key.len() as size_t;
350 let val_ptr = value.as_ptr() as *const c_char;
351 let val_len = value.len() as size_t;
352
353 unsafe {
354 match cf {
355 Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
356 self.handle(),
357 cf.handle(),
358 key_ptr,
359 key_len,
360 val_ptr,
361 val_len,
362 )),
363 None => ffi_try!(ffi::rocksdb_transaction_put(
364 self.handle(),
365 key_ptr,
366 key_len,
367 val_ptr,
368 val_len,
369 )),
370 }
371
372 Ok(())
373 }
374 }
375}
376
377impl MergeCF<()> for OptimisticTransaction {
378 fn merge_cf_full<K, V>(
379 &self,
380 cf: Option<&ColumnFamily>,
381 key: K,
382 value: V,
383 _: Option<&()>,
384 ) -> Result<(), Error>
385 where
386 K: AsRef<[u8]>,
387 V: AsRef<[u8]>,
388 {
389 let key = key.as_ref();
390 let value = value.as_ref();
391 let key_ptr = key.as_ptr() as *const c_char;
392 let key_len = key.len() as size_t;
393 let val_ptr = value.as_ptr() as *const c_char;
394 let val_len = value.len() as size_t;
395
396 unsafe {
397 match cf {
398 Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
399 self.handle(),
400 cf.handle(),
401 key_ptr,
402 key_len,
403 val_ptr,
404 val_len,
405 )),
406 None => ffi_try!(ffi::rocksdb_transaction_merge(
407 self.handle(),
408 key_ptr,
409 key_len,
410 val_ptr,
411 val_len,
412 )),
413 }
414
415 Ok(())
416 }
417 }
418}
419
420impl DeleteCF<()> for OptimisticTransaction {
421 fn delete_cf_full<K>(
422 &self,
423 cf: Option<&ColumnFamily>,
424 key: K,
425 _: Option<&()>,
426 ) -> Result<(), Error>
427 where
428 K: AsRef<[u8]>,
429 {
430 let key = key.as_ref();
431 let key_ptr = key.as_ptr() as *const c_char;
432 let key_len = key.len() as size_t;
433
434 unsafe {
435 match cf {
436 Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
437 self.handle(),
438 cf.inner,
439 key_ptr,
440 key_len,
441 )),
442 None => ffi_try!(ffi::rocksdb_transaction_delete(
443 self.handle(),
444 key_ptr,
445 key_len,
446 )),
447 }
448
449 Ok(())
450 }
451 }
452}
453
454pub struct OptimisticTransactionSnapshot<'a> {
455 txn: &'a OptimisticTransaction,
456 inner: *const ffi::rocksdb_snapshot_t,
457}
458
459unsafe impl<'a> Send for OptimisticTransactionSnapshot<'a> {}
460unsafe impl<'a> Sync for OptimisticTransactionSnapshot<'a> {}
461
462impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for OptimisticTransactionSnapshot<'a> {
463 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
464 self.inner
465 }
466}
467
468impl<'a> Read for OptimisticTransactionSnapshot<'a> {}
469
470impl<'a> GetCF<ReadOptions> for OptimisticTransactionSnapshot<'a> {
471 fn get_cf_full<K: AsRef<[u8]>>(
472 &self,
473 cf: Option<&ColumnFamily>,
474 key: K,
475 readopts: Option<&ReadOptions>,
476 ) -> Result<Option<DBVector>, Error> {
477 let mut ro = readopts.cloned().unwrap_or_default();
478 ro.set_snapshot(self);
479 self.txn.get_cf_full(cf, key, Some(&ro))
480 }
481}
482
483impl<'a> MultiGet<ReadOptions> for OptimisticTransactionSnapshot<'a> {
484 fn multi_get_full<K, I>(
485 &self,
486 keys: I,
487 readopts: Option<&ReadOptions>,
488 ) -> Vec<Result<Option<DBVector>, Error>>
489 where
490 K: AsRef<[u8]>,
491 I: IntoIterator<Item = K>,
492 {
493 let mut ro = readopts.cloned().unwrap_or_default();
494 ro.set_snapshot(self);
495 self.txn.multi_get_full(keys, Some(&ro))
496 }
497}
498
499impl<'a> MultiGetCF<ReadOptions> for OptimisticTransactionSnapshot<'a> {
500 fn multi_get_cf_full<'m, K, I>(
501 &self,
502 keys: I,
503 readopts: Option<&ReadOptions>,
504 ) -> Vec<Result<Option<DBVector>, Error>>
505 where
506 K: AsRef<[u8]>,
507 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
508 {
509 let mut ro = readopts.cloned().unwrap_or_default();
510 ro.set_snapshot(self);
511 self.txn.multi_get_cf_full(keys, Some(&ro))
512 }
513}
514
515impl<'a> Drop for OptimisticTransactionSnapshot<'a> {
516 fn drop(&mut self) {
517 unsafe {
518 ffi::rocksdb_free(self.inner as *mut c_void);
519 }
520 }
521}
522
523impl Iterate for OptimisticTransactionSnapshot<'_> {
524 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
525 let mut readopts = readopts.to_owned();
526 readopts.set_snapshot(self);
527 self.txn.get_raw_iter(&readopts)
528 }
529}
530
531impl IterateCF for OptimisticTransactionSnapshot<'_> {
532 fn get_raw_iter_cf<'a: 'b, 'b>(
533 &'a self,
534 cf_handle: &ColumnFamily,
535 readopts: &ReadOptions,
536 ) -> Result<DBRawIterator<'b>, Error> {
537 let mut readopts = readopts.to_owned();
538 readopts.set_snapshot(self);
539 self.txn.get_raw_iter_cf(cf_handle, &readopts)
540 }
541}
542
543impl<'a> GetPinnedCF<'a> for OptimisticTransaction {
544 type ColumnFamily = &'a ColumnFamily;
545 type ReadOptions = &'a ReadOptions;
546
547 fn get_pinned_cf_full<K: AsRef<[u8]>>(
548 &'a self,
549 cf: Option<Self::ColumnFamily>,
550 key: K,
551 readopts: Option<Self::ReadOptions>,
552 ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
553 let mut default_readopts = None;
554
555 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
556
557 let key = key.as_ref();
558 let key_ptr = key.as_ptr() as *const c_char;
559 let key_len = key.len() as size_t;
560
561 unsafe {
562 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
563 let val = match cf {
564 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
565 self.handle(),
566 ro_handle,
567 cf.handle(),
568 key_ptr,
569 key_len,
570 &mut err,
571 ),
572 None => ffi::rocksdb_transaction_get_pinned(
573 self.handle(),
574 ro_handle,
575 key_ptr,
576 key_len,
577 &mut err,
578 ),
579 };
580
581 if !err.is_null() {
582 return Err(Error::new(ffi_util::error_message(err)));
583 }
584
585 if val.is_null() {
586 Ok(None)
587 } else {
588 Ok(Some(DBPinnableSlice::from_c(val)))
589 }
590 }
591 }
592}
593
594impl<'a> GetPinnedCF<'a> for OptimisticTransactionSnapshot<'a> {
595 type ColumnFamily = &'a ColumnFamily;
596 type ReadOptions = &'a ReadOptions;
597
598 fn get_pinned_cf_full<K: AsRef<[u8]>>(
599 &'a self,
600 cf: Option<Self::ColumnFamily>,
601 key: K,
602 readopts: Option<Self::ReadOptions>,
603 ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
604 let mut ro = readopts.cloned().unwrap_or_default();
605 ro.set_snapshot(self);
606
607 let key = key.as_ref();
608 let key_ptr = key.as_ptr() as *const c_char;
609 let key_len = key.len() as size_t;
610
611 unsafe {
612 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
613 let val = match cf {
614 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
615 self.txn.handle(),
616 ro.handle(),
617 cf.handle(),
618 key_ptr,
619 key_len,
620 &mut err,
621 ),
622 None => ffi::rocksdb_transaction_get_pinned(
623 self.txn.handle(),
624 ro.handle(),
625 key_ptr,
626 key_len,
627 &mut err,
628 ),
629 };
630
631 if !err.is_null() {
632 return Err(Error::new(ffi_util::error_message(err)));
633 }
634
635 if val.is_null() {
636 Ok(None)
637 } else {
638 Ok(Some(DBPinnableSlice::from_c(val)))
639 }
640 }
641 }
642}