sqlx_core/pool/
options.rs

1use crate::connection::Connection;
2use crate::database::Database;
3use crate::error::Error;
4use crate::pool::inner::PoolInner;
5use crate::pool::Pool;
6use futures_core::future::BoxFuture;
7use log::LevelFilter;
8use std::fmt::{self, Debug, Formatter};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
12/// Configuration options for [`Pool`][super::Pool].
13///
14/// ### Callback Functions: Why Do I Need `Box::pin()`?
15/// Essentially, because it's impossible to write generic bounds that describe a closure
16/// with a higher-ranked lifetime parameter, returning a future with that same lifetime.
17///
18/// Ideally, you could define it like this:
19/// ```rust,ignore
20/// async fn takes_foo_callback(f: impl for<'a> Fn(&'a mut Foo) -> impl Future<'a, Output = ()>)
21/// ```
22///
23/// However, the compiler does not allow using `impl Trait` in the return type of an `impl Fn`.
24///
25/// And if you try to do it like this:
26/// ```rust,ignore
27/// async fn takes_foo_callback<F, Fut>(f: F)
28/// where
29///     F: for<'a> Fn(&'a mut Foo) -> Fut,
30///     Fut: for<'a> Future<Output = ()> + 'a
31/// ```
32///
33/// There's no way to tell the compiler that those two `'a`s should be the same lifetime.
34///
35/// It's possible to make this work with a custom trait, but it's fiddly and requires naming
36///  the type of the closure parameter.
37///
38/// Having the closure return `BoxFuture` allows us to work around this, as all the type information
39/// fits into a single generic parameter.
40///
41/// We still need to `Box` the future internally to give it a concrete type to avoid leaking a type
42/// parameter everywhere, and `Box` is in the prelude so it doesn't need to be manually imported,
43/// so having the closure return `Pin<Box<dyn Future>` directly is the path of least resistance from
44/// the perspectives of both API designer and consumer.
45pub struct PoolOptions<DB: Database> {
46    pub(crate) test_before_acquire: bool,
47    pub(crate) after_connect: Option<
48        Arc<
49            dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>>
50                + 'static
51                + Send
52                + Sync,
53        >,
54    >,
55    pub(crate) before_acquire: Option<
56        Arc<
57            dyn Fn(
58                    &mut DB::Connection,
59                    PoolConnectionMetadata,
60                ) -> BoxFuture<'_, Result<bool, Error>>
61                + 'static
62                + Send
63                + Sync,
64        >,
65    >,
66    pub(crate) after_release: Option<
67        Arc<
68            dyn Fn(
69                    &mut DB::Connection,
70                    PoolConnectionMetadata,
71                ) -> BoxFuture<'_, Result<bool, Error>>
72                + 'static
73                + Send
74                + Sync,
75        >,
76    >,
77    pub(crate) max_connections: u32,
78    pub(crate) acquire_time_level: LevelFilter,
79    pub(crate) acquire_slow_level: LevelFilter,
80    pub(crate) acquire_slow_threshold: Duration,
81    pub(crate) acquire_timeout: Duration,
82    pub(crate) min_connections: u32,
83    pub(crate) max_lifetime: Option<Duration>,
84    pub(crate) idle_timeout: Option<Duration>,
85    pub(crate) fair: bool,
86
87    pub(crate) parent_pool: Option<Pool<DB>>,
88}
89
90// Manually implement `Clone` to avoid a trait bound issue.
91//
92// See: https://github.com/launchbadge/sqlx/issues/2548
93impl<DB: Database> Clone for PoolOptions<DB> {
94    fn clone(&self) -> Self {
95        PoolOptions {
96            test_before_acquire: self.test_before_acquire,
97            after_connect: self.after_connect.clone(),
98            before_acquire: self.before_acquire.clone(),
99            after_release: self.after_release.clone(),
100            max_connections: self.max_connections,
101            acquire_time_level: self.acquire_time_level,
102            acquire_slow_threshold: self.acquire_slow_threshold,
103            acquire_slow_level: self.acquire_slow_level,
104            acquire_timeout: self.acquire_timeout,
105            min_connections: self.min_connections,
106            max_lifetime: self.max_lifetime,
107            idle_timeout: self.idle_timeout,
108            fair: self.fair,
109            parent_pool: self.parent_pool.clone(),
110        }
111    }
112}
113
/// Information about the connection a [`PoolOptions`] callback is currently being invoked for.
#[non_exhaustive] // Leaves room to add fields later without a breaking change.
#[derive(Debug)] // Deliberately not committing to any other trait impls yet.
pub struct PoolConnectionMetadata {
    /// How long ago the connection was first opened.
    ///
    /// This is [`Duration::ZERO`] when passed to [`after_connect`][PoolOptions::after_connect].
    pub age: Duration,

