sqlx_postgres/
transaction.rs

1use futures_core::future::BoxFuture;
2
3use crate::error::Error;
4use crate::executor::Executor;
5
6use crate::{PgConnection, Postgres};
7
8pub(crate) use sqlx_core::transaction::*;
9
/// Implementation of [`TransactionManager`] for PostgreSQL.
///
/// This is a field-less marker type: every operation is an associated function
/// that receives the [`PgConnection`] to act on. The nesting level of the
/// current transaction is tracked on the connection itself, in
/// `conn.inner.transaction_depth`.
pub struct PgTransactionManager;
12
13impl TransactionManager for PgTransactionManager {
14    type Database = Postgres;
15
16    fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
17        Box::pin(async move {
18            let rollback = Rollback::new(conn);
19            let query = begin_ansi_transaction_sql(rollback.conn.inner.transaction_depth);
20            rollback.conn.queue_simple_query(&query)?;
21            rollback.conn.inner.transaction_depth += 1;
22            rollback.conn.wait_until_ready().await?;
23            rollback.defuse();
24
25            Ok(())
26        })
27    }
28
29    fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
30        Box::pin(async move {
31            if conn.inner.transaction_depth > 0 {
32                conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
33                    .await?;
34
35                conn.inner.transaction_depth -= 1;
36            }
37
38            Ok(())
39        })
40    }
41
42    fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            if conn.inner.transaction_depth > 0 {
45                conn.execute(&*rollback_ansi_transaction_sql(
46                    conn.inner.transaction_depth,
47                ))
48                .await?;
49
50                conn.inner.transaction_depth -= 1;
51            }
52
53            Ok(())
54        })
55    }
56
57    fn start_rollback(conn: &mut PgConnection) {
58        if conn.inner.transaction_depth > 0 {
59            conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
60                .expect("BUG: Rollback query somehow too large for protocol");
61
62            conn.inner.transaction_depth -= 1;
63        }
64    }
65}
66
/// Guard that starts a rollback on its connection when dropped, unless it has
/// been defused first.
struct Rollback<'c> {
    /// Connection on which the rollback is queued if the guard fires.
    conn: &'c mut PgConnection,
    /// When `true`, dropping the guard does nothing.
    defuse: bool,
}
71
72impl Drop for Rollback<'_> {
73    fn drop(&mut self) {
74        if !self.defuse {
75            PgTransactionManager::start_rollback(self.conn)
76        }
77    }
78}
79
80impl<'c> Rollback<'c> {
81    fn new(conn: &'c mut PgConnection) -> Self {
82        Self {
83            conn,
84            defuse: false,
85        }
86    }
87    fn defuse(mut self) {
88        self.defuse = true;
89    }
90}