1use crate::{
2 db_iterator::DBRawIterator,
3 db_options::{OptionsMustOutliveDB, ReadOptions},
4 db_vector::DBVector,
5 handle::{ConstHandle, Handle},
6 open_raw::{OpenRaw, OpenRawFFI},
7 ops::*,
8 ColumnFamily, Error, OptimisticTransaction, Options, WriteOptions,
9};
10
11use crate::ffi;
12use crate::ffi_util::to_cpath;
13use libc::c_uchar;
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::{Path, PathBuf};
17use std::ptr;
18
19pub struct OptimisticTransactionDB {
21 inner: *mut ffi::rocksdb_optimistictransactiondb_t,
22 path: PathBuf,
23 cfs: BTreeMap<String, ColumnFamily>,
24 base_db: *mut ffi::rocksdb_t,
25 _outlive: Vec<OptionsMustOutliveDB>,
26}
27
28impl Handle<ffi::rocksdb_optimistictransactiondb_t> for OptimisticTransactionDB {
29 fn handle(&self) -> *mut ffi::rocksdb_optimistictransactiondb_t {
30 self.inner
31 }
32}
33
34impl Open for OptimisticTransactionDB {}
35impl OpenCF for OptimisticTransactionDB {}
36
37impl OpenRaw for OptimisticTransactionDB {
38 type Pointer = ffi::rocksdb_optimistictransactiondb_t;
39 type Descriptor = ();
40
41 fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
42 let pointer = unsafe {
43 if input.num_column_families <= 0 {
44 ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
45 input.options,
46 input.path,
47 ))
48 } else {
49 ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
50 input.options,
51 input.path,
52 input.num_column_families,
53 input.column_family_names,
54 input.column_family_options,
55 input.column_family_handles,
56 ))
57 }
58 };
59
60 Ok(pointer)
61 }
62
63 fn build<I>(
64 path: PathBuf,
65 _open_descriptor: Self::Descriptor,
66 pointer: *mut Self::Pointer,
67 column_families: I,
68 outlive: Vec<OptionsMustOutliveDB>,
69 ) -> Result<Self, Error>
70 where
71 I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
72 {
73 let cfs: BTreeMap<_, _> = column_families
74 .into_iter()
75 .map(|(k, h)| (k, ColumnFamily::new(h)))
76 .collect();
77 let base_db = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(pointer) };
78 Ok(OptimisticTransactionDB {
79 inner: pointer,
80 cfs,
81 path,
82 base_db,
83 _outlive: outlive,
84 })
85 }
86}
87
88impl Read for OptimisticTransactionDB {}
89impl Write for OptimisticTransactionDB {}
90
91unsafe impl Send for OptimisticTransactionDB {}
92unsafe impl Sync for OptimisticTransactionDB {}
93
94impl GetColumnFamilys for OptimisticTransactionDB {
95 fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
96 &self.cfs
97 }
98 fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
99 &mut self.cfs
100 }
101}
102
103impl OptimisticTransactionDB {
104 pub fn path(&self) -> &Path {
105 self.path.as_path()
106 }
107
108 pub fn base_db_ptr(&self) -> *mut ffi::rocksdb_t {
109 self.base_db
110 }
111
112 pub fn repair<P: AsRef<Path>>(opts: Options, path: P) -> Result<(), Error> {
113 let cpath = to_cpath(
114 path,
115 "Failed to convert path to CString when opening database.",
116 )?;
117 unsafe {
118 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr(),));
119 }
120 Ok(())
121 }
122
123 pub fn transaction(
124 &self,
125 write_options: &WriteOptions,
126 tx_options: &OptimisticTransactionOptions,
127 ) -> OptimisticTransaction {
128 unsafe {
129 let inner = ffi::rocksdb_optimistictransaction_begin(
130 self.inner,
131 write_options.handle(),
132 tx_options.inner,
133 ptr::null_mut(),
134 );
135 OptimisticTransaction::new(inner)
136 }
137 }
138
139 pub fn transaction_default(&self) -> OptimisticTransaction {
140 let write_options = WriteOptions::default();
141 let transaction_options = OptimisticTransactionOptions::default();
142 self.transaction(&write_options, &transaction_options)
143 }
144}
145
146impl Drop for OptimisticTransactionDB {
147 fn drop(&mut self) {
148 unsafe {
149 for cf in self.cfs.values() {
150 ffi::rocksdb_column_family_handle_destroy(cf.inner);
151 }
152 ffi::rocksdb_optimistictransactiondb_close_base_db(self.base_db);
153 ffi::rocksdb_optimistictransactiondb_close(self.inner);
154 }
155 }
156}
157
158pub struct OptimisticTransactionOptions {
179 pub inner: *mut ffi::rocksdb_optimistictransaction_options_t,
180}
181
182impl OptimisticTransactionOptions {
183 pub fn new() -> OptimisticTransactionOptions {
185 unsafe {
186 let inner = ffi::rocksdb_optimistictransaction_options_create();
187 OptimisticTransactionOptions { inner }
188 }
189 }
190
191 pub fn set_snapshot(&mut self, set_snapshot: bool) {
194 unsafe {
195 ffi::rocksdb_optimistictransaction_options_set_set_snapshot(
196 self.inner,
197 set_snapshot as c_uchar,
198 );
199 }
200 }
201}
202
203impl Drop for OptimisticTransactionOptions {
204 fn drop(&mut self) {
205 unsafe {
206 ffi::rocksdb_optimistictransaction_options_destroy(self.inner);
207 }
208 }
209}
210
211impl Default for OptimisticTransactionOptions {
212 fn default() -> OptimisticTransactionOptions {
213 OptimisticTransactionOptions::new()
214 }
215}
216
217impl Handle<ffi::rocksdb_t> for OptimisticTransactionDB {
218 fn handle(&self) -> *mut ffi::rocksdb_t {
219 self.base_db
220 }
221}
222
223impl Iterate for OptimisticTransactionDB {
224 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
225 unsafe {
226 DBRawIterator {
227 inner: ffi::rocksdb_create_iterator(self.base_db, readopts.handle()),
228 db: PhantomData,
229 }
230 }
231 }
232}
233
234impl IterateCF for OptimisticTransactionDB {
235 fn get_raw_iter_cf<'a: 'b, 'b>(
236 &'a self,
237 cf_handle: &ColumnFamily,
238 readopts: &ReadOptions,
239 ) -> Result<DBRawIterator<'b>, Error> {
240 unsafe {
241 Ok(DBRawIterator {
242 inner: ffi::rocksdb_create_iterator_cf(
243 self.base_db,
244 readopts.handle(),
245 cf_handle.inner,
246 ),
247 db: PhantomData,
248 })
249 }
250 }
251}
252
253impl OptimisticTransactionDB {
254 pub fn snapshot(&self) -> Snapshot<'_> {
255 let snapshot = unsafe { ffi::rocksdb_create_snapshot(self.base_db) };
256 Snapshot {
257 db: self,
258 inner: snapshot,
259 }
260 }
261}
262
263pub struct Snapshot<'a> {
264 db: &'a OptimisticTransactionDB,
265 inner: *const ffi::rocksdb_snapshot_t,
266}
267
268impl<'a> ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'a> {
269 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
270 self.inner
271 }
272}
273
274impl<'a> Read for Snapshot<'a> {}
275
276impl<'a> GetCF<ReadOptions> for Snapshot<'a> {
277 fn get_cf_full<K: AsRef<[u8]>>(
278 &self,
279 cf: Option<&ColumnFamily>,
280 key: K,
281 readopts: Option<&ReadOptions>,
282 ) -> Result<Option<DBVector>, Error> {
283 let mut ro = readopts.cloned().unwrap_or_default();
284 ro.set_snapshot(self);
285
286 self.db.get_cf_full(cf, key, Some(&ro))
287 }
288}
289
290impl<'a> MultiGet<ReadOptions> for Snapshot<'a> {
291 fn multi_get_full<K, I>(
292 &self,
293 keys: I,
294 readopts: Option<&ReadOptions>,
295 ) -> Vec<Result<Option<DBVector>, Error>>
296 where
297 K: AsRef<[u8]>,
298 I: IntoIterator<Item = K>,
299 {
300 let mut ro = readopts.cloned().unwrap_or_default();
301 ro.set_snapshot(self);
302
303 self.db.multi_get_full(keys, Some(&ro))
304 }
305}
306
307impl<'a> MultiGetCF<ReadOptions> for Snapshot<'a> {
308 fn multi_get_cf_full<'m, K, I>(
309 &self,
310 keys: I,
311 readopts: Option<&ReadOptions>,
312 ) -> Vec<Result<Option<DBVector>, Error>>
313 where
314 K: AsRef<[u8]>,
315 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
316 {
317 let mut ro = readopts.cloned().unwrap_or_default();
318 ro.set_snapshot(self);
319
320 self.db.multi_get_cf_full(keys, Some(&ro))
321 }
322}
323
324impl<'a> Drop for Snapshot<'a> {
325 fn drop(&mut self) {
326 unsafe {
327 ffi::rocksdb_release_snapshot(self.db.base_db, self.inner);
328 }
329 }
330}
331
332impl Iterate for Snapshot<'_> {
333 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
334 let mut ro = readopts.to_owned();
335 ro.set_snapshot(self);
336 self.db.get_raw_iter(&ro)
337 }
338}
339
340impl IterateCF for Snapshot<'_> {
341 fn get_raw_iter_cf<'a: 'b, 'b>(
342 &'a self,
343 cf_handle: &ColumnFamily,
344 readopts: &ReadOptions,
345 ) -> Result<DBRawIterator<'b>, Error> {
346 let mut ro = readopts.to_owned();
347 ro.set_snapshot(self);
348 self.db.get_raw_iter_cf(cf_handle, &ro)
349 }
350}