sqlx_core/
connection.rs

1use crate::database::{Database, HasStatementCache};
2use crate::error::Error;
3
4use crate::transaction::Transaction;
5use futures_core::future::BoxFuture;
6use log::LevelFilter;
7use std::fmt::Debug;
8use std::str::FromStr;
9use std::time::Duration;
10use url::Url;
11
12/// Represents a single database connection.
13pub trait Connection: Send {
14    type Database: Database<Connection = Self>;
15
16    type Options: ConnectOptions<Connection = Self>;
17
18    /// Explicitly close this database connection.
19    ///
20    /// This notifies the database server that the connection is closing so that it can
21    /// free up any server-side resources in use.
22    ///
23    /// While connections can simply be dropped to clean up local resources,
24    /// the `Drop` handler itself cannot notify the server that the connection is being closed
25    /// because that may require I/O to send a termination message. That can result in a delay
26    /// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
27    ///
28    /// Creating and dropping many connections in short order without calling `.close()` may
29    /// lead to errors from the database server because those senescent connections will still
30    /// count against any connection limit or quota that is configured.
31    ///
32    /// Therefore it is recommended to call `.close()` on a connection when you are done using it
33    /// and to `.await` the result to ensure the termination message is sent.
34    fn close(self) -> BoxFuture<'static, Result<(), Error>>;
35
36    /// Immediately close the connection without sending a graceful shutdown.
37    ///
38    /// This should still at least send a TCP `FIN` frame to let the server know we're dying.
39    #[doc(hidden)]
40    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>;
41
42    /// Checks if a connection to the database is still valid.
43    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>;
44
45    /// Begin a new transaction or establish a savepoint within the active transaction.
46    ///
47    /// Returns a [`Transaction`] for controlling and tracking the new transaction.
48    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
49    where
50        Self: Sized;
51
52    /// Execute the function inside a transaction.
53    ///
54    /// If the function returns an error, the transaction will be rolled back. If it does not
55    /// return an error, the transaction will be committed.
56    ///
57    /// # Example
58    ///
59    /// ```rust
60    /// use sqlx::postgres::{PgConnection, PgRow};
61    /// use sqlx::Connection;
62    ///
63    /// # pub async fn _f(conn: &mut PgConnection) -> sqlx::Result<Vec<PgRow>> {
64    /// conn.transaction(|txn| Box::pin(async move {
65    ///     sqlx::query("select * from ..").fetch_all(&mut **txn).await
66    /// })).await
67    /// # }
68    /// ```
69    fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> BoxFuture<'a, Result<R, E>>
70    where
71        for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result<R, E>>
72            + 'a
73            + Send
74            + Sync,
75        Self: Sized,
76        R: Send,
77        E: From<Error> + Send,
78    {
79        Box::pin(async move {
80            let mut transaction = self.begin().await?;
81            let ret = callback(&mut transaction).await;
82
83            match ret {
84                Ok(ret) => {
85                    transaction.commit().await?;
86
87                    Ok(ret)
88                }
89                Err(err) => {
90                    transaction.rollback().await?;
91
92                    Err(err)
93                }
94            }
95        })
96    }
97
98    /// The number of statements currently cached in the connection.
99    fn cached_statements_size(&self) -> usize
100    where
101        Self::Database: HasStatementCache,
102    {
103        0
104    }
105
106    /// Removes all statements from the cache, closing them on the server if
107    /// needed.
108    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>
109    where
110        Self::Database: HasStatementCache,
111    {
112        Box::pin(async move { Ok(()) })
113    }
114
115    /// Restore any buffers in the connection to their default capacity, if possible.
116    ///
117    /// Sending a large query or receiving a resultset with many columns can cause the connection
118    /// to allocate additional buffer space to fit the data which is retained afterwards in
119    /// case it's needed again. This can give the outward appearance of a memory leak, but is
120    /// in fact the intended behavior.
121    ///
122    /// Calling this method tells the connection to release that excess memory if it can,
123    /// though be aware that calling this too often can cause unnecessary thrashing or
124    /// fragmentation in the global allocator. If there's still data in the connection buffers
125    /// (unlikely if the last query was run to completion) then it may need to be moved to
126    /// allow the buffers to shrink.
127    fn shrink_buffers(&mut self);
128
129    #[doc(hidden)]
130    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;
131
132    #[doc(hidden)]
133    fn should_flush(&self) -> bool;
134
135    /// Establish a new database connection.
136    ///
137    /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing
138    /// is database-specific.
139    #[inline]
140    fn connect(url: &str) -> BoxFuture<'static, Result<Self, Error>>
141    where
142        Self: Sized,
143    {
144        let options = url.parse();
145
146        Box::pin(async move { Self::connect_with(&options?).await })
147    }
148
149    /// Establish a new database connection with the provided options.
150    fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result<Self, Error>>
151    where
152        Self: Sized,
153    {
154        options.connect()
155    }
156}
157
158#[derive(Clone, Debug)]
159#[non_exhaustive]
160pub struct LogSettings {
161    pub statements_level: LevelFilter,
162    pub slow_statements_level: LevelFilter,
163    pub slow_statements_duration: Duration,
164}
165
166impl Default for LogSettings {
167    fn default() -> Self {
168        LogSettings {
169            statements_level: LevelFilter::Debug,
170            slow_statements_level: LevelFilter::Warn,
171            slow_statements_duration: Duration::from_secs(1),
172        }
173    }
174}
175
176impl LogSettings {
177    pub fn log_statements(&mut self, level: LevelFilter) {
178        self.statements_level = level;
179    }
180    pub fn log_slow_statements(&mut self, level: LevelFilter, duration: Duration) {
181        self.slow_statements_level = level;
182        self.slow_statements_duration = duration;
183    }
184}
185
186pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
187    type Connection: Connection<Options = Self> + ?Sized;
188
189    /// Parse the `ConnectOptions` from a URL.
190    fn from_url(url: &Url) -> Result<Self, Error>;
191
192    /// Get a connection URL that may be used to connect to the same database as this `ConnectOptions`.
193    ///
194    /// ### Note: Lossy
195    /// Any flags or settings which do not have a representation in the URL format will be lost.
196    /// They will fall back to their default settings when the URL is parsed.
197    ///
198    /// The only settings guaranteed to be preserved are:
199    /// * Username
200    /// * Password
201    /// * Hostname
202    /// * Port
203    /// * Database name
204    /// * Unix socket or SQLite database file path
205    /// * SSL mode (if applicable)
206    /// * SSL CA certificate path
207    /// * SSL client certificate path
208    /// * SSL client key path
209    ///
210    /// Additional settings are driver-specific. Refer to the source of a given implementation
211    /// to see which options are preserved in the URL.
212    ///
213    /// ### Panics
214    /// This defaults to `unimplemented!()`.
215    ///
216    /// Individual drivers should override this to implement the intended behavior.
217    fn to_url_lossy(&self) -> Url {
218        unimplemented!()
219    }
220
221    /// Establish a new database connection with the options specified by `self`.
222    fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, Error>>
223    where
224        Self::Connection: Sized;
225
226    /// Log executed statements with the specified `level`
227    fn log_statements(self, level: LevelFilter) -> Self;
228
229    /// Log executed statements with a duration above the specified `duration`
230    /// at the specified `level`.
231    fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;
232
233    /// Entirely disables statement logging (both slow and regular).
234    fn disable_statement_logging(self) -> Self {
235        self.log_statements(LevelFilter::Off)
236            .log_slow_statements(LevelFilter::Off, Duration::default())
237    }
238}