diesel_async/sync_connection_wrapper/sqlite.rs
1use diesel::connection::AnsiTransactionManager;
2use diesel::SqliteConnection;
3use scoped_futures::ScopedBoxFuture;
4
5use crate::sync_connection_wrapper::SyncTransactionManagerWrapper;
6use crate::TransactionManager;
7
8use super::SyncConnectionWrapper;
9
10impl SyncConnectionWrapper<SqliteConnection> {
11 /// Run a transaction with `BEGIN IMMEDIATE`
12 ///
13 /// This method will return an error if a transaction is already open.
14 ///
15 /// **WARNING:** Canceling the returned future does currently **not**
16 /// close an already open transaction. You may end up with a connection
17 /// containing a dangling transaction.
18 ///
19 /// # Example
20 ///
21 /// ```rust
22 /// # include!("../doctest_setup.rs");
23 /// use diesel::result::Error;
24 /// use scoped_futures::ScopedFutureExt;
25 /// use diesel_async::{RunQueryDsl, AsyncConnection};
26 /// #
27 /// # #[tokio::main(flavor = "current_thread")]
28 /// # async fn main() {
29 /// # run_test().await.unwrap();
30 /// # }
31 /// #
32 /// # async fn run_test() -> QueryResult<()> {
33 /// # use schema::users::dsl::*;
34 /// # let conn = &mut connection_no_transaction().await;
35 /// conn.immediate_transaction(|conn| async move {
36 /// diesel::insert_into(users)
37 /// .values(name.eq("Ruby"))
38 /// .execute(conn)
39 /// .await?;
40 ///
41 /// let all_names = users.select(name).load::<String>(conn).await?;
42 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
43 ///
44 /// Ok(())
45 /// }.scope_boxed()).await
46 /// # }
47 /// ```
48 pub async fn immediate_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
49 where
50 F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
51 E: From<diesel::result::Error> + Send + 'a,
52 R: Send + 'a,
53 {
54 self.transaction_sql(f, "BEGIN IMMEDIATE").await
55 }
56
57 /// Run a transaction with `BEGIN EXCLUSIVE`
58 ///
59 /// This method will return an error if a transaction is already open.
60 ///
61 /// **WARNING:** Canceling the returned future does currently **not**
62 /// close an already open transaction. You may end up with a connection
63 /// containing a dangling transaction.
64 ///
65 /// # Example
66 ///
67 /// ```rust
68 /// # include!("../doctest_setup.rs");
69 /// use diesel::result::Error;
70 /// use scoped_futures::ScopedFutureExt;
71 /// use diesel_async::{RunQueryDsl, AsyncConnection};
72 /// #
73 /// # #[tokio::main(flavor = "current_thread")]
74 /// # async fn main() {
75 /// # run_test().await.unwrap();
76 /// # }
77 /// #
78 /// # async fn run_test() -> QueryResult<()> {
79 /// # use schema::users::dsl::*;
80 /// # let conn = &mut connection_no_transaction().await;
81 /// conn.exclusive_transaction(|conn| async move {
82 /// diesel::insert_into(users)
83 /// .values(name.eq("Ruby"))
84 /// .execute(conn)
85 /// .await?;
86 ///
87 /// let all_names = users.select(name).load::<String>(conn).await?;
88 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
89 ///
90 /// Ok(())
91 /// }.scope_boxed()).await
92 /// # }
93 /// ```
94 pub async fn exclusive_transaction<'a, R, E, F>(&mut self, f: F) -> Result<R, E>
95 where
96 F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
97 E: From<diesel::result::Error> + Send + 'a,
98 R: Send + 'a,
99 {
100 self.transaction_sql(f, "BEGIN EXCLUSIVE").await
101 }
102
103 async fn transaction_sql<'a, R, E, F>(&mut self, f: F, sql: &'static str) -> Result<R, E>
104 where
105 F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
106 E: From<diesel::result::Error> + Send + 'a,
107 R: Send + 'a,
108 {
109 self.spawn_blocking(|conn| AnsiTransactionManager::begin_transaction_sql(conn, sql))
110 .await?;
111
112 match f(&mut *self).await {
113 Ok(value) => {
114 SyncTransactionManagerWrapper::<AnsiTransactionManager>::commit_transaction(
115 &mut *self,
116 )
117 .await?;
118 Ok(value)
119 }
120 Err(e) => {
121 SyncTransactionManagerWrapper::<AnsiTransactionManager>::rollback_transaction(
122 &mut *self,
123 )
124 .await?;
125 Err(e)
126 }
127 }
128 }
129}