sqlx_core/
transaction.rs

1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::ops::{Deref, DerefMut};
4
5use futures_core::future::BoxFuture;
6
7use crate::database::Database;
8use crate::error::Error;
9use crate::pool::MaybePoolConnection;
10
11/// Generic management of database transactions.
12///
13/// This trait should not be used, except when implementing [`Connection`].
14#[doc(hidden)]
15pub trait TransactionManager {
16    type Database: Database;
17
18    /// Begin a new transaction or establish a savepoint within the active transaction.
19    fn begin(
20        conn: &mut <Self::Database as Database>::Connection,
21    ) -> BoxFuture<'_, Result<(), Error>>;
22
23    /// Commit the active transaction or release the most recent savepoint.
24    fn commit(
25        conn: &mut <Self::Database as Database>::Connection,
26    ) -> BoxFuture<'_, Result<(), Error>>;
27
28    /// Abort the active transaction or restore from the most recent savepoint.
29    fn rollback(
30        conn: &mut <Self::Database as Database>::Connection,
31    ) -> BoxFuture<'_, Result<(), Error>>;
32
33    /// Starts to abort the active transaction or restore from the most recent snapshot.
34    fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
35}
36
37/// An in-progress database transaction or savepoint.
38///
39/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
40///
41/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither are called
42/// before the transaction goes out-of-scope, [`rollback`] is called. In other
43/// words, [`rollback`] is called on `drop` if the transaction is still in-progress.
44///
45/// A savepoint is a special mark inside a transaction that allows all commands that are
46/// executed after it was established to be rolled back, restoring the transaction state to
47/// what it was at the time of the savepoint.
48///
49/// A transaction can be used as an [`Executor`] when performing queries:
50/// ```rust,no_run
51/// # use sqlx_core::acquire::Acquire;
52/// # async fn example() -> sqlx::Result<()> {
53/// # let id = 1;
54/// # let mut conn: sqlx::PgConnection = unimplemented!();
55/// let mut tx = conn.begin().await?;
56///
57/// let result = sqlx::query("DELETE FROM \"testcases\" WHERE id = $1")
58///     .bind(id)
59///     .execute(&mut *tx)
60///     .await?
61///     .rows_affected();
62///
63/// tx.commit().await
64/// # }
65/// ```
66/// [`Executor`]: crate::executor::Executor
67/// [`Connection::begin`]: crate::connection::Connection::begin()
68/// [`Pool::begin`]: crate::pool::Pool::begin()
69/// [`commit`]: Self::commit()
70/// [`rollback`]: Self::rollback()
71pub struct Transaction<'c, DB>
72where
73    DB: Database,
74{
75    connection: MaybePoolConnection<'c, DB>,
76    open: bool,
77}
78
79impl<'c, DB> Transaction<'c, DB>
80where
81    DB: Database,
82{
83    #[doc(hidden)]
84    pub fn begin(
85        conn: impl Into<MaybePoolConnection<'c, DB>>,
86    ) -> BoxFuture<'c, Result<Self, Error>> {
87        let mut conn = conn.into();
88
89        Box::pin(async move {
90            DB::TransactionManager::begin(&mut conn).await?;
91
92            Ok(Self {
93                connection: conn,
94                open: true,
95            })
96        })
97    }
98
99    /// Commits this transaction or savepoint.
100    pub async fn commit(mut self) -> Result<(), Error> {
101        DB::TransactionManager::commit(&mut self.connection).await?;
102        self.open = false;
103
104        Ok(())
105    }
106
107    /// Aborts this transaction or savepoint.
108    pub async fn rollback(mut self) -> Result<(), Error> {
109        DB::TransactionManager::rollback(&mut self.connection).await?;
110        self.open = false;
111
112        Ok(())
113    }
114}
115
116// NOTE: fails to compile due to lack of lazy normalization
117// impl<'c, 't, DB: Database> crate::executor::Executor<'t>
118//     for &'t mut crate::transaction::Transaction<'c, DB>
119// where
120//     &'c mut DB::Connection: Executor<'c, Database = DB>,
121// {
122//     type Database = DB;
123//
124//
125//
126//     fn fetch_many<'e, 'q: 'e, E: 'q>(
127//         self,
128//         query: E,
129//     ) -> futures_core::stream::BoxStream<
130//         'e,
131//         Result<
132//             crate::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
133//             crate::error::Error,
134//         >,
135//     >
136//     where
137//         't: 'e,
138//         E: crate::executor::Execute<'q, Self::Database>,
139//     {
140//         (&mut **self).fetch_many(query)
141//     }
142//
143//     fn fetch_optional<'e, 'q: 'e, E: 'q>(
144//         self,
145//         query: E,
146//     ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
147//     where
148//         't: 'e,
149//         E: crate::executor::Execute<'q, Self::Database>,
150//     {
151//         (&mut **self).fetch_optional(query)
152//     }
153//
154//     fn prepare_with<'e, 'q: 'e>(
155//         self,
156//         sql: &'q str,
157//         parameters: &'e [<Self::Database as crate::database::Database>::TypeInfo],
158//     ) -> futures_core::future::BoxFuture<
159//         'e,
160//         Result<
161//             <Self::Database as crate::database::Database>::Statement<'q>,
162//             crate::error::Error,
163//         >,
164//     >
165//     where
166//         't: 'e,
167//     {
168//         (&mut **self).prepare_with(sql, parameters)
169//     }
170//
171//     #[doc(hidden)]
172//     fn describe<'e, 'q: 'e>(
173//         self,
174//         query: &'q str,
175//     ) -> futures_core::future::BoxFuture<
176//         'e,
177//         Result<crate::describe::Describe<Self::Database>, crate::error::Error>,
178//     >
179//     where
180//         't: 'e,
181//     {
182//         (&mut **self).describe(query)
183//     }
184// }
185
186impl<'c, DB> Debug for Transaction<'c, DB>
187where
188    DB: Database,
189{
190    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
191        // TODO: Show the full type <..<..<..
192        f.debug_struct("Transaction").finish()
193    }
194}
195
196impl<'c, DB> Deref for Transaction<'c, DB>
197where
198    DB: Database,
199{
200    type Target = DB::Connection;
201
202    #[inline]
203    fn deref(&self) -> &Self::Target {
204        &self.connection
205    }
206}
207
208impl<'c, DB> DerefMut for Transaction<'c, DB>
209where
210    DB: Database,
211{
212    #[inline]
213    fn deref_mut(&mut self) -> &mut Self::Target {
214        &mut self.connection
215    }
216}
217
218// Implement `AsMut<DB::Connection>` so `Transaction` can be given to a
219// `PgAdvisoryLockGuard`.
220//
221// See: https://github.com/launchbadge/sqlx/issues/2520
222impl<'c, DB: Database> AsMut<DB::Connection> for Transaction<'c, DB> {
223    fn as_mut(&mut self) -> &mut DB::Connection {
224        &mut self.connection
225    }
226}
227
228impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'c, DB> {
229    type Database = DB;
230
231    type Connection = &'t mut <DB as Database>::Connection;
232
233    #[inline]
234    fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
235        Box::pin(futures_util::future::ok(&mut **self))
236    }
237
238    #[inline]
239    fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
240        Transaction::begin(&mut **self)
241    }
242}
243
244impl<'c, DB> Drop for Transaction<'c, DB>
245where
246    DB: Database,
247{
248    fn drop(&mut self) {
249        if self.open {
250            // starts a rollback operation
251
252            // what this does depends on the database but generally this means we queue a rollback
253            // operation that will happen on the next asynchronous invocation of the underlying
254            // connection (including if the connection is returned to a pool)
255
256            DB::TransactionManager::start_rollback(&mut self.connection);
257        }
258    }
259}
260
261pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
262    if depth == 0 {
263        Cow::Borrowed("BEGIN")
264    } else {
265        Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
266    }
267}
268
269pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
270    if depth == 1 {
271        Cow::Borrowed("COMMIT")
272    } else {
273        Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
274    }
275}
276
277pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
278    if depth == 1 {
279        Cow::Borrowed("ROLLBACK")
280    } else {
281        Cow::Owned(format!(
282            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
283            depth - 1
284        ))
285    }
286}