1use libc::{
2 c_uint,
3 c_void,
4 size_t,
5};
6use std::marker::PhantomData;
7use std::{
8 fmt,
9 mem,
10 ptr,
11 result,
12 slice,
13};
14
15use ffi;
16
17use cursor::{
18 RoCursor,
19 RwCursor,
20};
21use database::Database;
22use environment::{
23 Environment,
24 Stat,
25};
26use error::{
27 lmdb_result,
28 Error,
29 Result,
30};
31use flags::{
32 DatabaseFlags,
33 EnvironmentFlags,
34 WriteFlags,
35};
36
37pub trait Transaction: Sized {
41 fn txn(&self) -> *mut ffi::MDB_txn;
46
47 fn commit(self) -> Result<()> {
51 unsafe {
52 let result = lmdb_result(ffi::mdb_txn_commit(self.txn()));
53 mem::forget(self);
54 result
55 }
56 }
57
58 fn abort(self) {
62 }
64
65 unsafe fn open_db(&self, name: Option<&str>) -> Result<Database> {
83 Database::new(self.txn(), name, 0)
84 }
85
86 fn get<'txn, K>(&'txn self, database: Database, key: &K) -> Result<&'txn [u8]>
95 where
96 K: AsRef<[u8]>,
97 {
98 let key = key.as_ref();
99 let mut key_val: ffi::MDB_val = ffi::MDB_val {
100 mv_size: key.len() as size_t,
101 mv_data: key.as_ptr() as *mut c_void,
102 };
103 let mut data_val: ffi::MDB_val = ffi::MDB_val {
104 mv_size: 0,
105 mv_data: ptr::null_mut(),
106 };
107 unsafe {
108 match ffi::mdb_get(self.txn(), database.dbi(), &mut key_val, &mut data_val) {
109 ffi::MDB_SUCCESS => Ok(slice::from_raw_parts(data_val.mv_data as *const u8, data_val.mv_size as usize)),
110 err_code => Err(Error::from_err_code(err_code)),
111 }
112 }
113 }
114
115 fn open_ro_cursor<'txn>(&'txn self, db: Database) -> Result<RoCursor<'txn>> {
117 RoCursor::new(self, db)
118 }
119
120 fn db_flags(&self, db: Database) -> Result<DatabaseFlags> {
122 let mut flags: c_uint = 0;
123 unsafe {
124 lmdb_result(ffi::mdb_dbi_flags(self.txn(), db.dbi(), &mut flags))?;
125 }
126 Ok(DatabaseFlags::from_bits_truncate(flags))
127 }
128
129 fn stat(&self, db: Database) -> Result<Stat> {
131 unsafe {
132 let mut stat = Stat::new();
133 lmdb_try!(ffi::mdb_stat(self.txn(), db.dbi(), stat.mdb_stat()));
134 Ok(stat)
135 }
136 }
137}
138
139pub struct RoTransaction<'env> {
141 txn: *mut ffi::MDB_txn,
142 _marker: PhantomData<&'env ()>,
143}
144
145impl<'env> fmt::Debug for RoTransaction<'env> {
146 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
147 f.debug_struct("RoTransaction").finish()
148 }
149}
150
151impl<'env> Drop for RoTransaction<'env> {
152 fn drop(&mut self) {
153 unsafe { ffi::mdb_txn_abort(self.txn) }
154 }
155}
156
157impl<'env> RoTransaction<'env> {
158 pub(crate) fn new(env: &'env Environment) -> Result<RoTransaction<'env>> {
161 let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
162 unsafe {
163 lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), ffi::MDB_RDONLY, &mut txn))?;
164 Ok(RoTransaction {
165 txn,
166 _marker: PhantomData,
167 })
168 }
169 }
170
171 pub fn reset(self) -> InactiveTransaction<'env> {
184 let txn = self.txn;
185 unsafe {
186 mem::forget(self);
187 ffi::mdb_txn_reset(txn)
188 };
189 InactiveTransaction {
190 txn,
191 _marker: PhantomData,
192 }
193 }
194}
195
196impl<'env> Transaction for RoTransaction<'env> {
197 fn txn(&self) -> *mut ffi::MDB_txn {
198 self.txn
199 }
200}
201
202pub struct InactiveTransaction<'env> {
204 txn: *mut ffi::MDB_txn,
205 _marker: PhantomData<&'env ()>,
206}
207
208impl<'env> fmt::Debug for InactiveTransaction<'env> {
209 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
210 f.debug_struct("InactiveTransaction").finish()
211 }
212}
213
214impl<'env> Drop for InactiveTransaction<'env> {
215 fn drop(&mut self) {
216 unsafe { ffi::mdb_txn_abort(self.txn) }
217 }
218}
219
220impl<'env> InactiveTransaction<'env> {
221 pub fn renew(self) -> Result<RoTransaction<'env>> {
227 let txn = self.txn;
228 unsafe {
229 mem::forget(self);
230 lmdb_result(ffi::mdb_txn_renew(txn))?
231 };
232 Ok(RoTransaction {
233 txn,
234 _marker: PhantomData,
235 })
236 }
237}
238
239pub struct RwTransaction<'env> {
241 txn: *mut ffi::MDB_txn,
242 _marker: PhantomData<&'env ()>,
243}
244
245impl<'env> fmt::Debug for RwTransaction<'env> {
246 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
247 f.debug_struct("RwTransaction").finish()
248 }
249}
250
251impl<'env> Drop for RwTransaction<'env> {
252 fn drop(&mut self) {
253 unsafe { ffi::mdb_txn_abort(self.txn) }
254 }
255}
256
257impl<'env> RwTransaction<'env> {
258 pub(crate) fn new(env: &'env Environment) -> Result<RwTransaction<'env>> {
261 let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
262 unsafe {
263 lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), EnvironmentFlags::empty().bits(), &mut txn))?;
264 Ok(RwTransaction {
265 txn,
266 _marker: PhantomData,
267 })
268 }
269 }
270
271 pub unsafe fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
289 Database::new(self.txn(), name, flags.bits() | ffi::MDB_CREATE)
290 }
291
292 pub fn open_rw_cursor<'txn>(&'txn mut self, db: Database) -> Result<RwCursor<'txn>> {
294 RwCursor::new(self, db)
295 }
296
297 pub fn put<K, D>(&mut self, database: Database, key: &K, data: &D, flags: WriteFlags) -> Result<()>
304 where
305 K: AsRef<[u8]>,
306 D: AsRef<[u8]>,
307 {
308 let key = key.as_ref();
309 let data = data.as_ref();
310 let mut key_val: ffi::MDB_val = ffi::MDB_val {
311 mv_size: key.len() as size_t,
312 mv_data: key.as_ptr() as *mut c_void,
313 };
314 let mut data_val: ffi::MDB_val = ffi::MDB_val {
315 mv_size: data.len() as size_t,
316 mv_data: data.as_ptr() as *mut c_void,
317 };
318 unsafe { lmdb_result(ffi::mdb_put(self.txn(), database.dbi(), &mut key_val, &mut data_val, flags.bits())) }
319 }
320
321 pub fn reserve<'txn, K>(
325 &'txn mut self,
326 database: Database,
327 key: &K,
328 len: size_t,
329 flags: WriteFlags,
330 ) -> Result<&'txn mut [u8]>
331 where
332 K: AsRef<[u8]>,
333 {
334 let key = key.as_ref();
335 let mut key_val: ffi::MDB_val = ffi::MDB_val {
336 mv_size: key.len() as size_t,
337 mv_data: key.as_ptr() as *mut c_void,
338 };
339 let mut data_val: ffi::MDB_val = ffi::MDB_val {
340 mv_size: len,
341 mv_data: ptr::null_mut::<c_void>(),
342 };
343 unsafe {
344 lmdb_result(ffi::mdb_put(
345 self.txn(),
346 database.dbi(),
347 &mut key_val,
348 &mut data_val,
349 flags.bits() | ffi::MDB_RESERVE,
350 ))?;
351 Ok(slice::from_raw_parts_mut(data_val.mv_data as *mut u8, data_val.mv_size as usize))
352 }
353 }
354
355 pub fn del<K>(&mut self, database: Database, key: &K, data: Option<&[u8]>) -> Result<()>
366 where
367 K: AsRef<[u8]>,
368 {
369 let key = key.as_ref();
370 let mut key_val: ffi::MDB_val = ffi::MDB_val {
371 mv_size: key.len() as size_t,
372 mv_data: key.as_ptr() as *mut c_void,
373 };
374 let data_val: Option<ffi::MDB_val> = data.map(|data| ffi::MDB_val {
375 mv_size: data.len() as size_t,
376 mv_data: data.as_ptr() as *mut c_void,
377 });
378
379 if let Some(mut d) = data_val {
380 unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, &mut d)) }
381 } else {
382 unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, ptr::null_mut())) }
383 }
384 }
385
386 pub fn clear_db(&mut self, db: Database) -> Result<()> {
388 unsafe { lmdb_result(ffi::mdb_drop(self.txn(), db.dbi(), 0)) }
389 }
390
391 pub unsafe fn drop_db(&mut self, db: Database) -> Result<()> {
398 lmdb_result(ffi::mdb_drop(self.txn, db.dbi(), 1))
399 }
400
401 pub fn begin_nested_txn<'txn>(&'txn mut self) -> Result<RwTransaction<'txn>> {
403 let mut nested: *mut ffi::MDB_txn = ptr::null_mut();
404 unsafe {
405 let env: *mut ffi::MDB_env = ffi::mdb_txn_env(self.txn());
406 ffi::mdb_txn_begin(env, self.txn(), 0, &mut nested);
407 }
408 Ok(RwTransaction {
409 txn: nested,
410 _marker: PhantomData,
411 })
412 }
413}
414
415impl<'env> Transaction for RwTransaction<'env> {
416 fn txn(&self) -> *mut ffi::MDB_txn {
417 self.txn
418 }
419}
420
#[cfg(test)]
mod test {

