sqlx_core/pool/
connection.rs

1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::sync::AsyncSemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
/// Upper bound on how long the close-on-drop task waits for the connection to close
/// before giving up on it (see `PoolConnection::take_and_close`).
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
17
/// A connection managed by a [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
pub struct PoolConnection<DB: Database> {
    /// The checked-out connection; `None` once it has been taken out of this handle
    /// (by `close()`, `detach()`, `leak()`, `return_to_pool()` or the close-on-drop path).
    live: Option<Live<DB>>,
    /// When `true`, the drop handler closes the connection instead of returning it to the pool.
    close_on_drop: bool,
    /// Shared handle to the pool internals this connection is returned to on drop.
    pub(crate) pool: Arc<PoolInner<DB>>,
}
26
/// A raw database connection together with the instant it was created.
pub(super) struct Live<DB: Database> {
    /// The underlying database connection.
    pub(super) raw: DB::Connection,
    /// Set to `Instant::now()` when the connection is wrapped by `Floating::new_live`;
    /// used to compute the connection's age.
    pub(super) created_at: Instant,
}
31
/// A [`Live`] connection that is not currently checked out, with the instant it became idle.
pub(super) struct Idle<DB: Database> {
    /// The connection being held idle.
    pub(super) live: Live<DB>,
    /// Set to `Instant::now()` by `Live::into_idle`; used to compute how long
    /// the connection has been idle.
    pub(super) idle_since: Instant,
}
36
/// RAII wrapper for connections being handled by functions that may drop them
pub(super) struct Floating<DB: Database, C> {
    /// The wrapped connection state (`Live<DB>` or `Idle<DB>` in this file).
    pub(super) inner: C,
    /// Guard tied to the owning pool; it is dropped together with this wrapper
    /// unless explicitly cancelled (see `Floating::reattach`).
    // NOTE(review): presumably decrements the pool's connection count on drop -- confirm
    // against `DecrementSizeGuard` in `inner.rs`.
    pub(super) guard: DecrementSizeGuard<DB>,
}
42
/// Panic message used when a `PoolConnection` is accessed after its inner connection
/// has already been taken out; hitting it indicates an internal bug.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
44
45impl<DB: Database> Debug for PoolConnection<DB> {
46    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47        // TODO: Show the type name of the connection ?
48        f.debug_struct("PoolConnection").finish()
49    }
50}
51
52impl<DB: Database> Deref for PoolConnection<DB> {
53    type Target = DB::Connection;
54
55    fn deref(&self) -> &Self::Target {
56        &self.live.as_ref().expect(EXPECT_MSG).raw
57    }
58}
59
60impl<DB: Database> DerefMut for PoolConnection<DB> {
61    fn deref_mut(&mut self) -> &mut Self::Target {
62        &mut self.live.as_mut().expect(EXPECT_MSG).raw
63    }
64}
65
66impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
67    fn as_ref(&self) -> &DB::Connection {
68        self
69    }
70}
71
72impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
73    fn as_mut(&mut self) -> &mut DB::Connection {
74        self
75    }
76}
77
78impl<DB: Database> PoolConnection<DB> {
79    /// Close this connection, allowing the pool to open a replacement.
80    ///
81    /// Equivalent to calling [`.detach()`] then [`.close()`], but the connection permit is retained
82    /// for the duration so that the pool may not exceed `max_connections`.
83    ///
84    /// [`.detach()`]: PoolConnection::detach
85    /// [`.close()`]: Connection::close
86    pub async fn close(mut self) -> Result<(), Error> {
87        let floating = self.take_live().float(self.pool.clone());
88        floating.inner.raw.close().await
89    }
90
91    /// Close this connection on-drop, instead of returning it to the pool.
92    ///
93    /// May be used in cases where waiting for the [`.close()`][Self::close] call
94    /// to complete is unacceptable, but you still want the connection to be closed gracefully
95    /// so that the server can clean up resources.
96    #[inline(always)]
97    pub fn close_on_drop(&mut self) {
98        self.close_on_drop = true;
99    }
100
101    /// Detach this connection from the pool, allowing it to open a replacement.
102    ///
103    /// Note that if your application uses a single shared pool, this
104    /// effectively lets the application exceed the [`max_connections`] setting.
105    ///
106    /// If [`min_connections`] is nonzero, a task will be spawned to replace this connection.
107    ///
108    /// If you want the pool to treat this connection as permanently checked-out,
109    /// use [`.leak()`][Self::leak] instead.
110    ///
111    /// [`max_connections`]: crate::pool::PoolOptions::max_connections
112    /// [`min_connections`]: crate::pool::PoolOptions::min_connections
113    pub fn detach(mut self) -> DB::Connection {
114        self.take_live().float(self.pool.clone()).detach()
115    }
116
117    /// Detach this connection from the pool, treating it as permanently checked-out.
118    ///
119    /// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
120    ///
121    /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
122    pub fn leak(mut self) -> DB::Connection {
123        self.take_live().raw
124    }
125
126    fn take_live(&mut self) -> Live<DB> {
127        self.live.take().expect(EXPECT_MSG)
128    }
129
130    /// Test the connection to make sure it is still live before returning it to the pool.
131    ///
132    /// This effectively runs the drop handler eagerly instead of spawning a task to do it.
133    #[doc(hidden)]
134    pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
135        // float the connection in the pool before we move into the task
136        // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
137        // https://github.com/launchbadge/sqlx/issues/1396
138        // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22).
139        let floating: Option<Floating<DB, Live<DB>>> =
140            self.live.take().map(|live| live.float(self.pool.clone()));
141
142        let pool = self.pool.clone();
143
144        async move {
145            let returned_to_pool = if let Some(floating) = floating {
146                floating.return_to_pool().await
147            } else {
148                false
149            };
150
151            if !returned_to_pool {
152                pool.min_connections_maintenance(None).await;
153            }
154        }
155    }
156
157    fn take_and_close(&mut self) -> impl Future<Output = ()> + Send + 'static {
158        // float the connection in the pool before we move into the task
159        // in case the returned `Future` isn't executed, like if it's spawned into a dying runtime
160        // https://github.com/launchbadge/sqlx/issues/1396
161        // Type hints seem to be broken by `Option` combinators in IntelliJ Rust right now (6/22).
162        let floating = self.live.take().map(|live| live.float(self.pool.clone()));
163
164        let pool = self.pool.clone();
165
166        async move {
167            if let Some(floating) = floating {
168                // Don't hold the connection forever if it hangs while trying to close
169                crate::rt::timeout(CLOSE_ON_DROP_TIMEOUT, floating.close())
170                    .await
171                    .ok();
172            }
173
174            pool.min_connections_maintenance(None).await;
175        }
176    }
177}
178
179impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB> {
180    type Database = DB;
181
182    type Connection = &'c mut <DB as Database>::Connection;
183
184    #[inline]
185    fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
186        Box::pin(futures_util::future::ok(&mut **self))
187    }
188
189    #[inline]
190    fn begin(
191        self,
192    ) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
193    {
194        crate::transaction::Transaction::begin(&mut **self)
195    }
196}
197
198/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
199impl<DB: Database> Drop for PoolConnection<DB> {
200    fn drop(&mut self) {
201        if self.close_on_drop {
202            crate::rt::spawn(self.take_and_close());
203            return;
204        }
205
206        // We still need to spawn a task to maintain `min_connections`.
207        if self.live.is_some() || self.pool.options.min_connections > 0 {
208            crate::rt::spawn(self.return_to_pool());
209        }
210    }
211}
212
213impl<DB: Database> Live<DB> {
214    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
215        Floating {
216            inner: self,
217            // create a new guard from a previously leaked permit
218            guard: DecrementSizeGuard::new_permit(pool),
219        }
220    }
221
222    pub fn into_idle(self) -> Idle<DB> {
223        Idle {
224            live: self,
225            idle_since: Instant::now(),
226        }
227    }
228}
229
230impl<DB: Database> Deref for Idle<DB> {
231    type Target = Live<DB>;
232
233    fn deref(&self) -> &Self::Target {
234        &self.live
235    }
236}
237
238impl<DB: Database> DerefMut for Idle<DB> {
239    fn deref_mut(&mut self) -> &mut Self::Target {
240        &mut self.live
241    }
242}
243
244impl<DB: Database> Floating<DB, Live<DB>> {
245    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
246        Self {
247            inner: Live {
248                raw: conn,
249                created_at: Instant::now(),
250            },
251            guard,
252        }
253    }
254
255    pub fn reattach(self) -> PoolConnection<DB> {
256        let Floating { inner, guard } = self;
257
258        let pool = Arc::clone(&guard.pool);
259
260        guard.cancel();
261        PoolConnection {
262            live: Some(inner),
263            close_on_drop: false,
264            pool,
265        }
266    }
267
268    pub fn release(self) {
269        self.guard.pool.clone().release(self);
270    }
271
272    /// Return the connection to the pool.
273    ///
274    /// Returns `true` if the connection was successfully returned, `false` if it was closed.
275    async fn return_to_pool(mut self) -> bool {
276        // Immediately close the connection.
277        if self.guard.pool.is_closed() {
278            self.close().await;
279            return false;
280        }
281
282        // If the connection is beyond max lifetime, close the connection and
283        // immediately create a new connection
284        if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
285            self.close().await;
286            return false;
287        }
288
289        if let Some(test) = &self.guard.pool.options.after_release {
290            let meta = self.metadata();
291            match (test)(&mut self.inner.raw, meta).await {
292                Ok(true) => (),
293                Ok(false) => {
294                    self.close().await;
295                    return false;
296                }
297                Err(error) => {
298                    tracing::warn!(%error, "error from `after_release`");
299                    // Connection is broken, don't try to gracefully close as
300                    // something weird might happen.
301                    self.close_hard().await;
302                    return false;
303                }
304            }
305        }
306
307        // test the connection on-release to ensure it is still viable,
308        // and flush anything time-sensitive like transaction rollbacks
309        // if an Executor future/stream is dropped during an `.await` call, the connection
310        // is likely to be left in an inconsistent state, in which case it should not be
311        // returned to the pool; also of course, if it was dropped due to an error
312        // this is simply a band-aid as SQLx-next connections should be able
313        // to recover from cancellations
314        if let Err(error) = self.raw.ping().await {
315            tracing::warn!(
316                %error,
317                "error occurred while testing the connection on-release",
318            );
319
320            // Connection is broken, don't try to gracefully close.
321            self.close_hard().await;
322            false
323        } else {
324            // if the connection is still viable, release it to the pool
325            self.release();
326            true
327        }
328    }
329
330    pub async fn close(self) {
331        // This isn't used anywhere that we care about the return value
332        let _ = self.inner.raw.close().await;
333
334        // `guard` is dropped as intended
335    }
336
337    pub async fn close_hard(self) {
338        let _ = self.inner.raw.close_hard().await;
339    }
340
341    pub fn detach(self) -> DB::Connection {
342        self.inner.raw
343    }
344
345    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
346        Floating {
347            inner: self.inner.into_idle(),
348            guard: self.guard,
349        }
350    }
351
352    pub fn metadata(&self) -> PoolConnectionMetadata {
353        PoolConnectionMetadata {
354            age: self.created_at.elapsed(),
355            idle_for: Duration::ZERO,
356        }
357    }
358}
359
360impl<DB: Database> Floating<DB, Idle<DB>> {
361    pub fn from_idle(
362        idle: Idle<DB>,
363        pool: Arc<PoolInner<DB>>,
364        permit: AsyncSemaphoreReleaser<'_>,
365    ) -> Self {
366        Self {
367            inner: idle,
368            guard: DecrementSizeGuard::from_permit(pool, permit),
369        }
370    }
371
372    pub async fn ping(&mut self) -> Result<(), Error> {
373        self.live.raw.ping().await
374    }
375
376    pub fn into_live(self) -> Floating<DB, Live<DB>> {
377        Floating {
378            inner: self.inner.live,
379            guard: self.guard,
380        }
381    }
382
383    pub async fn close(self) -> DecrementSizeGuard<DB> {
384        if let Err(error) = self.inner.live.raw.close().await {
385            tracing::debug!(%error, "error occurred while closing the pool connection");
386        }
387        self.guard
388    }
389
390    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
391        let _ = self.inner.live.raw.close_hard().await;
392
393        self.guard
394    }
395
396    pub fn metadata(&self) -> PoolConnectionMetadata {
397        // Use a single `now` value for consistency.
398        let now = Instant::now();
399
400        PoolConnectionMetadata {
401            // NOTE: the receiver is the later `Instant` and the arg is the earlier
402            // https://github.com/launchbadge/sqlx/issues/1912
403            age: now.saturating_duration_since(self.created_at),
404            idle_for: now.saturating_duration_since(self.idle_since),
405        }
406    }
407}
408
409impl<DB: Database, C> Deref for Floating<DB, C> {
410    type Target = C;
411
412    fn deref(&self) -> &Self::Target {
413        &self.inner
414    }
415}
416
417impl<DB: Database, C> DerefMut for Floating<DB, C> {
418    fn deref_mut(&mut self) -> &mut Self::Target {
419        &mut self.inner
420    }
421}