sqlx_mysql/
transaction.rs

1use std::borrow::Cow;
2
3use futures_core::future::BoxFuture;
4
5use crate::connection::Waiting;
6use crate::error::Error;
7use crate::executor::Executor;
8use crate::protocol::text::Query;
9use crate::{MySql, MySqlConnection};
10
11pub(crate) use sqlx_core::transaction::*;
12
/// MySQL's [`TransactionManager`].
///
/// This is a field-less marker type: it carries no state of its own. The
/// transaction bookkeeping it operates on lives on the connection that is
/// passed to each trait method.
pub struct MySqlTransactionManager;
15
16impl TransactionManager for MySqlTransactionManager {
17    type Database = MySql;
18
19    fn begin<'conn>(
20        conn: &'conn mut MySqlConnection,
21        statement: Option<Cow<'static, str>>,
22    ) -> BoxFuture<'conn, Result<(), Error>> {
23        Box::pin(async move {
24            let depth = conn.inner.transaction_depth;
25            let statement = match statement {
26                // custom `BEGIN` statements are not allowed if we're already in a transaction
27                // (we need to issue a `SAVEPOINT` instead)
28                Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
29                Some(statement) => statement,
30                None => begin_ansi_transaction_sql(depth),
31            };
32            conn.execute(&*statement).await?;
33            if !conn.in_transaction() {
34                return Err(Error::BeginFailed);
35            }
36            conn.inner.transaction_depth += 1;
37
38            Ok(())
39        })
40    }
41
42    fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            let depth = conn.inner.transaction_depth;
45
46            if depth > 0 {
47                conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
48                conn.inner.transaction_depth = depth - 1;
49            }
50
51            Ok(())
52        })
53    }
54
55    fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
56        Box::pin(async move {
57            let depth = conn.inner.transaction_depth;
58
59            if depth > 0 {
60                conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
61                conn.inner.transaction_depth = depth - 1;
62            }
63
64            Ok(())
65        })
66    }
67
68    fn start_rollback(conn: &mut MySqlConnection) {
69        let depth = conn.inner.transaction_depth;
70
71        if depth > 0 {
72            conn.inner.stream.waiting.push_back(Waiting::Result);
73            conn.inner.stream.sequence_id = 0;
74            conn.inner
75                .stream
76                .write_packet(Query(&rollback_ansi_transaction_sql(depth)))
77                .expect("BUG: unexpected error queueing ROLLBACK");
78
79            conn.inner.transaction_depth = depth - 1;
80        }
81    }
82
83    fn get_transaction_depth(conn: &MySqlConnection) -> usize {
84        conn.inner.transaction_depth
85    }
86}