ckb_db/
transaction.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
//! RocksDB optimistic transaction wrapper
use crate::db::cf_handle;
use crate::{internal_error, Result};
use ckb_db_schema::Col;
use rocksdb::ops::{DeleteCF, GetPinnedCF, PutCF};
pub use rocksdb::{DBPinnableSlice, DBVector};
use rocksdb::{
    OptimisticTransaction, OptimisticTransactionDB, OptimisticTransactionSnapshot, ReadOptions,
};
use std::sync::Arc;

/// A single optimistic transaction, bundled with a shared handle to the
/// database it runs against.
///
/// The database handle is kept so that each operation can resolve a [`Col`]
/// to a column-family handle via `cf_handle` before delegating to the
/// underlying transaction.
pub struct RocksDBTransaction {
    /// Shared handle to the optimistic transaction database; used for
    /// column-family lookup and cloned into snapshots.
    pub(crate) db: Arc<OptimisticTransactionDB>,
    /// The underlying rocksdb transaction that all reads and writes go through.
    pub(crate) inner: OptimisticTransaction,
}

impl RocksDBTransaction {
    /// Return the bytes associated with the given key and given column.
    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        let cf = cf_handle(&self.db, col)?;
        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
    }

    /// Write the bytes into the given column with associated key.
    pub fn put(&self, col: Col, key: &[u8], value: &[u8]) -> Result<()> {
        let cf = cf_handle(&self.db, col)?;
        self.inner.put_cf(cf, key, value).map_err(internal_error)
    }

    /// Delete the data associated with the given key and given column.
    pub fn delete(&self, col: Col, key: &[u8]) -> Result<()> {
        let cf = cf_handle(&self.db, col)?;
        self.inner.delete_cf(cf, key).map_err(internal_error)
    }

    /// Read a key and make the read value a precondition for transaction commit.
    pub fn get_for_update(
        &self,
        col: Col,
        key: &[u8],
        snapshot: &RocksDBTransactionSnapshot<'_>,
    ) -> Result<Option<DBVector>> {
        let cf = cf_handle(&self.db, col)?;
        let mut opts = ReadOptions::default();
        opts.set_snapshot(&snapshot.inner);
        self.inner
            .get_for_update_cf_opt(cf, key, &opts, true)
            .map_err(internal_error)
    }

    /// Commit the transaction.
    pub fn commit(&self) -> Result<()> {
        self.inner.commit().map_err(internal_error)
    }

    /// Rollback the transaction.
    pub fn rollback(&self) -> Result<()> {
        self.inner.rollback().map_err(internal_error)
    }

    /// Return `RocksDBTransactionSnapshot`
    pub fn get_snapshot(&self) -> RocksDBTransactionSnapshot<'_> {
        RocksDBTransactionSnapshot {
            db: Arc::clone(&self.db),
            inner: self.inner.snapshot(),
        }
    }

    /// Set savepoint for transaction.
    pub fn set_savepoint(&self) {
        self.inner.set_savepoint()
    }

    /// Rollback the transaction to savepoint.
    pub fn rollback_to_savepoint(&self) -> Result<()> {
        self.inner.rollback_to_savepoint().map_err(internal_error)
    }
}

/// A snapshot captures a point-in-time view of the transaction at the time it's created.
///
/// Obtained from `RocksDBTransaction::get_snapshot`, which borrows the
/// transaction; the lifetime `'a` is that borrow.
pub struct RocksDBTransactionSnapshot<'a> {
    /// Shared handle to the optimistic transaction database; used for
    /// column-family lookup.
    pub(crate) db: Arc<OptimisticTransactionDB>,
    /// The underlying rocksdb snapshot that reads are served from.
    pub(crate) inner: OptimisticTransactionSnapshot<'a>,
}

impl<'a> RocksDBTransactionSnapshot<'a> {
    /// Look up `key` in column `col` through this snapshot.
    ///
    /// The `Option` is passed through unchanged from the underlying
    /// `get_pinned_cf` call.
    ///
    /// # Errors
    ///
    /// Fails if the column-family handle cannot be resolved, or if the
    /// underlying read fails (wrapped with `internal_error`).
    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
        let column = cf_handle(&self.db, col)?;
        let lookup = self.inner.get_pinned_cf(column, key);
        lookup.map_err(internal_error)
    }
}