    /// How long the connection sat in the idle queue.
    ///
    /// Only meaningful for [`before_acquire`][PoolOptions::before_acquire];
    /// every other callback receives [`Duration::ZERO`].
    pub idle_for: Duration,
}
129
130impl<DB: Database> Default for PoolOptions<DB> {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl<DB: Database> PoolOptions<DB> {
137    /// Returns a default "sane" configuration, suitable for testing or light-duty applications.
138    ///
139    /// Production applications will likely want to at least modify
140    /// [`max_connections`][Self::max_connections].
141    ///
142    /// See the source of this method for the current default values.
143    pub fn new() -> Self {
144        Self {
145            // User-specifiable routines
146            after_connect: None,
147            before_acquire: None,
148            after_release: None,
149            test_before_acquire: true,
150            // A production application will want to set a higher limit than this.
151            max_connections: 10,
152            min_connections: 0,
153            // Logging all acquires is opt-in
154            acquire_time_level: LevelFilter::Off,
155            // Default to warning, because an acquire timeout will be an error
156            acquire_slow_level: LevelFilter::Warn,
157            // Fast enough to catch problems (e.g. a full pool); slow enough
158            // to not flag typical time to add a new connection to a pool.
159            acquire_slow_threshold: Duration::from_secs(2),
160            acquire_timeout: Duration::from_secs(30),
161            idle_timeout: Some(Duration::from_secs(10 * 60)),
162            max_lifetime: Some(Duration::from_secs(30 * 60)),
163            fair: true,
164            parent_pool: None,
165        }
166    }
167
168    /// Set the maximum number of connections that this pool should maintain.
169    ///
170    /// Be mindful of the connection limits for your database as well as other applications
171    /// which may want to connect to the same database (or even multiple instances of the same
172    /// application in high-availability deployments).
173    pub fn max_connections(mut self, max: u32) -> Self {
174        self.max_connections = max;
175        self
176    }
177
178    /// Get the maximum number of connections that this pool should maintain
179    pub fn get_max_connections(&self) -> u32 {
180        self.max_connections
181    }
182
183    /// Set the minimum number of connections to maintain at all times.
184    ///
185    /// When the pool is built, this many connections will be automatically spun up.
186    ///
187    /// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`], or explicitly closed,
188    /// and it brings the connection count below this amount, a new connection will be opened to
189    /// replace it.
190    ///
191    /// This is only done on a best-effort basis, however. The routine that maintains this value
192    /// has a deadline so it doesn't wait forever if the database is being slow or returning errors.
193    ///
194    /// This value is clamped internally to not exceed [`max_connections`].
195    ///
196    /// We've chosen not to assert `min_connections <= max_connections` anywhere
197    /// because it shouldn't break anything internally if the condition doesn't hold,
198    /// and if the application allows either value to be dynamically set
199    /// then it should be checking this condition itself and returning
200    /// a nicer error than a panic anyway.
201    ///
202    /// [`max_lifetime`]: Self::max_lifetime
203    /// [`idle_timeout`]: Self::idle_timeout
204    /// [`max_connections`]: Self::max_connections
205    pub fn min_connections(mut self, min: u32) -> Self {
206        self.min_connections = min;
207        self
208    }
209
210    /// Get the minimum number of connections to maintain at all times.
211    pub fn get_min_connections(&self) -> u32 {
212        self.min_connections
213    }
214
215    /// Enable logging of time taken to acquire a connection from the connection pool via
216    /// [`Pool::acquire()`].
217    ///
218    /// If slow acquire logging is also enabled, this level is used for acquires that are not
219    /// considered slow.
220    pub fn acquire_time_level(mut self, level: LevelFilter) -> Self {
221        self.acquire_time_level = level;
222        self
223    }
224
225    /// Log excessive time taken to acquire a connection at a different log level than time taken
226    /// for faster connection acquires via [`Pool::acquire()`].
227    pub fn acquire_slow_level(mut self, level: LevelFilter) -> Self {
228        self.acquire_slow_level = level;
229        self
230    }
231
232    /// Set a threshold for reporting excessive time taken to acquire a connection from
233    /// the connection pool via [`Pool::acquire()`]. When the threshold is exceeded, a warning is logged.
234    ///
235    /// Defaults to a value that should not typically be exceeded by the pool enlarging
236    /// itself with an additional new connection.
237    pub fn acquire_slow_threshold(mut self, threshold: Duration) -> Self {
238        self.acquire_slow_threshold = threshold;
239        self
240    }
241
242    /// Get the threshold for reporting excessive time taken to acquire a connection via
243    /// [`Pool::acquire()`].
244    pub fn get_acquire_slow_threshold(&self) -> Duration {
245        self.acquire_slow_threshold
246    }
247
248    /// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
249    ///
250    /// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases:
251    ///
252    /// * First, it may need to wait for a permit from the semaphore, which grants it the privilege
253    ///   of opening a connection or popping one from the idle queue.
254    /// * If an existing idle connection is acquired, by default it will be checked for liveness
255    ///   and integrity before being returned, which may require executing a command on the
256    ///   connection. This can be disabled with [`test_before_acquire(false)`][Self::test_before_acquire].
257    ///     * If [`before_acquire`][Self::before_acquire] is set, that will also be executed.
258    /// * If a new connection needs to be opened, that will obviously require I/O, handshaking,
259    ///   and initialization commands.
260    ///     * If [`after_connect`][Self::after_connect] is set, that will also be executed.
261    pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
262        self.acquire_timeout = timeout;
263        self
264    }
265
266    /// Get the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
267    pub fn get_acquire_timeout(&self) -> Duration {
268        self.acquire_timeout
269    }
270
271    /// Set the maximum lifetime of individual connections.
272    ///
273    /// Any connection with a lifetime greater than this will be closed.
274    ///
275    /// When set to `None`, all connections live until either reaped by [`idle_timeout`]
276    /// or explicitly disconnected.
277    ///
278    /// Infinite connections are not recommended due to the unfortunate reality of memory/resource
279    /// leaks on the database-side. It is better to retire connections periodically
280    /// (even if only once daily) to allow the database the opportunity to clean up data structures
281    /// (parse trees, query metadata caches, thread-local storage, etc.) that are associated with a
282    /// session.
283    ///
284    /// [`idle_timeout`]: Self::idle_timeout
285    pub fn max_lifetime(mut self, lifetime: impl Into<Option<Duration>>) -> Self {
286        self.max_lifetime = lifetime.into();
287        self
288    }
289
290    /// Get the maximum lifetime of individual connections.
291    pub fn get_max_lifetime(&self) -> Option<Duration> {
292        self.max_lifetime
293    }
294
295    /// Set a maximum idle duration for individual connections.
296    ///
297    /// Any connection that remains in the idle queue longer than this will be closed.
298    ///
299    /// For usage-based database server billing, this can be a cost saver.
300    pub fn idle_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
301        self.idle_timeout = timeout.into();
302        self
303    }
304
305    /// Get the maximum idle duration for individual connections.
306    pub fn get_idle_timeout(&self) -> Option<Duration> {
307        self.idle_timeout
308    }
309
310    /// If true, the health of a connection will be verified by a call to [`Connection::ping`]
311    /// before returning the connection.
312    ///
313    /// Defaults to `true`.
314    pub fn test_before_acquire(mut self, test: bool) -> Self {
315        self.test_before_acquire = test;
316        self
317    }
318
319    /// Get whether `test_before_acquire` is currently set.
320    pub fn get_test_before_acquire(&self) -> bool {
321        self.test_before_acquire
322    }
323
324    /// If set to `true`, calls to `acquire()` are fair and connections  are issued
325    /// in first-come-first-serve order. If `false`, "drive-by" tasks may steal idle connections
326    /// ahead of tasks that have been waiting.
327    ///
328    /// According to `sqlx-bench/benches/pg_pool` this may slightly increase time
329    /// to `acquire()` at low pool contention but at very high contention it helps
330    /// avoid tasks at the head of the waiter queue getting repeatedly preempted by
331    /// these "drive-by" tasks and tasks further back in the queue timing out because
332    /// the queue isn't moving.
333    ///
334    /// Currently only exposed for benchmarking; `fair = true` seems to be the superior option
335    /// in most cases.
336    #[doc(hidden)]
337    pub fn __fair(mut self, fair: bool) -> Self {
338        self.fair = fair;
339        self
340    }
341
342    /// Perform an asynchronous action after connecting to the database.
343    ///
344    /// If the operation returns with an error then the error is logged, the connection is closed
345    /// and a new one is opened in its place and the callback is invoked again.
346    ///
347    /// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient
348    /// error condition.
349    ///
350    /// Note that this may be called for internally opened connections, such as when maintaining
351    /// [`min_connections`][Self::min_connections], that are then immediately returned to the pool
352    /// without invoking [`after_release`][Self::after_release].
353    ///
354    /// # Example: Additional Parameters
355    /// This callback may be used to set additional configuration parameters
356    /// that are not exposed by the database's `ConnectOptions`.
357    ///
358    /// This example is written for PostgreSQL but can likely be adapted to other databases.
359    ///
360    /// ```no_run
361    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
362    /// use sqlx::Executor;
363    /// use sqlx::postgres::PgPoolOptions;
364    ///
365    /// let pool = PgPoolOptions::new()
366    ///     .after_connect(|conn, _meta| Box::pin(async move {
367    ///         // When directly invoking `Executor` methods,
368    ///         // it is possible to execute multiple statements with one call.
369    ///         conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';")
370    ///             .await?;
371    ///
372    ///         Ok(())
373    ///     }))
374    ///     .connect("postgres:// …").await?;
375    /// # Ok(())
376    /// # }
377    /// ```
378    ///
379    /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
380    pub fn after_connect<F>(mut self, callback: F) -> Self
381    where
382        // We're passing the `PoolConnectionMetadata` here mostly for future-proofing.
383        // `age` and `idle_for` are obviously not useful for fresh connections.
384        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>>
385            + 'static
386            + Send
387            + Sync,
388    {
389        self.after_connect = Some(Arc::new(callback));
390        self
391    }
392
393    /// Perform an asynchronous action on a previously idle connection before giving it out.
394    ///
395    /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
396    /// potentially useful information such as the connection's age and the duration it was
397    /// idle.
398    ///
399    /// If the operation returns `Ok(true)`, the connection is returned to the task that called
400    /// [`Pool::acquire`].
401    ///
402    /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
403    /// and then the connection is closed and [`Pool::acquire`] tries again with another idle
404    /// connection. If it runs out of idle connections, it opens a new connection instead.
405    ///
406    /// This is *not* invoked for new connections. Use [`after_connect`][Self::after_connect]
407    /// for those.
408    ///
409    /// # Example: Custom `test_before_acquire` Logic
410    /// If you only want to ping connections if they've been idle a certain amount of time,
411    /// you can implement your own logic here:
412    ///
413    /// This example is written for Postgres but should be trivially adaptable to other databases.
414    /// ```no_run
415    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
416    /// use sqlx::{Connection, Executor};
417    /// use sqlx::postgres::PgPoolOptions;
418    ///
419    /// let pool = PgPoolOptions::new()
420    ///     .test_before_acquire(false)
421    ///     .before_acquire(|conn, meta| Box::pin(async move {
422    ///         // One minute
423    ///         if meta.idle_for.as_secs() > 60 {
424    ///             conn.ping().await?;
425    ///         }
426    ///
427    ///         Ok(true)
428    ///     }))
429    ///     .connect("postgres:// …").await?;
430    /// # Ok(())
431    /// # }
432    ///```
433    ///
434    /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
435    pub fn before_acquire<F>(mut self, callback: F) -> Self
436    where
437        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
438            + 'static
439            + Send
440            + Sync,
441    {
442        self.before_acquire = Some(Arc::new(callback));
443        self
444    }
445
446    /// Perform an asynchronous action on a connection before it is returned to the pool.
447    ///
448    /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
449    /// potentially useful information such as the connection's age.
450    ///
451    /// If the operation returns `Ok(true)`, the connection is returned to the pool's idle queue.
452    /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
453    /// and the connection is closed, allowing a task waiting on [`Pool::acquire`] to
454    /// open a new one in its place.
455    ///
456    /// # Example (Postgres): Close Memory-Hungry Connections
457    /// Instead of relying on [`max_lifetime`][Self::max_lifetime] to close connections,
458    /// we can monitor their memory usage directly and close any that have allocated too much.
459    ///
460    /// Note that this is purely an example showcasing a possible use for this callback
461    /// and may be flawed as it has not been tested.
462    ///
463    /// This example queries [`pg_backend_memory_contexts`](https://www.postgresql.org/docs/current/view-pg-backend-memory-contexts.html)
464    /// which is only allowed for superusers.
465    ///
466    /// ```no_run
467    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
468    /// use sqlx::{Connection, Executor};
469    /// use sqlx::postgres::PgPoolOptions;
470    ///
471    /// let pool = PgPoolOptions::new()
472    ///     // Let connections live as long as they want.
473    ///     .max_lifetime(None)
474    ///     .after_release(|conn, meta| Box::pin(async move {
475    ///         // Only check connections older than 6 hours.
476    ///         if meta.age.as_secs() < 6 * 60 * 60 {
477    ///             return Ok(true);
478    ///         }
479    ///
480    ///         let total_memory_usage: i64 = sqlx::query_scalar(
481    ///             "select sum(used_bytes) from pg_backend_memory_contexts"
482    ///         )
483    ///         .fetch_one(conn)
484    ///         .await?;
485    ///
486    ///         // Close the connection if the backend memory usage exceeds 256 MiB.
487    ///         Ok(total_memory_usage <= (2 << 28))
488    ///     }))
489    ///     .connect("postgres:// …").await?;
490    /// # Ok(())
491    /// # }
492    pub fn after_release<F>(mut self, callback: F) -> Self
493    where
494        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
495            + 'static
496            + Send
497            + Sync,
498    {
499        self.after_release = Some(Arc::new(callback));
500        self
501    }
502
503    /// Set the parent `Pool` from which the new pool will inherit its semaphore.
504    ///
505    /// This is currently an internal-only API.
506    ///
507    /// ### Panics
508    /// If `self.max_connections` is greater than the setting the given pool was created with,
509    /// or `self.fair` differs from the setting the given pool was created with.
510    #[doc(hidden)]
511    pub fn parent(mut self, pool: Pool<DB>) -> Self {
512        self.parent_pool = Some(pool);
513        self
514    }
515
516    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
517    ///
518    /// This ensures the configuration is correct.
519    ///
520    /// The total number of connections opened is <code>max(1, [min_connections][Self::min_connections])</code>.
521    ///
522    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
523    ///
524    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
525    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
526    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
527    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
528    pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error> {
529        self.connect_with(url.parse()?).await
530    }
531
532    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
533    ///
534    /// This ensures the configuration is correct.
535    ///
536    /// The total number of connections opened is <code>max(1, [min_connections][Self::min_connections])</code>.
537    pub async fn connect_with(
538        self,
539        options: <DB::Connection as Connection>::Options,
540    ) -> Result<Pool<DB>, Error> {
541        // Don't take longer than `acquire_timeout` starting from when this is called.
542        let deadline = Instant::now() + self.acquire_timeout;
543
544        let inner = PoolInner::new_arc(self, options);
545
546        if inner.options.min_connections > 0 {
547            // If the idle reaper is spawned then this will race with the call from that task
548            // and may not report any connection errors.
549            inner.try_min_connections(deadline).await?;
550        }
551
552        // If `min_connections` is nonzero then we'll likely just pull a connection
553        // from the idle queue here, but it should at least get tested first.
554        let conn = inner.acquire().await?;
555        inner.release(conn);
556
557        Ok(Pool(inner))
558    }
559
560    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
561    ///
562    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
563    /// optimistically establish that many connections for the pool.
564    ///
565    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
566    ///
567    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
568    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
569    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
570    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
571    pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error> {
572        Ok(self.connect_lazy_with(url.parse()?))
573    }
574
575    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
576    ///
577    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
578    /// optimistically establish that many connections for the pool.
579    pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
580        // `min_connections` is guaranteed by the idle reaper now.
581        Pool(PoolInner::new_arc(self, options))
582    }
583}
584
585impl<DB: Database> Debug for PoolOptions<DB> {
586    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
587        f.debug_struct("PoolOptions")
588            .field("max_connections", &self.max_connections)
589            .field("min_connections", &self.min_connections)
590            .field("connect_timeout", &self.acquire_timeout)
591            .field("max_lifetime", &self.max_lifetime)
592            .field("idle_timeout", &self.idle_timeout)
593            .field("test_before_acquire", &self.test_before_acquire)
594            .finish()
595    }
596}