    use std::io::Write;
    use std::sync::{
        Arc,
        Barrier,
    };
    use std::thread::{
        self,
        JoinHandle,
    };

    use tempdir::TempDir;

    use super::*;
    use cursor::Cursor;
    use error::*;
    use flags::*;

    // Writes three records, commits, then checks lookups and a delete inside a
    // second write transaction.
    #[test]
    fn test_put_get_del() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.open_db(None).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        for &(k, v) in [(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")].iter() {
            writer.put(db, k, v, WriteFlags::empty()).unwrap();
        }
        writer.commit().unwrap();

        let mut checker = env.begin_rw_txn().unwrap();
        for &(k, v) in [(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")].iter() {
            assert_eq!(v, checker.get(db, k).unwrap());
        }
        assert_eq!(checker.get(db, b"key"), Err(Error::NotFound));

        checker.del(db, b"key1", None).unwrap();
        assert_eq!(checker.get(db, b"key1"), Err(Error::NotFound));
    }

    // Same round trip on a DUP_SORT database: three values per key, then delete
    // one duplicate of key1 and every value of key2.
    #[test]
    fn test_put_get_del_multi() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        // Insertion order: all values of key1, then key2, then key3.
        for &k in [b"key1", b"key2", b"key3"].iter() {
            for &v in [b"val1", b"val2", b"val3"].iter() {
                writer.put(db, k, v, WriteFlags::empty()).unwrap();
            }
        }
        writer.commit().unwrap();

        let reader = env.begin_rw_txn().unwrap();
        {
            let mut cursor = reader.open_ro_cursor(db).unwrap();
            let dups = cursor.iter_dup_of(b"key1");
            let vals = dups.map(|entry| entry.unwrap().1).collect::<Vec<_>>();
            assert_eq!(vals, vec![b"val1", b"val2", b"val3"]);
        }
        reader.commit().unwrap();

        let mut deleter = env.begin_rw_txn().unwrap();
        deleter.del(db, b"key1", Some(b"val2")).unwrap();
        deleter.del(db, b"key2", None).unwrap();
        deleter.commit().unwrap();

        let reader = env.begin_rw_txn().unwrap();
        {
            let mut cursor = reader.open_ro_cursor(db).unwrap();
            let dups = cursor.iter_dup_of(b"key1");
            let vals = dups.map(|entry| entry.unwrap().1).collect::<Vec<_>>();
            assert_eq!(vals, vec![b"val1", b"val3"]);

            let remaining = cursor.iter_dup_of(b"key2");
            assert_eq!(0, remaining.count());
        }
        reader.commit().unwrap();
    }

