diesel_async/sync_connection_wrapper/
sqlite.rs

use diesel::connection::AnsiTransactionManager;
use diesel::SqliteConnection;
use scoped_futures::ScopedBoxFuture;

use crate::sync_connection_wrapper::SyncTransactionManagerWrapper;
use crate::TransactionManager;

use super::SyncConnectionWrapper;

10impl SyncConnectionWrapper<SqliteConnection> {
11    /// Run a transaction with `BEGIN IMMEDIATE`
12    ///
13    /// This method will return an error if a transaction is already open.
14    ///
15    /// **WARNING:** Canceling the returned future does currently **not**
16    /// close an already open transaction. You may end up with a connection
17    /// containing a dangling transaction.
18    ///
19    /// # Example
20    ///
21    /// ```rust
22    /// # include!("../doctest_setup.rs");
23    /// use diesel::result::Error;
24    /// use scoped_futures::ScopedFutureExt;
25    /// use diesel_async::{RunQueryDsl, AsyncConnection};
26    /// #
27    /// # #[tokio::main(flavor = "current_thread")]
28    /// # async fn main() {
29    /// #     run_test().await.unwrap();
30    /// # }
31    /// #
32    /// # async fn run_test() -> QueryResult<()> {
33    /// #     use schema::users::dsl::*;
34    /// #     let conn = &mut connection_no_transaction().await;
35    /// conn.immediate_transaction(|conn| async move {
36    ///     diesel::insert_into(users)
37    ///         .values(name.eq("Ruby"))
38    ///         .execute(conn)
39    ///         .await?;
40    ///
41    ///     let all_names = users.select(name).load::<String>(conn).await?;
42    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
43    ///
44    ///     Ok(())
45    /// }.scope_boxed()).await
46    /// # }
47    /// ```
48    pub async fn immediate_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
49    where
50        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
51        E: From<diesel::result::Error> + Send + 'a,
52        R: Send + 'a,
53    {
54        self.transaction_sql(f, "BEGIN IMMEDIATE").await
55    }
56
57    /// Run a transaction with `BEGIN EXCLUSIVE`
58    ///
59    /// This method will return an error if a transaction is already open.
60    ///
61    /// **WARNING:** Canceling the returned future does currently **not**
62    /// close an already open transaction. You may end up with a connection
63    /// containing a dangling transaction.
64    ///
65    /// # Example
66    ///
67    /// ```rust
68    /// # include!("../doctest_setup.rs");
69    /// use diesel::result::Error;
70    /// use scoped_futures::ScopedFutureExt;
71    /// use diesel_async::{RunQueryDsl, AsyncConnection};
72    /// #
73    /// # #[tokio::main(flavor = "current_thread")]
74    /// # async fn main() {
75    /// #     run_test().await.unwrap();
76    /// # }
77    /// #
78    /// # async fn run_test() -> QueryResult<()> {
79    /// #     use schema::users::dsl::*;
80    /// #     let conn = &mut connection_no_transaction().await;
81    /// conn.exclusive_transaction(|conn| async move {
82    ///     diesel::insert_into(users)
83    ///         .values(name.eq("Ruby"))
84    ///         .execute(conn)
85    ///         .await?;
86    ///
87    ///     let all_names = users.select(name).load::<String>(conn).await?;
88    ///     assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
89    ///
90    ///     Ok(())
91    /// }.scope_boxed()).await
92    /// # }
93    /// ```
94    pub async fn exclusive_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
95    where
96        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
97        E: From<diesel::result::Error> + Send + 'a,
98        R: Send + 'a,
99    {
100        self.transaction_sql(f, "BEGIN EXCLUSIVE").await
101    }
102
103    async fn transaction_sql<'a, R, E, F>(&mut self, f: F, sql: &'static str) -> Result<R, E>
104    where
105        F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
106        E: From<diesel::result::Error> + Send + 'a,
107        R: Send + 'a,
108    {
109        self.spawn_blocking(|conn| AnsiTransactionManager::begin_transaction_sql(conn, sql))
110            .await?;
111
112        match f(&mut *self).await {
113            Ok(value) => {
114                SyncTransactionManagerWrapper::<AnsiTransactionManager>::commit_transaction(
115                    &mut *self,
116                )
117                .await?;
118                Ok(value)
119            }
120            Err(e) => {
121                SyncTransactionManagerWrapper::<AnsiTransactionManager>::rollback_transaction(
122                    &mut *self,
123                )
124                .await?;
125                Err(e)
126            }
127        }
128    }
129}