diesel_async/pooled_connection/
deadpool.rs

1//! A connection pool implementation for `diesel-async` based on [`deadpool`]
2//!
3//! ```rust
4//! # include!("../doctest_setup.rs");
5//! use diesel::result::Error;
6//! use futures_util::FutureExt;
7//! use diesel_async::pooled_connection::AsyncDieselConnectionManager;
8//! use diesel_async::pooled_connection::deadpool::Pool;
9//! use diesel_async::{RunQueryDsl, AsyncConnection};
10//!
11//! # #[tokio::main(flavor = "current_thread")]
12//! # async fn main() {
13//! #     run_test().await.unwrap();
14//! # }
15//! #
16//! # #[cfg(feature = "postgres")]
17//! # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncPgConnection> {
18//! #     let db_url = database_url_from_env("PG_DATABASE_URL");
19//! let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(db_url);
20//! #     config
21//! #  }
22//! #
23//! # #[cfg(feature = "mysql")]
24//! # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncMysqlConnection> {
25//! #     let db_url = database_url_from_env("MYSQL_DATABASE_URL");
26//! #    let config = AsyncDieselConnectionManager::<diesel_async::AsyncMysqlConnection>::new(db_url);
27//! #     config
28//! #  }
29//! #
30//! # #[cfg(feature = "sqlite")]
31//! # fn get_config() -> AsyncDieselConnectionManager<diesel_async::sync_connection_wrapper::SyncConnectionWrapper<diesel::SqliteConnection>> {
32//! #     let db_url = database_url_from_env("SQLITE_DATABASE_URL");
33//! #     let config = AsyncDieselConnectionManager::<diesel_async::sync_connection_wrapper::SyncConnectionWrapper<diesel::SqliteConnection>>::new(db_url);
34//! #     config
35//! # }
36//! #
37//! # async fn run_test() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
38//! #     use schema::users::dsl::*;
39//! #     let config = get_config();
40//! # #[cfg(feature = "postgres")]
41//! let pool: Pool<AsyncPgConnection> = Pool::builder(config).build()?;
42//! # #[cfg(not(feature = "postgres"))]
43//! # let pool = Pool::builder(config).build()?;
44//! let mut conn = pool.get().await?;
45//! # conn.begin_test_transaction();
46//! # create_tables(&mut conn).await;
47//! # conn.begin_test_transaction();
48//! let res = users.load::<(i32, String)>(&mut conn).await?;
49//! #     Ok(())
50//! # }
51//! ```
52use super::{AsyncDieselConnectionManager, PoolableConnection};
53use deadpool::managed::Manager;
54use diesel::query_builder::QueryFragment;
55
/// Type alias for using [`deadpool::managed::Pool`] with `diesel-async`
///
/// This is **not** equal to [`deadpool::managed::Pool`]: the manager type is
/// already fixed to [`AsyncDieselConnectionManager`], so only the connection
/// type `C` has to be supplied as generic argument
pub type Pool<C> = deadpool::managed::Pool<AsyncDieselConnectionManager<C>>;
/// Type alias for using [`deadpool::managed::PoolBuilder`] with `diesel-async`
///
/// The manager type is fixed to [`AsyncDieselConnectionManager`]; only the
/// connection type `C` is left as generic argument
pub type PoolBuilder<C> = deadpool::managed::PoolBuilder<AsyncDieselConnectionManager<C>>;
/// Type alias for using [`deadpool::managed::BuildError`] with `diesel-async`
pub type BuildError = deadpool::managed::BuildError;
/// Type alias for using [`deadpool::managed::PoolError`] with `diesel-async`
///
/// The generic error parameter is fixed to the `PoolError` type of the parent module
pub type PoolError = deadpool::managed::PoolError<super::PoolError>;
/// Type alias for using [`deadpool::managed::Object`] with `diesel-async`
///
/// The manager type is fixed to [`AsyncDieselConnectionManager`]; only the
/// connection type `C` is left as generic argument
pub type Object<C> = deadpool::managed::Object<AsyncDieselConnectionManager<C>>;
/// Type alias for using [`deadpool::managed::Hook`] with `diesel-async`
///
/// The manager type is fixed to [`AsyncDieselConnectionManager`]; only the
/// connection type `C` is left as generic argument
pub type Hook<C> = deadpool::managed::Hook<AsyncDieselConnectionManager<C>>;
/// Type alias for using [`deadpool::managed::HookError`] with `diesel-async`
///
/// The generic error parameter is fixed to the `PoolError` type of the parent module
pub type HookError = deadpool::managed::HookError<super::PoolError>;
73
74impl<C> Manager for AsyncDieselConnectionManager<C>
75where
76    C: PoolableConnection + Send + 'static,
77    diesel::dsl::select<diesel::dsl::AsExprOf<i32, diesel::sql_types::Integer>>:
78        crate::methods::ExecuteDsl<C>,
79    diesel::query_builder::SqlQuery: QueryFragment<C::Backend>,
80{
81    type Type = C;
82
83    type Error = super::PoolError;
84
85    async fn create(&self) -> Result<Self::Type, Self::Error> {
86        (self.manager_config.custom_setup)(&self.connection_url)
87            .await
88            .map_err(super::PoolError::ConnectionError)
89    }
90
91    async fn recycle(
92        &self,
93        obj: &mut Self::Type,
94        _: &deadpool::managed::Metrics,
95    ) -> deadpool::managed::RecycleResult<Self::Error> {
96        if std::thread::panicking() || obj.is_broken() {
97            return Err(deadpool::managed::RecycleError::Message(
98                "Broken connection".into(),
99            ));
100        }
101        obj.ping(&self.manager_config.recycling_method)
102            .await
103            .map_err(super::PoolError::QueryError)?;
104        Ok(())
105    }
106}