pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> { /* private fields */ }
Expand description

An undo-able set of actions on the dataset.

Create a transaction using a TransactionClient, then run actions (such as get, or put) on the transaction. Reads are executed immediately, writes are buffered locally. Once complete, commit the transaction. Behind the scenes, the client will perform a two phase commit and return success as soon as the writes are guaranteed to be committed (some finalisation may continue in the background after the return, but no data can be lost).

TiKV transactions use multi-version concurrency control. All reads logically happen at the start of the transaction (at the start timestamp, start_ts). Once a transaction is commited, a its writes atomically become visible to other transactions at (logically) the commit timestamp.

In other words, a transaction can read data that was committed at commit_ts < its start_ts, and its writes are readable by transactions with start_ts >= its commit_ts.

Mutations are buffered locally and sent to the TiKV cluster at the time of commit. In a pessimistic transaction, all write operations and xxx_for_update operations will immediately acquire locks from TiKV. Such a lock blocks other transactions from writing to that key. A lock exists until the transaction is committed or rolled back, or the lock reaches its time to live (TTL).

For details, the SIG-Transaction provides materials explaining designs and implementations of TiKV transactions.

Examples

let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
let mut txn = client.begin_optimistic().await.unwrap();
let foo = txn.get("foo".to_owned()).await.unwrap().unwrap();
txn.put("bar".to_owned(), foo).await.unwrap();
txn.commit().await.unwrap();

Implementations§

source§

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC>

source

pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>>

Create a new ‘get’ request

Once resolved this request will result in the fetching of the value associated with the given key.

Retuning Ok(None) indicates the key does not exist in TiKV.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key = "TiKV".to_owned();
let result: Option<Value> = txn.get(key).await.unwrap();
source

pub async fn get_for_update( &mut self, key: impl Into<Key> ) -> Result<Option<Value>>

Create a get for update request.

The request reads and “locks” a key. It is similar to SELECT ... FOR UPDATE in TiDB, and has different behavior in optimistic and pessimistic transactions.

Optimistic transaction

It reads at the “start timestamp” and caches the value, just like normal get requests. The lock is written in prewrite and commit, so it cannot prevent concurrent transactions from writing the same key, but can only prevent itself from committing.

Pessimistic transaction

It reads at the “current timestamp” and thus does not cache the value. So following read requests won’t be affected by the get_for_udpate. A lock will be acquired immediately with this request, which prevents concurrent transactions from mutating the keys.

The “current timestamp” (also called for_update_ts of the request) is fetched from PD.

Note: The behavior of this command under pessimistic transaction does not follow snapshot. It reads the latest value (using current timestamp), and the value is not cached in the local buffer. So normal get-like commands after get_for_update will not be influenced, they still read values at the transaction’s start_ts.

Examples
let mut txn = client.begin_pessimistic().await.unwrap();
let key = "TiKV".to_owned();
let result: Value = txn.get_for_update(key).await.unwrap().unwrap();
// now the key "TiKV" is locked, other transactions cannot modify it
// Finish the transaction...
txn.commit().await.unwrap();
source

pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool>

Check whether a key exists.

Examples
let mut txn = client.begin_pessimistic().await.unwrap();
let exists = txn.key_exists("k1".to_owned()).await.unwrap();
txn.commit().await.unwrap();
source

pub async fn batch_get( &mut self, keys: impl IntoIterator<Item = impl Into<Key>> ) -> Result<impl Iterator<Item = KvPair>>

Create a new ‘batch get’ request.

Once resolved this request will result in the fetching of the values associated with the given keys.

Non-existent entries will not appear in the result. The order of the keys is not retained in the result.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
let result: HashMap<Key, Value> = txn
    .batch_get(keys)
    .await
    .unwrap()
    .map(|pair| (pair.0, pair.1))
    .collect();
// Finish the transaction...
txn.commit().await.unwrap();
source

pub async fn batch_get_for_update( &mut self, keys: impl IntoIterator<Item = impl Into<Key>> ) -> Result<Vec<KvPair>>

Create a new ‘batch get for update’ request.

Similar to get_for_update, but it works for a batch of keys.

Non-existent entries will not appear in the result. The order of the keys is not retained in the result.

Examples
let mut txn = client.begin_pessimistic().await.unwrap();
let keys = vec!["foo".to_owned(), "bar".to_owned()];
let result: Vec<KvPair> = txn
    .batch_get_for_update(keys)
    .await
    .unwrap();
// now "foo" and "bar" are both locked
// Finish the transaction...
txn.commit().await.unwrap();
source

pub async fn scan( &mut self, range: impl Into<BoundRange>, limit: u32 ) -> Result<impl Iterator<Item = KvPair>>

Create a new ‘scan’ request.

Once resolved this request will result in a Vec of all key-value pairs that lie in the specified range.

