sqlx_postgres/connection/
mod.rs

1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13    BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14    TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22
23pub use self::stream::PgStream;
24
25pub(crate) mod describe;
26mod establish;
27mod executor;
28mod sasl;
29mod stream;
30mod tls;
31
/// A connection to a PostgreSQL database.
///
/// All per-connection state is stored in a boxed [`PgConnectionInner`].
pub struct PgConnection {
    // Boxed connection state: the stream, the statement/type caches, and the
    // transaction bookkeeping.
    pub(crate) inner: Box<PgConnectionInner>,
}
36
/// The state owned by a [`PgConnection`], held behind a `Box` in its `inner` field.
pub struct PgConnectionInner {
    // The underlying TCP or UDS stream,
    // wrapped in a potentially TLS stream,
    // wrapped in a buffered stream.
    pub(crate) stream: PgStream,

    // Process ID of this backend.
    // Used to send cancel requests.
    #[allow(dead_code)]
    process_id: u32,

    // Secret key of this backend.
    // Used to send cancel requests.
    #[allow(dead_code)]
    secret_key: u32,

    // Sequence of statement IDs for use in preparing statements.
    // In PostgreSQL, a statement is prepared under a user-supplied identifier.
    next_statement_id: StatementId,

    // Statement cache: maps a query string to the ID it was prepared under
    // and its (shared) metadata.
    cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,

    // Caches for user-defined types, by ID <-> info.
    cache_type_info: HashMap<Oid, PgTypeInfo>,
    cache_type_oid: HashMap<UStr, Oid>,
    // NOTE(review): by its name and types this maps an element type OID to the
    // OID of its array type -- confirm against the code that populates it.
    cache_elem_type_to_array: HashMap<Oid, Oid>,

    // Number of ReadyForQuery messages that we are currently expecting.
    pub(crate) pending_ready_for_query_count: usize,

    // Current transaction status, as last reported in a ReadyForQuery message.
    transaction_status: TransactionStatus,
    // NOTE(review): presumably the nesting depth of open transactions/savepoints;
    // it is not modified in this file -- confirm against the transaction module.
    pub(crate) transaction_depth: usize,

    // NOTE(review): logging configuration; not read in this file.
    log_settings: LogSettings,
}
74
75impl PgConnection {
76    /// the version number of the server in `libpq` format
77    pub fn server_version_num(&self) -> Option<u32> {
78        self.inner.stream.server_version_num
79    }
80
81    // will return when the connection is ready for another query
82    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
83        if !self.inner.stream.write_buffer_mut().is_empty() {
84            self.inner.stream.flush().await?;
85        }
86
87        while self.inner.pending_ready_for_query_count > 0 {
88            let message = self.inner.stream.recv().await?;
89
90            if let BackendMessageFormat::ReadyForQuery = message.format {
91                self.handle_ready_for_query(message)?;
92            }
93        }
94
95        Ok(())
96    }
97
98    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
99        let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
100
101        self.inner.pending_ready_for_query_count -= 1;
102        self.inner.transaction_status = r.transaction_status;
103
104        Ok(())
105    }
106
107    #[inline(always)]
108    fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
109        self.inner.pending_ready_for_query_count = self
110            .inner
111            .pending_ready_for_query_count
112            .checked_sub(1)
113            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
114
115        self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
116
117        Ok(())
118    }
119
120    /// Queue a simple query (not prepared) to execute the next time this connection is used.
121    ///
122    /// Used for rolling back transactions and releasing advisory locks.
123    #[inline(always)]
124    pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
125        self.inner.stream.write_msg(Query(query))?;
126        self.inner.pending_ready_for_query_count += 1;
127
128        Ok(())
129    }
130}
131
132impl Debug for PgConnection {
133    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
134        f.debug_struct("PgConnection").finish()
135    }
136}
137
138impl Connection for PgConnection {
139    type Database = Postgres;
140
141    type Options = PgConnectOptions;
142
143    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
144        // The normal, graceful termination procedure is that the frontend sends a Terminate
145        // message and immediately closes the connection.
146
147        // On receipt of this message, the backend closes the
148        // connection and terminates.
149
150        Box::pin(async move {
151            self.inner.stream.send(Terminate).await?;
152            self.inner.stream.shutdown().await?;
153
154            Ok(())
155        })
156    }
157
158    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
159        Box::pin(async move {
160            self.inner.stream.shutdown().await?;
161
162            Ok(())
163        })
164    }
165
166    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
167        // Users were complaining about this showing up in query statistics on the server.
168        // By sending a comment we avoid an error if the connection was in the middle of a rowset
169        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
170
171        Box::pin(async move {
172            // The simplest call-and-response that's possible.
173            self.write_sync();
174            self.wait_until_ready().await
175        })
176    }
177
178    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
179    where
180        Self: Sized,
181    {
182        Transaction::begin(self)
183    }
184
185    fn cached_statements_size(&self) -> usize {
186        self.inner.cache_statement.len()
187    }
188
189    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
190        Box::pin(async move {
191            self.inner.cache_type_oid.clear();
192
193            let mut cleared = 0_usize;
194
195            self.wait_until_ready().await?;
196
197            while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
198                self.inner.stream.write_msg(Close::Statement(id))?;
199                cleared += 1;
200            }
201
202            if cleared > 0 {
203                self.write_sync();
204                self.inner.stream.flush().await?;
205
206                self.wait_for_close_complete(cleared).await?;
207                self.recv_ready_for_query().await?;
208            }
209
210            Ok(())
211        })
212    }
213
214    fn shrink_buffers(&mut self) {
215        self.inner.stream.shrink_buffers();
216    }
217
218    #[doc(hidden)]
219    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220        self.wait_until_ready().boxed()
221    }
222
223    #[doc(hidden)]
224    fn should_flush(&self) -> bool {
225        !self.inner.stream.write_buffer().is_empty()
226    }
227}
228
229// Implement `AsMut<Self>` so that `PgConnection` can be wrapped in
230// a `PgAdvisoryLockGuard`.
231//
232// See: https://github.com/launchbadge/sqlx/issues/2520
233impl AsMut<PgConnection> for PgConnection {
234    fn as_mut(&mut self) -> &mut PgConnection {
235        self
236    }
237}