sqlx_mysql/
transaction.rs1use std::borrow::Cow;
2
3use futures_core::future::BoxFuture;
4
5use crate::connection::Waiting;
6use crate::error::Error;
7use crate::executor::Executor;
8use crate::protocol::text::Query;
9use crate::{MySql, MySqlConnection};
10
11pub(crate) use sqlx_core::transaction::*;
12
/// Zero-sized marker type whose `TransactionManager` implementation drives
/// transactions on a `MySqlConnection`.
///
/// It carries no state of its own; the nesting depth is read from and written
/// to the connection that is passed to each trait method.
#[derive(Debug)]
pub struct MySqlTransactionManager;
15
16impl TransactionManager for MySqlTransactionManager {
17 type Database = MySql;
18
19 fn begin<'conn>(
20 conn: &'conn mut MySqlConnection,
21 statement: Option<Cow<'static, str>>,
22 ) -> BoxFuture<'conn, Result<(), Error>> {
23 Box::pin(async move {
24 let depth = conn.inner.transaction_depth;
25 let statement = match statement {
26 Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
29 Some(statement) => statement,
30 None => begin_ansi_transaction_sql(depth),
31 };
32 conn.execute(&*statement).await?;
33 if !conn.in_transaction() {
34 return Err(Error::BeginFailed);
35 }
36 conn.inner.transaction_depth += 1;
37
38 Ok(())
39 })
40 }
41
42 fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
43 Box::pin(async move {
44 let depth = conn.inner.transaction_depth;
45
46 if depth > 0 {
47 conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
48 conn.inner.transaction_depth = depth - 1;
49 }
50
51 Ok(())
52 })
53 }
54
55 fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
56 Box::pin(async move {
57 let depth = conn.inner.transaction_depth;
58
59 if depth > 0 {
60 conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
61 conn.inner.transaction_depth = depth - 1;
62 }
63
64 Ok(())
65 })
66 }
67
68 fn start_rollback(conn: &mut MySqlConnection) {
69 let depth = conn.inner.transaction_depth;
70
71 if depth > 0 {
72 conn.inner.stream.waiting.push_back(Waiting::Result);
73 conn.inner.stream.sequence_id = 0;
74 conn.inner
75 .stream
76 .write_packet(Query(&rollback_ansi_transaction_sql(depth)))
77 .expect("BUG: unexpected error queueing ROLLBACK");
78
79 conn.inner.transaction_depth = depth - 1;
80 }
81 }
82
83 fn get_transaction_depth(conn: &MySqlConnection) -> usize {
84 conn.inner.transaction_depth
85 }
86}