lmdb/
transaction.rs

1use libc::{
2    c_uint,
3    c_void,
4    size_t,
5};
6use std::marker::PhantomData;
7use std::{
8    fmt,
9    mem,
10    ptr,
11    result,
12    slice,
13};
14
15use ffi;
16
17use cursor::{
18    RoCursor,
19    RwCursor,
20};
21use database::Database;
22use environment::{
23    Environment,
24    Stat,
25};
26use error::{
27    lmdb_result,
28    Error,
29    Result,
30};
31use flags::{
32    DatabaseFlags,
33    EnvironmentFlags,
34    WriteFlags,
35};
36
37/// An LMDB transaction.
38///
39/// All database operations require a transaction.
40pub trait Transaction: Sized {
41    /// Returns a raw pointer to the underlying LMDB transaction.
42    ///
43    /// The caller **must** ensure that the pointer is not used after the
44    /// lifetime of the transaction.
45    fn txn(&self) -> *mut ffi::MDB_txn;
46
47    /// Commits the transaction.
48    ///
49    /// Any pending operations will be saved.
50    fn commit(self) -> Result<()> {
51        unsafe {
52            let result = lmdb_result(ffi::mdb_txn_commit(self.txn()));
53            mem::forget(self);
54            result
55        }
56    }
57
58    /// Aborts the transaction.
59    ///
60    /// Any pending operations will not be saved.
61    fn abort(self) {
62        // Abort should be performed in transaction destructors.
63    }
64
65    /// Opens a database in the transaction.
66    ///
67    /// If `name` is `None`, then the default database will be opened, otherwise
68    /// a named database will be opened. The database handle will be private to
69    /// the transaction until the transaction is successfully committed. If the
70    /// transaction is aborted the returned database handle should no longer be
71    /// used.
72    ///
73    /// Prefer using `Environment::open_db`.
74    ///
75    /// ## Safety
76    ///
77    /// This function (as well as `Environment::open_db`,
78    /// `Environment::create_db`, and `Database::create`) **must not** be called
79    /// from multiple concurrent transactions in the same environment. A
80    /// transaction which uses this function must finish (either commit or
81    /// abort) before any other transaction may use this function.
82    unsafe fn open_db(&self, name: Option<&str>) -> Result<Database> {
83        Database::new(self.txn(), name, 0)
84    }
85
86    /// Gets an item from a database.
87    ///
88    /// This function retrieves the data associated with the given key in the
89    /// database. If the database supports duplicate keys
90    /// (`DatabaseFlags::DUP_SORT`) then the first data item for the key will be
91    /// returned. Retrieval of other items requires the use of
92    /// `Transaction::cursor_get`. If the item is not in the database, then
93    /// `Error::NotFound` will be returned.
94    fn get<'txn, K>(&'txn self, database: Database, key: &K) -> Result<&'txn [u8]>
95    where
96        K: AsRef<[u8]>,
97    {
98        let key = key.as_ref();
99        let mut key_val: ffi::MDB_val = ffi::MDB_val {
100            mv_size: key.len() as size_t,
101            mv_data: key.as_ptr() as *mut c_void,
102        };
103        let mut data_val: ffi::MDB_val = ffi::MDB_val {
104            mv_size: 0,
105            mv_data: ptr::null_mut(),
106        };
107        unsafe {
108            match ffi::mdb_get(self.txn(), database.dbi(), &mut key_val, &mut data_val) {
109                ffi::MDB_SUCCESS => Ok(slice::from_raw_parts(data_val.mv_data as *const u8, data_val.mv_size as usize)),
110                err_code => Err(Error::from_err_code(err_code)),
111            }
112        }
113    }
114
115    /// Open a new read-only cursor on the given database.
116    fn open_ro_cursor<'txn>(&'txn self, db: Database) -> Result<RoCursor<'txn>> {
117        RoCursor::new(self, db)
118    }
119
120    /// Gets the option flags for the given database in the transaction.
121    fn db_flags(&self, db: Database) -> Result<DatabaseFlags> {
122        let mut flags: c_uint = 0;
123        unsafe {
124            lmdb_result(ffi::mdb_dbi_flags(self.txn(), db.dbi(), &mut flags))?;
125        }
126        Ok(DatabaseFlags::from_bits_truncate(flags))
127    }
128
129    /// Retrieves database statistics.
130    fn stat(&self, db: Database) -> Result<Stat> {
131        unsafe {
132            let mut stat = Stat::new();
133            lmdb_try!(ffi::mdb_stat(self.txn(), db.dbi(), stat.mdb_stat()));
134            Ok(stat)
135        }
136    }
137}
138
139/// An LMDB read-only transaction.
140pub struct RoTransaction<'env> {
141    txn: *mut ffi::MDB_txn,
142    _marker: PhantomData<&'env ()>,
143}
144
145impl<'env> fmt::Debug for RoTransaction<'env> {
146    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
147        f.debug_struct("RoTransaction").finish()
148    }
149}
150
151impl<'env> Drop for RoTransaction<'env> {
152    fn drop(&mut self) {
153        unsafe { ffi::mdb_txn_abort(self.txn) }
154    }
155}
156
157impl<'env> RoTransaction<'env> {
158    /// Creates a new read-only transaction in the given environment. Prefer
159    /// using `Environment::begin_ro_txn`.
160    pub(crate) fn new(env: &'env Environment) -> Result<RoTransaction<'env>> {
161        let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
162        unsafe {
163            lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), ffi::MDB_RDONLY, &mut txn))?;
164            Ok(RoTransaction {
165                txn,
166                _marker: PhantomData,
167            })
168        }
169    }
170
171    /// Resets the read-only transaction.
172    ///
173    /// Abort the transaction like `Transaction::abort`, but keep the
174    /// transaction handle.  `InactiveTransaction::renew` may reuse the handle.
175    /// This saves allocation overhead if the process will start a new read-only
176    /// transaction soon, and also locking overhead if
177    /// `EnvironmentFlags::NO_TLS` is in use. The reader table lock is released,
178    /// but the table slot stays tied to its thread or transaction. Reader locks
179    /// generally don't interfere with writers, but they keep old versions of
180    /// database pages allocated. Thus they prevent the old pages from being
181    /// reused when writers commit new data, and so under heavy load the
182    /// database size may grow much more rapidly than otherwise.
183    pub fn reset(self) -> InactiveTransaction<'env> {
184        let txn = self.txn;
185        unsafe {
186            mem::forget(self);
187            ffi::mdb_txn_reset(txn)
188        };
189        InactiveTransaction {
190            txn,
191            _marker: PhantomData,
192        }
193    }
194}
195
196impl<'env> Transaction for RoTransaction<'env> {
197    fn txn(&self) -> *mut ffi::MDB_txn {
198        self.txn
199    }
200}
201
202/// An inactive read-only transaction.
203pub struct InactiveTransaction<'env> {
204    txn: *mut ffi::MDB_txn,
205    _marker: PhantomData<&'env ()>,
206}
207
208impl<'env> fmt::Debug for InactiveTransaction<'env> {
209    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
210        f.debug_struct("InactiveTransaction").finish()
211    }
212}
213
214impl<'env> Drop for InactiveTransaction<'env> {
215    fn drop(&mut self) {
216        unsafe { ffi::mdb_txn_abort(self.txn) }
217    }
218}
219
220impl<'env> InactiveTransaction<'env> {
221    /// Renews the inactive transaction, returning an active read-only
222    /// transaction.
223    ///
224    /// This acquires a new reader lock for a transaction handle that had been
225    /// released by `RoTransaction::reset`.
226    pub fn renew(self) -> Result<RoTransaction<'env>> {
227        let txn = self.txn;
228        unsafe {
229            mem::forget(self);
230            lmdb_result(ffi::mdb_txn_renew(txn))?
231        };
232        Ok(RoTransaction {
233            txn,
234            _marker: PhantomData,
235        })
236    }
237}
238
239/// An LMDB read-write transaction.
240pub struct RwTransaction<'env> {
241    txn: *mut ffi::MDB_txn,
242    _marker: PhantomData<&'env ()>,
243}
244
245impl<'env> fmt::Debug for RwTransaction<'env> {
246    fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
247        f.debug_struct("RwTransaction").finish()
248    }
249}
250
251impl<'env> Drop for RwTransaction<'env> {
252    fn drop(&mut self) {
253        unsafe { ffi::mdb_txn_abort(self.txn) }
254    }
255}
256
257impl<'env> RwTransaction<'env> {
258    /// Creates a new read-write transaction in the given environment. Prefer
259    /// using `Environment::begin_ro_txn`.
260    pub(crate) fn new(env: &'env Environment) -> Result<RwTransaction<'env>> {
261        let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
262        unsafe {
263            lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), EnvironmentFlags::empty().bits(), &mut txn))?;
264            Ok(RwTransaction {
265                txn,
266                _marker: PhantomData,
267            })
268        }
269    }
270
271    /// Opens a database in the provided transaction, creating it if necessary.
272    ///
273    /// If `name` is `None`, then the default database will be opened, otherwise
274    /// a named database will be opened. The database handle will be private to
275    /// the transaction until the transaction is successfully committed. If the
276    /// transaction is aborted the returned database handle should no longer be
277    /// used.
278    ///
279    /// Prefer using `Environment::create_db`.
280    ///
281    /// ## Safety
282    ///
283    /// This function (as well as `Environment::open_db`,
284    /// `Environment::create_db`, and `Database::open`) **must not** be called
285    /// from multiple concurrent transactions in the same environment. A
286    /// transaction which uses this function must finish (either commit or
287    /// abort) before any other transaction may use this function.
288    pub unsafe fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
289        Database::new(self.txn(), name, flags.bits() | ffi::MDB_CREATE)
290    }
291
292    /// Opens a new read-write cursor on the given database and transaction.
293    pub fn open_rw_cursor<'txn>(&'txn mut self, db: Database) -> Result<RwCursor<'txn>> {
294        RwCursor::new(self, db)
295    }
296
297    /// Stores an item into a database.
298    ///
299    /// This function stores key/data pairs in the database. The default
300    /// behavior is to enter the new key/data pair, replacing any previously
301    /// existing key if duplicates are disallowed, or adding a duplicate data
302    /// item if duplicates are allowed (`DatabaseFlags::DUP_SORT`).
303    pub fn put<K, D>(&mut self, database: Database, key: &K, data: &D, flags: WriteFlags) -> Result<()>
304    where
305        K: AsRef<[u8]>,
306        D: AsRef<[u8]>,
307    {
308        let key = key.as_ref();
309        let data = data.as_ref();
310        let mut key_val: ffi::MDB_val = ffi::MDB_val {
311            mv_size: key.len() as size_t,
312            mv_data: key.as_ptr() as *mut c_void,
313        };
314        let mut data_val: ffi::MDB_val = ffi::MDB_val {
315            mv_size: data.len() as size_t,
316            mv_data: data.as_ptr() as *mut c_void,
317        };
318        unsafe { lmdb_result(ffi::mdb_put(self.txn(), database.dbi(), &mut key_val, &mut data_val, flags.bits())) }
319    }
320
321    /// Returns a buffer which can be used to write a value into the item at the
322    /// given key and with the given length. The buffer must be completely
323    /// filled by the caller.
324    pub fn reserve<'txn, K>(
325        &'txn mut self,
326        database: Database,
327        key: &K,
328        len: size_t,
329        flags: WriteFlags,
330    ) -> Result<&'txn mut [u8]>
331    where
332        K: AsRef<[u8]>,
333    {
334        let key = key.as_ref();
335        let mut key_val: ffi::MDB_val = ffi::MDB_val {
336            mv_size: key.len() as size_t,
337            mv_data: key.as_ptr() as *mut c_void,
338        };
339        let mut data_val: ffi::MDB_val = ffi::MDB_val {
340            mv_size: len,
341            mv_data: ptr::null_mut::<c_void>(),
342        };
343        unsafe {
344            lmdb_result(ffi::mdb_put(
345                self.txn(),
346                database.dbi(),
347                &mut key_val,
348                &mut data_val,
349                flags.bits() | ffi::MDB_RESERVE,
350            ))?;
351            Ok(slice::from_raw_parts_mut(data_val.mv_data as *mut u8, data_val.mv_size as usize))
352        }
353    }
354
355    /// Deletes an item from a database.
356    ///
357    /// This function removes key/data pairs from the database. If the database
358    /// does not support sorted duplicate data items (`DatabaseFlags::DUP_SORT`)
359    /// the data parameter is ignored.  If the database supports sorted
360    /// duplicates and the data parameter is `None`, all of the duplicate data
361    /// items for the key will be deleted. Otherwise, if the data parameter is
362    /// `Some` only the matching data item will be deleted. This function will
363    /// return `Error::NotFound` if the specified key/data pair is not in the
364    /// database.
365    pub fn del<K>(&mut self, database: Database, key: &K, data: Option<&[u8]>) -> Result<()>
366    where
367        K: AsRef<[u8]>,
368    {
369        let key = key.as_ref();
370        let mut key_val: ffi::MDB_val = ffi::MDB_val {
371            mv_size: key.len() as size_t,
372            mv_data: key.as_ptr() as *mut c_void,
373        };
374        let data_val: Option<ffi::MDB_val> = data.map(|data| ffi::MDB_val {
375            mv_size: data.len() as size_t,
376            mv_data: data.as_ptr() as *mut c_void,
377        });
378
379        if let Some(mut d) = data_val {
380            unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, &mut d)) }
381        } else {
382            unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, ptr::null_mut())) }
383        }
384    }
385
386    /// Empties the given database. All items will be removed.
387    pub fn clear_db(&mut self, db: Database) -> Result<()> {
388        unsafe { lmdb_result(ffi::mdb_drop(self.txn(), db.dbi(), 0)) }
389    }
390
391    /// Drops the database from the environment.
392    ///
393    /// ## Safety
394    ///
395    /// This method is unsafe in the same ways as `Environment::close_db`, and
396    /// should be used accordingly.
397    pub unsafe fn drop_db(&mut self, db: Database) -> Result<()> {
398        lmdb_result(ffi::mdb_drop(self.txn, db.dbi(), 1))
399    }
400
401    /// Begins a new nested transaction inside of this transaction.
402    pub fn begin_nested_txn<'txn>(&'txn mut self) -> Result<RwTransaction<'txn>> {
403        let mut nested: *mut ffi::MDB_txn = ptr::null_mut();
404        unsafe {
405            let env: *mut ffi::MDB_env = ffi::mdb_txn_env(self.txn());
406            ffi::mdb_txn_begin(env, self.txn(), 0, &mut nested);
407        }
408        Ok(RwTransaction {
409            txn: nested,
410            _marker: PhantomData,
411        })
412    }
413}
414
415impl<'env> Transaction for RwTransaction<'env> {
416    fn txn(&self) -> *mut ffi::MDB_txn {
417        self.txn
418    }
419}
420
#[cfg(test)]
mod test {