If the number of eligible key-value pairs are greater than limit, only the first limit pairs are returned, ordered by key.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key1: Key = b"foo".to_vec().into();
let key2: Key = b"bar".to_vec().into();
let result: Vec<KvPair> = txn
    .scan(key1..key2, 10)
    .await
    .unwrap()
    .collect();
// Finish the transaction...
txn.commit().await.unwrap();
source

pub async fn scan_keys( &mut self, range: impl Into<BoundRange>, limit: u32 ) -> Result<impl Iterator<Item = Key>>

Create a new ‘scan’ request that only returns the keys.

Once resolved this request will result in a Vec of keys that lies in the specified range.

If the number of eligible keys are greater than limit, only the first limit keys are returned, ordered by key.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key1: Key = b"foo".to_vec().into();
let key2: Key = b"bar".to_vec().into();
let result: Vec<Key> = txn
    .scan_keys(key1..key2, 10)
    .await
    .unwrap()
    .collect();
// Finish the transaction...
txn.commit().await.unwrap();
source

pub async fn scan_reverse( &mut self, range: impl Into<BoundRange>, limit: u32 ) -> Result<impl Iterator<Item = KvPair>>

Create a ‘scan_reverse’ request.

Similar to scan, but scans in the reverse direction.

source

pub async fn scan_keys_reverse( &mut self, range: impl Into<BoundRange>, limit: u32 ) -> Result<impl Iterator<Item = Key>>

Create a ‘scan_keys_reverse’ request.

Similar to scan, but scans in the reverse direction.

source

pub async fn put( &mut self, key: impl Into<Key>, value: impl Into<Value> ) -> Result<()>

Sets the value associated with the given key.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key = "foo".to_owned();
let val = "FOO".to_owned();
txn.put(key, val);
txn.commit().await.unwrap();
source

pub async fn insert( &mut self, key: impl Into<Key>, value: impl Into<Value> ) -> Result<()>

Inserts the value associated with the given key.

Similar to [`put’], but it has an additional constraint that the key should not exist before this operation.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key = "foo".to_owned();
let val = "FOO".to_owned();
txn.insert(key, val);
txn.commit().await.unwrap();
source

pub async fn delete(&mut self, key: impl Into<Key>) -> Result<()>

Deletes the given key and its value from the database.

Deleting a non-existent key will not result in an error.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let key = "foo".to_owned();
txn.delete(key);
txn.commit().await.unwrap();
source

pub async fn batch_mutate( &mut self, mutations: impl IntoIterator<Item = Mutation> ) -> Result<()>

Batch mutate the database.

Only Put and Delete are supported.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
let mutations = vec![
    kvrpcpb::Mutation {
        op: kvrpcpb::Op::Del.into(),
        key: b"k0".to_vec(),
        ..Default::default()
    },
    kvrpcpb::Mutation {
        op: kvrpcpb::Op::Put.into(),
        key: b"k1".to_vec(),
        value: b"v1".to_vec(),
        ..Default::default()
    },
];
txn.batch_mutate(mutations).await.unwrap();
txn.commit().await.unwrap();
source

pub async fn lock_keys( &mut self, keys: impl IntoIterator<Item = impl Into<Key>> ) -> Result<()>

Lock the given keys without mutating their values.

In optimistic mode, write conflicts are not checked until commit. So use this command to indicate that “I do not want to commit if the value associated with this key has been modified”. It’s useful to avoid the write skew anomaly.

In pessimistic mode, it is similar to batch_get_for_update, except that it does not read values.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
// ... Do some actions.
txn.commit().await.unwrap();
source

pub async fn commit(&mut self) -> Result<Option<Timestamp>>

Commits the actions of the transaction. On success, we return the commit timestamp (or None if there was nothing to commit).

Examples
let mut txn = client.begin_optimistic().await.unwrap();
// ... Do some actions.
let result: Timestamp = txn.commit().await.unwrap().unwrap();
source

pub async fn rollback(&mut self) -> Result<()>

Rollback the transaction.

If it succeeds, all mutations made by this transaction will be discarded.

Examples
let mut txn = client.begin_optimistic().await.unwrap();
// ... Do some actions.
txn.rollback().await.unwrap();
source

pub fn start_timestamp(&self) -> Timestamp

Get the start timestamp of this transaction.

Trait Implementations§

source§

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC>

source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<Cod, PdC> RefUnwindSafe for Transaction<Cod, PdC>where Cod: RefUnwindSafe, PdC: RefUnwindSafe,

§

impl<Cod, PdC> Send for Transaction<Cod, PdC>

§

impl<Cod, PdC> Sync for Transaction<Cod, PdC>

§

impl<Cod, PdC> Unpin for Transaction<Cod, PdC>where Cod: Unpin,

§

impl<Cod, PdC> UnwindSafe for Transaction<Cod, PdC>where Cod: UnwindSafe, PdC: RefUnwindSafe,

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for Twhere V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more