    // Fills a reserved buffer through `io::Write` and reads the value back.
    #[test]
    fn test_reserve() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.open_db(None).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        {
            // `write_all` is implemented for `&mut [u8]`, hence the `mut` binding.
            let mut slot = writer.reserve(db, b"key1", 4, WriteFlags::empty()).unwrap();
            slot.write_all(b"val1").unwrap();
        }
        writer.commit().unwrap();

        let mut checker = env.begin_rw_txn().unwrap();
        assert_eq!(b"val1", checker.get(db, b"key1").unwrap());
        assert_eq!(checker.get(db, b"key"), Err(Error::NotFound));

        checker.del(db, b"key1", None).unwrap();
        assert_eq!(checker.get(db, b"key1"), Err(Error::NotFound));
    }

    // A reset-then-renewed read transaction can still read committed data.
    #[test]
    fn test_inactive_txn() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.open_db(None).unwrap();

        {
            let mut writer = env.begin_rw_txn().unwrap();
            writer.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
            writer.commit().unwrap();
        }

        let reader = env.begin_ro_txn().unwrap();
        let parked = reader.reset();
        let revived = parked.renew().unwrap();
        assert!(revived.get(db, b"key").is_ok());
    }

    // Writes made in a nested transaction that is dropped without commit are
    // not visible in the parent.
    #[test]
    fn test_nested_txn() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.open_db(None).unwrap();

        let mut parent = env.begin_rw_txn().unwrap();
        parent.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();

        {
            let mut child = parent.begin_nested_txn().unwrap();
            child.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
            assert_eq!(child.get(db, b"key1").unwrap(), b"val1");
            assert_eq!(child.get(db, b"key2").unwrap(), b"val2");
            // `child` is dropped here without being committed.
        }

        assert_eq!(parent.get(db, b"key1").unwrap(), b"val1");
        assert_eq!(parent.get(db, b"key2"), Err(Error::NotFound));
    }

    // `clear_db` removes previously committed records.
    #[test]
    fn test_clear_db() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.open_db(None).unwrap();

        {
            let mut writer = env.begin_rw_txn().unwrap();
            writer.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
            writer.commit().unwrap();
        }

        {
            let mut wiper = env.begin_rw_txn().unwrap();
            wiper.clear_db(db).unwrap();
            wiper.commit().unwrap();
        }

        let reader = env.begin_ro_txn().unwrap();
        assert_eq!(reader.get(db, b"key"), Err(Error::NotFound));
    }

    // After `drop_db` the named database can no longer be opened.
    #[test]
    fn test_drop_db() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().set_max_dbs(2).open(tmp.path()).unwrap();
        let db = env.create_db(Some("test"), DatabaseFlags::empty()).unwrap();

        {
            let mut writer = env.begin_rw_txn().unwrap();
            writer.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
            writer.commit().unwrap();
        }
        {
            let mut dropper = env.begin_rw_txn().unwrap();
            unsafe {
                dropper.drop_db(db).unwrap();
            }
            dropper.commit().unwrap();
        }

        assert_eq!(env.open_db(Some("test")), Err(Error::NotFound));
    }

    // Ten reader threads rendezvous with one writer through a barrier: no
    // reader sees the key before the first barrier, every reader sees it after
    // the second.
    #[test]
    fn test_concurrent_readers_single_writer() {
        let tmp = TempDir::new("test").unwrap();
        let env: Arc<Environment> = Arc::new(Environment::new().open(tmp.path()).unwrap());

        let n = 10usize;
        // `n` readers plus the writer on the main thread.
        let barrier = Arc::new(Barrier::new(n + 1));
        let mut readers: Vec<JoinHandle<bool>> = Vec::with_capacity(n);

        let key = b"key";
        let val = b"val";

        for _ in 0..n {
            let reader_env = Arc::clone(&env);
            let reader_barrier = Arc::clone(&barrier);

            let handle = thread::spawn(move || {
                let db = reader_env.open_db(None).unwrap();
                {
                    let before = reader_env.begin_ro_txn().unwrap();
                    assert_eq!(before.get(db, key), Err(Error::NotFound));
                    before.abort();
                }
                // First wait: the writer may proceed. Second wait: the writer
                // has committed.
                reader_barrier.wait();
                reader_barrier.wait();
                {
                    let after = reader_env.begin_ro_txn().unwrap();
                    after.get(db, key).unwrap() == val
                }
            });
            readers.push(handle);
        }

        let db = env.open_db(None).unwrap();
        let mut writer = env.begin_rw_txn().unwrap();
        barrier.wait();
        writer.put(db, key, val, WriteFlags::empty()).unwrap();
        writer.commit().unwrap();
        barrier.wait();

        assert!(readers.into_iter().all(|b| b.join().unwrap()))
    }

    // Ten threads each commit one distinct key; all ten must be readable
    // afterwards.
    #[test]
    fn test_concurrent_writers() {
        let tmp = TempDir::new("test").unwrap();
        let env = Arc::new(Environment::new().open(tmp.path()).unwrap());

        let n = 10usize;
        let mut writers: Vec<JoinHandle<bool>> = Vec::with_capacity(n);

        let key = "key";
        let val = "val";

        for i in 0..n {
            let writer_env = Arc::clone(&env);

            let handle = thread::spawn(move || {
                let db = writer_env.open_db(None).unwrap();
                let mut txn = writer_env.begin_rw_txn().unwrap();
                let record_key = format!("{}{}", key, i);
                let record_val = format!("{}{}", val, i);
                txn.put(db, &record_key, &record_val, WriteFlags::empty()).unwrap();
                txn.commit().is_ok()
            });
            writers.push(handle);
        }
        assert!(writers.into_iter().all(|b| b.join().unwrap()));

        let db = env.open_db(None).unwrap();
        let reader = env.begin_ro_txn().unwrap();

        for i in 0..n {
            let expected = format!("{}{}", val, i);
            let lookup = format!("{}{}", key, i);
            assert_eq!(expected.as_bytes(), reader.get(db, &lookup).unwrap());
        }
    }

    // `stat().entries()` follows inserts and deletes: 3, then 1, then 4.
    #[test]
    fn test_stat() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.create_db(None, DatabaseFlags::empty()).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        for &(k, v) in [(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")].iter() {
            writer.put(db, k, v, WriteFlags::empty()).unwrap();
        }
        writer.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 3);
        }

        let mut deleter = env.begin_rw_txn().unwrap();
        for &k in [b"key1", b"key2"].iter() {
            deleter.del(db, k, None).unwrap();
        }
        deleter.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 1);
        }

        let mut writer = env.begin_rw_txn().unwrap();
        for &(k, v) in [(b"key4", b"val4"), (b"key5", b"val5"), (b"key6", b"val6")].iter() {
            writer.put(db, k, v, WriteFlags::empty()).unwrap();
        }
        writer.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 4);
        }
    }

    // On a DUP_SORT database every duplicate counts as an entry: 9, then 5,
    // then 8.
    #[test]
    fn test_stat_dupsort() {
        let tmp = TempDir::new("test").unwrap();
        let env = Environment::new().open(tmp.path()).unwrap();
        let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        // Insertion order: all values of key1, then key2, then key3.
        for &k in [b"key1", b"key2", b"key3"].iter() {
            for &v in [b"val1", b"val2", b"val3"].iter() {
                writer.put(db, k, v, WriteFlags::empty()).unwrap();
            }
        }
        writer.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 9);
        }

        let mut deleter = env.begin_rw_txn().unwrap();
        deleter.del(db, b"key1", Some(b"val2")).unwrap();
        deleter.del(db, b"key2", None).unwrap();
        deleter.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 5);
        }

        let mut writer = env.begin_rw_txn().unwrap();
        for &v in [b"val1", b"val2", b"val3"].iter() {
            writer.put(db, b"key4", v, WriteFlags::empty()).unwrap();
        }
        writer.commit().unwrap();

        {
            let reader = env.begin_ro_txn().unwrap();
            let snapshot = reader.stat(db).unwrap();
            assert_eq!(snapshot.entries(), 8);
        }
    }
}