    use std::io::Write;
    use std::sync::{
        Arc,
        Barrier,
    };
    use std::thread::{
        self,
        JoinHandle,
    };

    use tempdir::TempDir;

    use super::*;
    use cursor::Cursor;
    use error::*;
    use flags::*;

    /// Opens a default-configured environment inside a fresh temporary
    /// directory. The directory guard is returned so it outlives the
    /// environment.
    fn scratch_env() -> (TempDir, Environment) {
        let dir = TempDir::new("test").unwrap();
        let env = Environment::new().open(dir.path()).unwrap();
        (dir, env)
    }

    /// Stores every `(key, value)` pair, in order, with empty write flags.
    fn put_all<'env>(txn: &mut RwTransaction<'env>, db: Database, pairs: &[(&[u8; 4], &[u8; 4])]) {
        for &(key, val) in pairs {
            txn.put(db, key, val, WriteFlags::empty()).unwrap();
        }
    }

    /// Stores `val1`, `val2` and `val3` under each of `keys`, key by key.
    fn put_dups<'env>(txn: &mut RwTransaction<'env>, db: Database, keys: &[&[u8; 4]]) {
        for &key in keys {
            for val in [b"val1", b"val2", b"val3"].iter() {
                txn.put(db, key, *val, WriteFlags::empty()).unwrap();
            }
        }
    }

    /// Commits the single pair `key` -> `val` in its own write transaction.
    fn seed_key_val(env: &Environment, db: Database) {
        let mut txn = env.begin_rw_txn().unwrap();
        txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
        txn.commit().unwrap();
    }

    /// Reads the statistics of `db` through a short-lived read-only
    /// transaction.
    fn db_stat(env: &Environment, db: Database) -> Stat {
        let txn = env.begin_ro_txn().unwrap();
        let stat = txn.stat(db).unwrap();
        stat
    }

    #[test]
    fn test_put_get_del() {
        let (_dir, env) = scratch_env();
        let db = env.open_db(None).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        put_all(&mut writer, db, &[(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")]);
        writer.commit().unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        assert_eq!(b"val1", txn.get(db, b"key1").unwrap());
        assert_eq!(b"val2", txn.get(db, b"key2").unwrap());
        assert_eq!(b"val3", txn.get(db, b"key3").unwrap());
        assert_eq!(txn.get(db, b"key"), Err(Error::NotFound));

        txn.del(db, b"key1", None).unwrap();
        assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound));
    }

    #[test]
    fn test_put_get_del_multi() {
        let (_dir, env) = scratch_env();
        let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();

        let mut writer = env.begin_rw_txn().unwrap();
        put_dups(&mut writer, db, &[b"key1", b"key2", b"key3"]);
        writer.commit().unwrap();

        let txn = env.begin_rw_txn().unwrap();
        {
            let mut cur = txn.open_ro_cursor(db).unwrap();
            let vals: Vec<_> = cur.iter_dup_of(b"key1").map(|item| item.unwrap()).map(|(_, val)| val).collect();
            assert_eq!(vals, vec![b"val1", b"val2", b"val3"]);
        }
        txn.commit().unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        txn.del(db, b"key1", Some(b"val2")).unwrap();
        txn.del(db, b"key2", None).unwrap();
        txn.commit().unwrap();

        let txn = env.begin_rw_txn().unwrap();
        {
            let mut cur = txn.open_ro_cursor(db).unwrap();
            let vals: Vec<_> = cur.iter_dup_of(b"key1").map(|item| item.unwrap()).map(|(_, val)| val).collect();
            assert_eq!(vals, vec![b"val1", b"val3"]);

            // Every duplicate of key2 was removed by the `None` delete.
            assert_eq!(0, cur.iter_dup_of(b"key2").count());
        }
        txn.commit().unwrap();
    }

    #[test]
    fn test_reserve() {
        let (_dir, env) = scratch_env();
        let db = env.open_db(None).unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        {
            // The reserved buffer borrows the transaction; keep it scoped.
            let mut buffer = txn.reserve(db, b"key1", 4, WriteFlags::empty()).unwrap();
            buffer.write_all(b"val1").unwrap();
        }
        txn.commit().unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        assert_eq!(b"val1", txn.get(db, b"key1").unwrap());
        assert_eq!(txn.get(db, b"key"), Err(Error::NotFound));

        txn.del(db, b"key1", None).unwrap();
        assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound));
    }

    #[test]
    fn test_inactive_txn() {
        let (_dir, env) = scratch_env();
        let db = env.open_db(None).unwrap();

        seed_key_val(&env, db);

        // Round-trip a read-only transaction through reset and renew.
        let active = env.begin_ro_txn().unwrap().reset().renew().unwrap();
        assert!(active.get(db, b"key").is_ok());
    }

    #[test]
    fn test_nested_txn() {
        let (_dir, env) = scratch_env();
        let db = env.open_db(None).unwrap();

        let mut parent = env.begin_rw_txn().unwrap();
        parent.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();

        {
            // The child is dropped without a commit at the end of this scope.
            let mut child = parent.begin_nested_txn().unwrap();
            child.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
            assert_eq!(child.get(db, b"key1").unwrap(), b"val1");
            assert_eq!(child.get(db, b"key2").unwrap(), b"val2");
        }

        assert_eq!(parent.get(db, b"key1").unwrap(), b"val1");
        assert_eq!(parent.get(db, b"key2"), Err(Error::NotFound));
    }

    #[test]
    fn test_clear_db() {
        let (_dir, env) = scratch_env();
        let db = env.open_db(None).unwrap();

        seed_key_val(&env, db);

        {
            let mut txn = env.begin_rw_txn().unwrap();
            txn.clear_db(db).unwrap();
            txn.commit().unwrap();
        }

        let reader = env.begin_ro_txn().unwrap();
        assert_eq!(reader.get(db, b"key"), Err(Error::NotFound));
    }

    #[test]
    fn test_drop_db() {
        let dir = TempDir::new("test").unwrap();
        let env = Environment::new().set_max_dbs(2).open(dir.path()).unwrap();
        let db = env.create_db(Some("test"), DatabaseFlags::empty()).unwrap();

        seed_key_val(&env, db);
        {
            let mut txn = env.begin_rw_txn().unwrap();
            unsafe {
                txn.drop_db(db).unwrap();
            }
            txn.commit().unwrap();
        }

        assert_eq!(env.open_db(Some("test")), Err(Error::NotFound));
    }

    #[test]
    fn test_concurrent_readers_single_writer() {
        let (_dir, env) = scratch_env();
        let env: Arc<Environment> = Arc::new(env);

        let n = 10usize; // Number of concurrent readers
        let barrier = Arc::new(Barrier::new(n + 1));

        let key = b"key";
        let val = b"val";

        let readers: Vec<JoinHandle<bool>> = (0..n)
            .map(|_| {
                let reader_env = env.clone();
                let reader_barrier = barrier.clone();

                thread::spawn(move || {
                    let db = reader_env.open_db(None).unwrap();
                    {
                        let txn = reader_env.begin_ro_txn().unwrap();
                        assert_eq!(txn.get(db, key), Err(Error::NotFound));
                        txn.abort();
                    }
                    // First rendezvous: the writer has opened its transaction.
                    reader_barrier.wait();
                    // Second rendezvous: the writer has committed.
                    reader_barrier.wait();
                    {
                        let txn = reader_env.begin_ro_txn().unwrap();
                        txn.get(db, key).unwrap() == val
                    }
                })
            })
            .collect();

        let db = env.open_db(None).unwrap();
        let mut writer = env.begin_rw_txn().unwrap();
        barrier.wait();
        writer.put(db, key, val, WriteFlags::empty()).unwrap();
        writer.commit().unwrap();
        barrier.wait();

        assert!(readers.into_iter().all(|handle| handle.join().unwrap()))
    }

    #[test]
    fn test_concurrent_writers() {
        let (_dir, env) = scratch_env();
        let env = Arc::new(env);

        let n = 10usize; // Number of concurrent writers

        let key = "key";
        let val = "val";

        let writers: Vec<JoinHandle<bool>> = (0..n)
            .map(|i| {
                let writer_env = env.clone();

                thread::spawn(move || {
                    let db = writer_env.open_db(None).unwrap();
                    let mut txn = writer_env.begin_rw_txn().unwrap();
                    txn.put(db, &format!("{}{}", key, i), &format!("{}{}", val, i), WriteFlags::empty()).unwrap();
                    txn.commit().is_ok()
                })
            })
            .collect();
        assert!(writers.into_iter().all(|handle| handle.join().unwrap()));

        let db = env.open_db(None).unwrap();
        let reader = env.begin_ro_txn().unwrap();

        for i in 0..n {
            assert_eq!(format!("{}{}", val, i).as_bytes(), reader.get(db, &format!("{}{}", key, i)).unwrap());
        }
    }

    #[test]
    fn test_stat() {
        let (_dir, env) = scratch_env();
        let db = env.create_db(None, DatabaseFlags::empty()).unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        put_all(&mut txn, db, &[(b"key1", b"val1"), (b"key2", b"val2"), (b"key3", b"val3")]);
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 3);

        let mut txn = env.begin_rw_txn().unwrap();
        txn.del(db, b"key1", None).unwrap();
        txn.del(db, b"key2", None).unwrap();
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 1);

        let mut txn = env.begin_rw_txn().unwrap();
        put_all(&mut txn, db, &[(b"key4", b"val4"), (b"key5", b"val5"), (b"key6", b"val6")]);
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 4);
    }

    #[test]
    fn test_stat_dupsort() {
        let (_dir, env) = scratch_env();
        let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();

        let mut txn = env.begin_rw_txn().unwrap();
        put_dups(&mut txn, db, &[b"key1", b"key2", b"key3"]);
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 9);

        let mut txn = env.begin_rw_txn().unwrap();
        txn.del(db, b"key1", Some(b"val2")).unwrap();
        txn.del(db, b"key2", None).unwrap();
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 5);

        let mut txn = env.begin_rw_txn().unwrap();
        put_dups(&mut txn, db, &[b"key4"]);
        txn.commit().unwrap();

        assert_eq!(db_stat(&env, db).entries(), 8);
    }
}