sqlx_core/pool/
connection.rs1use std::fmt::{self, Debug, Formatter};
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use crate::sync::AsyncSemaphoreReleaser;
7
8use crate::connection::Connection;
9use crate::database::Database;
10use crate::error::Error;
11
12use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
13use crate::pool::options::PoolConnectionMetadata;
14use std::future::Future;
15
16const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
17
18pub struct PoolConnection<DB: Database> {
22 live: Option<Live<DB>>,
23 close_on_drop: bool,
24 pub(crate) pool: Arc<PoolInner<DB>>,
25}
26
27pub(super) struct Live<DB: Database> {
28 pub(super) raw: DB::Connection,
29 pub(super) created_at: Instant,
30}
31
32pub(super) struct Idle<DB: Database> {
33 pub(super) live: Live<DB>,
34 pub(super) idle_since: Instant,
35}
36
37pub(super) struct Floating<DB: Database, C> {
39 pub(super) inner: C,
40 pub(super) guard: DecrementSizeGuard<DB>,
41}
42
43const EXPECT_MSG: &str = "BUG: inner connection already taken!";
44
45impl<DB: Database> Debug for PoolConnection<DB> {
46 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47 f.debug_struct("PoolConnection").finish()
49 }
50}
51
52impl<DB: Database> Deref for PoolConnection<DB> {
53 type Target = DB::Connection;
54
55 fn deref(&self) -> &Self::Target {
56 &self.live.as_ref().expect(EXPECT_MSG).raw
57 }
58}
59
60impl<DB: Database> DerefMut for PoolConnection<DB> {
61 fn deref_mut(&mut self) -> &mut Self::Target {
62 &mut self.live.as_mut().expect(EXPECT_MSG).raw
63 }
64}
65
66impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
67 fn as_ref(&self) -> &DB::Connection {
68 self
69 }
70}
71
72impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
73 fn as_mut(&mut self) -> &mut DB::Connection {
74 self
75 }
76}
77
78impl<DB: Database> PoolConnection<DB> {
79 pub async fn close(mut self) -> Result<(), Error> {
87 let floating = self.take_live().float(self.pool.clone());
88 floating.inner.raw.close().await
89 }
90
91 #[inline(always)]
97 pub fn close_on_drop(&mut self) {
98 self.close_on_drop = true;
99 }
100
101 pub fn detach(mut self) -> DB::Connection {
114 self.take_live().float(self.pool.clone()).detach()
115 }
116
117 pub fn leak(mut self) -> DB::Connection {
123 self.take_live().raw
124 }
125
126 fn take_live(&mut self) -> Live<DB> {
127 self.live.take().expect(EXPECT_MSG)
128 }
129
130 #[doc(hidden)]
134 pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
135 let floating: Option<Floating<DB, Live<DB>>> =
140 self.live.take().map(|live| live.float(self.pool.clone()));
141
142 let pool = self.pool.clone();
143
144 async move {
145 let returned_to_pool = if let Some(floating) = floating {
146 floating.return_to_pool().await
147 } else {
148 false
149 };
150
151 if !returned_to_pool {
152 pool.min_connections_maintenance(None).await;
153 }
154 }
155 }
156
157 fn take_and_close(&mut self) -> impl Future<Output = ()> + Send + 'static {
158 let floating = self.live.take().map(|live| live.float(self.pool.clone()));
163
164 let pool = self.pool.clone();
165
166 async move {
167 if let Some(floating) = floating {
168 crate::rt::timeout(CLOSE_ON_DROP_TIMEOUT, floating.close())
170 .await
171 .ok();
172 }
173
174 pool.min_connections_maintenance(None).await;
175 }
176 }
177}
178
179impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB> {
180 type Database = DB;
181
182 type Connection = &'c mut <DB as Database>::Connection;
183
184 #[inline]
185 fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
186 Box::pin(futures_util::future::ok(&mut **self))
187 }
188
189 #[inline]
190 fn begin(
191 self,
192 ) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
193 {
194 crate::transaction::Transaction::begin(&mut **self)
195 }
196}
197
198impl<DB: Database> Drop for PoolConnection<DB> {
200 fn drop(&mut self) {
201 if self.close_on_drop {
202 crate::rt::spawn(self.take_and_close());
203 return;
204 }
205
206 if self.live.is_some() || self.pool.options.min_connections > 0 {
208 crate::rt::spawn(self.return_to_pool());
209 }
210 }
211}
212
213impl<DB: Database> Live<DB> {
214 pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
215 Floating {
216 inner: self,
217 guard: DecrementSizeGuard::new_permit(pool),
219 }
220 }
221
222 pub fn into_idle(self) -> Idle<DB> {
223 Idle {
224 live: self,
225 idle_since: Instant::now(),
226 }
227 }
228}
229
230impl<DB: Database> Deref for Idle<DB> {
231 type Target = Live<DB>;
232
233 fn deref(&self) -> &Self::Target {
234 &self.live
235 }
236}
237
238impl<DB: Database> DerefMut for Idle<DB> {
239 fn deref_mut(&mut self) -> &mut Self::Target {
240 &mut self.live
241 }
242}
243
244impl<DB: Database> Floating<DB, Live<DB>> {
245 pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
246 Self {
247 inner: Live {
248 raw: conn,
249 created_at: Instant::now(),
250 },
251 guard,
252 }
253 }
254
255 pub fn reattach(self) -> PoolConnection<DB> {
256 let Floating { inner, guard } = self;
257
258 let pool = Arc::clone(&guard.pool);
259
260 guard.cancel();
261 PoolConnection {
262 live: Some(inner),
263 close_on_drop: false,
264 pool,
265 }
266 }
267
268 pub fn release(self) {
269 self.guard.pool.clone().release(self);
270 }
271
272 async fn return_to_pool(mut self) -> bool {
276 if self.guard.pool.is_closed() {
278 self.close().await;
279 return false;
280 }
281
282 if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
285 self.close().await;
286 return false;
287 }
288
289 if let Some(test) = &self.guard.pool.options.after_release {
290 let meta = self.metadata();
291 match (test)(&mut self.inner.raw, meta).await {
292 Ok(true) => (),
293 Ok(false) => {
294 self.close().await;
295 return false;
296 }
297 Err(error) => {
298 tracing::warn!(%error, "error from `after_release`");
299 self.close_hard().await;
302 return false;
303 }
304 }
305 }
306
307 if let Err(error) = self.raw.ping().await {
315 tracing::warn!(
316 %error,
317 "error occurred while testing the connection on-release",
318 );
319
320 self.close_hard().await;
322 false
323 } else {
324 self.release();
326 true
327 }
328 }
329
330 pub async fn close(self) {
331 let _ = self.inner.raw.close().await;
333
334 }
336
337 pub async fn close_hard(self) {
338 let _ = self.inner.raw.close_hard().await;
339 }
340
341 pub fn detach(self) -> DB::Connection {
342 self.inner.raw
343 }
344
345 pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
346 Floating {
347 inner: self.inner.into_idle(),
348 guard: self.guard,
349 }
350 }
351
352 pub fn metadata(&self) -> PoolConnectionMetadata {
353 PoolConnectionMetadata {
354 age: self.created_at.elapsed(),
355 idle_for: Duration::ZERO,
356 }
357 }
358}
359
360impl<DB: Database> Floating<DB, Idle<DB>> {
361 pub fn from_idle(
362 idle: Idle<DB>,
363 pool: Arc<PoolInner<DB>>,
364 permit: AsyncSemaphoreReleaser<'_>,
365 ) -> Self {
366 Self {
367 inner: idle,
368 guard: DecrementSizeGuard::from_permit(pool, permit),
369 }
370 }
371
372 pub async fn ping(&mut self) -> Result<(), Error> {
373 self.live.raw.ping().await
374 }
375
376 pub fn into_live(self) -> Floating<DB, Live<DB>> {
377 Floating {
378 inner: self.inner.live,
379 guard: self.guard,
380 }
381 }
382
383 pub async fn close(self) -> DecrementSizeGuard<DB> {
384 if let Err(error) = self.inner.live.raw.close().await {
385 tracing::debug!(%error, "error occurred while closing the pool connection");
386 }
387 self.guard
388 }
389
390 pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
391 let _ = self.inner.live.raw.close_hard().await;
392
393 self.guard
394 }
395
396 pub fn metadata(&self) -> PoolConnectionMetadata {
397 let now = Instant::now();
399
400 PoolConnectionMetadata {
401 age: now.saturating_duration_since(self.created_at),
404 idle_for: now.saturating_duration_since(self.idle_since),
405 }
406 }
407}
408
409impl<DB: Database, C> Deref for Floating<DB, C> {
410 type Target = C;
411
412 fn deref(&self) -> &Self::Target {
413 &self.inner
414 }
415}
416
417impl<DB: Database, C> DerefMut for Floating<DB, C> {
418 fn deref_mut(&mut self) -> &mut Self::Target {
419 &mut self.inner
420 }
421}