sqlx_core/connection.rs
1use crate::database::{Database, HasStatementCache};
2use crate::error::Error;
3
4use crate::transaction::Transaction;
5use futures_core::future::BoxFuture;
6use log::LevelFilter;
7use std::fmt::Debug;
8use std::str::FromStr;
9use std::time::Duration;
10use url::Url;
11
12/// Represents a single database connection.
13pub trait Connection: Send {
14 type Database: Database<Connection = Self>;
15
16 type Options: ConnectOptions<Connection = Self>;
17
18 /// Explicitly close this database connection.
19 ///
20 /// This notifies the database server that the connection is closing so that it can
21 /// free up any server-side resources in use.
22 ///
23 /// While connections can simply be dropped to clean up local resources,
24 /// the `Drop` handler itself cannot notify the server that the connection is being closed
25 /// because that may require I/O to send a termination message. That can result in a delay
26 /// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
27 ///
28 /// Creating and dropping many connections in short order without calling `.close()` may
29 /// lead to errors from the database server because those senescent connections will still
30 /// count against any connection limit or quota that is configured.
31 ///
32 /// Therefore it is recommended to call `.close()` on a connection when you are done using it
33 /// and to `.await` the result to ensure the termination message is sent.
34 fn close(self) -> BoxFuture<'static, Result<(), Error>>;
35
36 /// Immediately close the connection without sending a graceful shutdown.
37 ///
38 /// This should still at least send a TCP `FIN` frame to let the server know we're dying.
39 #[doc(hidden)]
40 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>;
41
42 /// Checks if a connection to the database is still valid.
43 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>;
44
45 /// Begin a new transaction or establish a savepoint within the active transaction.
46 ///
47 /// Returns a [`Transaction`] for controlling and tracking the new transaction.
48 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
49 where
50 Self: Sized;
51
52 /// Execute the function inside a transaction.
53 ///
54 /// If the function returns an error, the transaction will be rolled back. If it does not
55 /// return an error, the transaction will be committed.
56 ///
57 /// # Example
58 ///
59 /// ```rust
60 /// use sqlx::postgres::{PgConnection, PgRow};
61 /// use sqlx::Connection;
62 ///
63 /// # pub async fn _f(conn: &mut PgConnection) -> sqlx::Result<Vec<PgRow>> {
64 /// conn.transaction(|txn| Box::pin(async move {
65 /// sqlx::query("select * from ..").fetch_all(&mut **txn).await
66 /// })).await
67 /// # }
68 /// ```
69 fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> BoxFuture<'a, Result<R, E>>
70 where
71 for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result<R, E>>
72 + 'a
73 + Send
74 + Sync,
75 Self: Sized,
76 R: Send,
77 E: From<Error> + Send,
78 {
79 Box::pin(async move {
80 let mut transaction = self.begin().await?;
81 let ret = callback(&mut transaction).await;
82
83 match ret {
84 Ok(ret) => {
85 transaction.commit().await?;
86
87 Ok(ret)
88 }
89 Err(err) => {
90 transaction.rollback().await?;
91
92 Err(err)
93 }
94 }
95 })
96 }
97
98 /// The number of statements currently cached in the connection.
99 fn cached_statements_size(&self) -> usize
100 where
101 Self::Database: HasStatementCache,
102 {
103 0
104 }
105
106 /// Removes all statements from the cache, closing them on the server if
107 /// needed.
108 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>
109 where
110 Self::Database: HasStatementCache,
111 {
112 Box::pin(async move { Ok(()) })
113 }
114
115 /// Restore any buffers in the connection to their default capacity, if possible.
116 ///
117 /// Sending a large query or receiving a resultset with many columns can cause the connection
118 /// to allocate additional buffer space to fit the data which is retained afterwards in
119 /// case it's needed again. This can give the outward appearance of a memory leak, but is
120 /// in fact the intended behavior.
121 ///
122 /// Calling this method tells the connection to release that excess memory if it can,
123 /// though be aware that calling this too often can cause unnecessary thrashing or
124 /// fragmentation in the global allocator. If there's still data in the connection buffers
125 /// (unlikely if the last query was run to completion) then it may need to be moved to
126 /// allow the buffers to shrink.
127 fn shrink_buffers(&mut self);
128
129 #[doc(hidden)]
130 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;
131
132 #[doc(hidden)]
133 fn should_flush(&self) -> bool;
134
135 /// Establish a new database connection.
136 ///
137 /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing
138 /// is database-specific.
139 #[inline]
140 fn connect(url: &str) -> BoxFuture<'static, Result<Self, Error>>
141 where
142 Self: Sized,
143 {
144 let options = url.parse();
145
146 Box::pin(async move { Self::connect_with(&options?).await })
147 }
148
149 /// Establish a new database connection with the provided options.
150 fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result<Self, Error>>
151 where
152 Self: Sized,
153 {
154 options.connect()
155 }
156}
157
158#[derive(Clone, Debug)]
159#[non_exhaustive]
160pub struct LogSettings {
161 pub statements_level: LevelFilter,
162 pub slow_statements_level: LevelFilter,
163 pub slow_statements_duration: Duration,
164}
165
166impl Default for LogSettings {
167 fn default() -> Self {
168 LogSettings {
169 statements_level: LevelFilter::Debug,
170 slow_statements_level: LevelFilter::Warn,
171 slow_statements_duration: Duration::from_secs(1),
172 }
173 }
174}
175
176impl LogSettings {
177 pub fn log_statements(&mut self, level: LevelFilter) {
178 self.statements_level = level;
179 }
180 pub fn log_slow_statements(&mut self, level: LevelFilter, duration: Duration) {
181 self.slow_statements_level = level;
182 self.slow_statements_duration = duration;
183 }
184}
185
186pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
187 type Connection: Connection<Options = Self> + ?Sized;
188
189 /// Parse the `ConnectOptions` from a URL.
190 fn from_url(url: &Url) -> Result<Self, Error>;
191
192 /// Get a connection URL that may be used to connect to the same database as this `ConnectOptions`.
193 ///
194 /// ### Note: Lossy
195 /// Any flags or settings which do not have a representation in the URL format will be lost.
196 /// They will fall back to their default settings when the URL is parsed.
197 ///
198 /// The only settings guaranteed to be preserved are:
199 /// * Username
200 /// * Password
201 /// * Hostname
202 /// * Port
203 /// * Database name
204 /// * Unix socket or SQLite database file path
205 /// * SSL mode (if applicable)
206 /// * SSL CA certificate path
207 /// * SSL client certificate path
208 /// * SSL client key path
209 ///
210 /// Additional settings are driver-specific. Refer to the source of a given implementation
211 /// to see which options are preserved in the URL.
212 ///
213 /// ### Panics
214 /// This defaults to `unimplemented!()`.
215 ///
216 /// Individual drivers should override this to implement the intended behavior.
217 fn to_url_lossy(&self) -> Url {
218 unimplemented!()
219 }
220
221 /// Establish a new database connection with the options specified by `self`.
222 fn connect(&self) -> BoxFuture<'_, Result<Self::Connection, Error>>
223 where
224 Self::Connection: Sized;
225
226 /// Log executed statements with the specified `level`
227 fn log_statements(self, level: LevelFilter) -> Self;
228
229 /// Log executed statements with a duration above the specified `duration`
230 /// at the specified `level`.
231 fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;
232
233 /// Entirely disables statement logging (both slow and regular).
234 fn disable_statement_logging(self) -> Self {
235 self.log_statements(LevelFilter::Off)
236 .log_slow_statements(LevelFilter::Off, Duration::default())
237 }
238}