diesel_async/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2//! Diesel-async provides async variants of diesel related query functionality
3//!
4//! diesel-async is an extension to diesel itself. It is designed to be used together
5//! with the main diesel crate. It only provides async variants of core diesel traits,
6//! that perform actual io-work.
7//! This includes async counterparts of the following traits:
8//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.0.x/diesel/prelude/trait.RunQueryDsl.html)
9//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
10//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html)
11//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
12//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.0.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
13//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
14//!
15//! These traits closely mirror their diesel counterparts while providing async functionality.
16//!
17//! In addition to these core traits 3 fully async connection implementations are provided
18//! by diesel-async:
19//!
20//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
21//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
22//! * [`SyncConnectionWrapper`] (enabled by the `sync-connection-wrapper`/`sqlite` feature)
23//!
24//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
25//! method calls and connections with their async counterparts.
26//!
27//! ```rust
28//! # include!("./doctest_setup.rs");
29//! #
30//! use diesel::prelude::*;
31//! use diesel_async::{RunQueryDsl, AsyncConnection};
32//!
33//! diesel::table! {
34//! users(id) {
35//! id -> Integer,
36//! name -> Text,
37//! }
38//! }
39//! #
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! # run_test().await;
43//! # }
44//! #
45//! # async fn run_test() -> QueryResult<()> {
46//!
47//! use crate::users::dsl::*;
48//!
49//! # let mut connection = establish_connection().await;
50//! # /*
51//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
52//! # */
53//! let data = users
54//! // use ordinary diesel query dsl here
55//! .filter(id.gt(0))
56//! // execute the query via the provided
57//! // async variant of `diesel_async::RunQueryDsl`
58//! .load::<(i32, String)>(&mut connection)
59//! .await?;
60//! let expected_data = vec![
61//! (1, String::from("Sean")),
62//! (2, String::from("Tess")),
63//! ];
64//! assert_eq!(expected_data, data);
65//! # Ok(())
66//! # }
67//! ```
68
69#![warn(
70 missing_docs,
71 clippy::cast_possible_wrap,
72 clippy::cast_possible_truncation,
73 clippy::cast_sign_loss
74)]
75
76use diesel::backend::Backend;
77use diesel::connection::Instrumentation;
78use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
79use diesel::result::Error;
80use diesel::row::Row;
81use diesel::{ConnectionResult, QueryResult};
82use futures_util::{Future, Stream};
83use std::fmt::Debug;
84
85pub use scoped_futures;
86use scoped_futures::{ScopedBoxFuture, ScopedFutureExt};
87
88#[cfg(feature = "async-connection-wrapper")]
89pub mod async_connection_wrapper;
90#[cfg(feature = "mysql")]
91mod mysql;
92#[cfg(feature = "postgres")]
93pub mod pg;
94#[cfg(feature = "pool")]
95pub mod pooled_connection;
96mod run_query_dsl;
97#[cfg(any(feature = "postgres", feature = "mysql"))]
98mod stmt_cache;
99#[cfg(feature = "sync-connection-wrapper")]
100pub mod sync_connection_wrapper;
101mod transaction_manager;
102
103#[cfg(feature = "mysql")]
104#[doc(inline)]
105pub use self::mysql::AsyncMysqlConnection;
106#[cfg(feature = "postgres")]
107#[doc(inline)]
108pub use self::pg::AsyncPgConnection;
109#[doc(inline)]
110pub use self::run_query_dsl::*;
111
112#[doc(inline)]
113pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
114
115/// Perform simple operations on a backend.
116///
117/// You should likely use [`AsyncConnection`] instead.
118#[async_trait::async_trait]
119pub trait SimpleAsyncConnection {
120 /// Execute multiple SQL statements within the same string.
121 ///
122 /// This function is used to execute migrations,
123 /// which may contain more than one SQL statement.
124 async fn batch_execute(&mut self, query: &str) -> QueryResult<()>;
125}
126
127/// An async connection to a database
128///
129/// This trait represents a n async database connection. It can be used to query the database through
130/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
131/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
132#[async_trait::async_trait]
133pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
134 /// The future returned by `AsyncConnection::execute`
135 type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
136 /// The future returned by `AsyncConnection::load`
137 type LoadFuture<'conn, 'query>: Future<Output = QueryResult<Self::Stream<'conn, 'query>>> + Send;
138 /// The inner stream returned by `AsyncConnection::load`
139 type Stream<'conn, 'query>: Stream<Item = QueryResult<Self::Row<'conn, 'query>>> + Send;
140 /// The row type used by the stream returned by `AsyncConnection::load`
141 type Row<'conn, 'query>: Row<'conn, Self::Backend>;
142
143 /// The backend this type connects to
144 type Backend: Backend;
145
146 #[doc(hidden)]
147 type TransactionManager: TransactionManager<Self>;
148
149 /// Establishes a new connection to the database
150 ///
151 /// The argument to this method and the method's behavior varies by backend.
152 /// See the documentation for that backend's connection class
153 /// for details about what it accepts and how it behaves.
154 async fn establish(database_url: &str) -> ConnectionResult<Self>;
155
156 /// Executes the given function inside of a database transaction
157 ///
158 /// This function executes the provided closure `f` inside a database
159 /// transaction. If there is already an open transaction for the current
160 /// connection savepoints will be used instead. The connection is committed if
161 /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
162 /// For both cases the original result value will be returned from this function.
163 ///
164 /// If the transaction fails to commit due to a `SerializationFailure` or a
165 /// `ReadOnlyTransaction` a rollback will be attempted.
166 /// If the rollback fails, the error will be returned in a
167 /// [`Error::RollbackErrorOnCommit`](diesel::result::Error::RollbackErrorOnCommit),
168 /// from which you will be able to extract both the original commit error and
169 /// the rollback error.
170 /// In addition, the connection will be considered broken
171 /// as it contains a uncommitted unabortable open transaction. Any further
172 /// interaction with the transaction system will result in an returned error
173 /// in this case.
174 ///
175 /// If the closure returns an `Err(_)` and the rollback fails the function
176 /// will return that rollback error directly, and the transaction manager will
177 /// be marked as broken as it contains a uncommitted unabortable open transaction.
178 ///
179 /// If a nested transaction fails to release the corresponding savepoint
180 /// the error will be returned directly.
181 ///
182 /// **WARNING:** Canceling the returned future does currently **not**
183 /// close an already open transaction. You may end up with a connection
184 /// containing a dangling transaction.
185 ///
186 /// # Example
187 ///
188 /// ```rust
189 /// # include!("doctest_setup.rs");
190 /// use diesel::result::Error;
191 /// use scoped_futures::ScopedFutureExt;
192 /// use diesel_async::{RunQueryDsl, AsyncConnection};
193 ///
194 /// # #[tokio::main(flavor = "current_thread")]
195 /// # async fn main() {
196 /// # run_test().await.unwrap();
197 /// # }
198 /// #
199 /// # async fn run_test() -> QueryResult<()> {
200 /// # use schema::users::dsl::*;
201 /// # let conn = &mut establish_connection().await;
202 /// conn.transaction::<_, Error, _>(|conn| async move {
203 /// diesel::insert_into(users)
204 /// .values(name.eq("Ruby"))
205 /// .execute(conn)
206 /// .await?;
207 ///
208 /// let all_names = users.select(name).load::<String>(conn).await?;
209 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
210 ///
211 /// Ok(())
212 /// }.scope_boxed()).await?;
213 ///
214 /// conn.transaction::<(), _, _>(|conn| async move {
215 /// diesel::insert_into(users)
216 /// .values(name.eq("Pascal"))
217 /// .execute(conn)
218 /// .await?;
219 ///
220 /// let all_names = users.select(name).load::<String>(conn).await?;
221 /// assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
222 ///
223 /// // If we want to roll back the transaction, but don't have an
224 /// // actual error to return, we can return `RollbackTransaction`.
225 /// Err(Error::RollbackTransaction)
226 /// }.scope_boxed()).await;
227 ///
228 /// let all_names = users.select(name).load::<String>(conn).await?;
229 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
230 /// # Ok(())
231 /// # }
232 /// ```
233 async fn transaction<'a, R, E, F>(&mut self, callback: F) -> Result<R, E>
234 where
235 F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
236 E: From<diesel::result::Error> + Send + 'a,
237 R: Send + 'a,
238 {
239 Self::TransactionManager::transaction(self, callback).await
240 }
241
242 /// Creates a transaction that will never be committed. This is useful for
243 /// tests. Panics if called while inside of a transaction or
244 /// if called with a connection containing a broken transaction
245 async fn begin_test_transaction(&mut self) -> QueryResult<()> {
246 use diesel::connection::TransactionManagerStatus;
247
248 match Self::TransactionManager::transaction_manager_status_mut(self) {
249 TransactionManagerStatus::Valid(valid_status) => {
250 assert_eq!(None, valid_status.transaction_depth())
251 }
252 TransactionManagerStatus::InError => panic!("Transaction manager in error"),
253 };
254 Self::TransactionManager::begin_transaction(self).await?;
255 // set the test transaction flag
256 // to prevent that this connection gets dropped in connection pools
257 // Tests commonly set the poolsize to 1 and use `begin_test_transaction`
258 // to prevent modifications to the schema
259 Self::TransactionManager::transaction_manager_status_mut(self).set_test_transaction_flag();
260 Ok(())
261 }
262
263 /// Executes the given function inside a transaction, but does not commit
264 /// it. Panics if the given function returns an error.
265 ///
266 /// # Example
267 ///
268 /// ```rust
269 /// # include!("doctest_setup.rs");
270 /// use diesel::result::Error;
271 /// use scoped_futures::ScopedFutureExt;
272 /// use diesel_async::{RunQueryDsl, AsyncConnection};
273 ///
274 /// # #[tokio::main(flavor = "current_thread")]
275 /// # async fn main() {
276 /// # run_test().await.unwrap();
277 /// # }
278 /// #
279 /// # async fn run_test() -> QueryResult<()> {
280 /// # use schema::users::dsl::*;
281 /// # let conn = &mut establish_connection().await;
282 /// conn.test_transaction::<_, Error, _>(|conn| async move {
283 /// diesel::insert_into(users)
284 /// .values(name.eq("Ruby"))
285 /// .execute(conn)
286 /// .await?;
287 ///
288 /// let all_names = users.select(name).load::<String>(conn).await?;
289 /// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
290 ///
291 /// Ok(())
292 /// }.scope_boxed()).await;
293 ///
294 /// // Even though we returned `Ok`, the transaction wasn't committed.
295 /// let all_names = users.select(name).load::<String>(conn).await?;
296 /// assert_eq!(vec!["Sean", "Tess"], all_names);
297 /// # Ok(())
298 /// # }
299 /// ```
300 async fn test_transaction<'a, R, E, F>(&'a mut self, f: F) -> R
301 where
302 F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
303 E: Debug + Send + 'a,
304 R: Send + 'a,
305 Self: 'a,
306 {
307 use futures_util::TryFutureExt;
308
309 let mut user_result = None;
310 let _ = self
311 .transaction::<R, _, _>(|c| {
312 f(c).map_err(|_| Error::RollbackTransaction)
313 .and_then(|r| {
314 user_result = Some(r);
315 futures_util::future::ready(Err(Error::RollbackTransaction))
316 })
317 .scope_boxed()
318 })
319 .await;
320 user_result.expect("Transaction did not succeed")
321 }
322
323 #[doc(hidden)]
324 fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
325 where
326 T: AsQuery + 'query,
327 T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
328
329 #[doc(hidden)]
330 fn execute_returning_count<'conn, 'query, T>(
331 &'conn mut self,
332 source: T,
333 ) -> Self::ExecuteFuture<'conn, 'query>
334 where
335 T: QueryFragment<Self::Backend> + QueryId + 'query;
336
337 #[doc(hidden)]
338 fn transaction_state(
339 &mut self,
340 ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
341
342 // These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
343 // compile without a `where Self: '_` clause. This is needed the because bound causes
344 // lifetime issues when using `transaction()` with generic `AsyncConnection`s.
345 //
346 // See: https://github.com/rust-lang/rust/issues/87479
347 #[doc(hidden)]
348 fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
349 #[doc(hidden)]
350 fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
351
352 #[doc(hidden)]
353 fn instrumentation(&mut self) -> &mut dyn Instrumentation;
354
355 /// Set a specific [`Instrumentation`] implementation for this connection
356 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation);
357}