sqlx_postgres/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13 BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14 TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22
23pub use self::stream::PgStream;
24
25pub(crate) mod describe;
26mod establish;
27mod executor;
28mod sasl;
29mod stream;
30mod tls;
31
32pub struct PgConnection {
34 pub(crate) inner: Box<PgConnectionInner>,
35}
36
37pub struct PgConnectionInner {
38 pub(crate) stream: PgStream,
42
43 #[allow(dead_code)]
46 process_id: u32,
47
48 #[allow(dead_code)]
51 secret_key: u32,
52
53 next_statement_id: StatementId,
56
57 cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,
59
60 cache_type_info: HashMap<Oid, PgTypeInfo>,
62 cache_type_oid: HashMap<UStr, Oid>,
63 cache_elem_type_to_array: HashMap<Oid, Oid>,
64
65 pub(crate) pending_ready_for_query_count: usize,
67
68 transaction_status: TransactionStatus,
70 pub(crate) transaction_depth: usize,
71
72 log_settings: LogSettings,
73}
74
75impl PgConnection {
76 pub fn server_version_num(&self) -> Option<u32> {
78 self.inner.stream.server_version_num
79 }
80
81 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
83 if !self.inner.stream.write_buffer_mut().is_empty() {
84 self.inner.stream.flush().await?;
85 }
86
87 while self.inner.pending_ready_for_query_count > 0 {
88 let message = self.inner.stream.recv().await?;
89
90 if let BackendMessageFormat::ReadyForQuery = message.format {
91 self.handle_ready_for_query(message)?;
92 }
93 }
94
95 Ok(())
96 }
97
98 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
99 let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
100
101 self.inner.pending_ready_for_query_count -= 1;
102 self.inner.transaction_status = r.transaction_status;
103
104 Ok(())
105 }
106
107 #[inline(always)]
108 fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
109 self.inner.pending_ready_for_query_count = self
110 .inner
111 .pending_ready_for_query_count
112 .checked_sub(1)
113 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
114
115 self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
116
117 Ok(())
118 }
119
120 #[inline(always)]
124 pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
125 self.inner.stream.write_msg(Query(query))?;
126 self.inner.pending_ready_for_query_count += 1;
127
128 Ok(())
129 }
130}
131
132impl Debug for PgConnection {
133 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
134 f.debug_struct("PgConnection").finish()
135 }
136}
137
138impl Connection for PgConnection {
139 type Database = Postgres;
140
141 type Options = PgConnectOptions;
142
143 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
144 Box::pin(async move {
151 self.inner.stream.send(Terminate).await?;
152 self.inner.stream.shutdown().await?;
153
154 Ok(())
155 })
156 }
157
158 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
159 Box::pin(async move {
160 self.inner.stream.shutdown().await?;
161
162 Ok(())
163 })
164 }
165
166 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
167 Box::pin(async move {
172 self.write_sync();
174 self.wait_until_ready().await
175 })
176 }
177
178 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
179 where
180 Self: Sized,
181 {
182 Transaction::begin(self)
183 }
184
185 fn cached_statements_size(&self) -> usize {
186 self.inner.cache_statement.len()
187 }
188
189 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
190 Box::pin(async move {
191 self.inner.cache_type_oid.clear();
192
193 let mut cleared = 0_usize;
194
195 self.wait_until_ready().await?;
196
197 while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
198 self.inner.stream.write_msg(Close::Statement(id))?;
199 cleared += 1;
200 }
201
202 if cleared > 0 {
203 self.write_sync();
204 self.inner.stream.flush().await?;
205
206 self.wait_for_close_complete(cleared).await?;
207 self.recv_ready_for_query().await?;
208 }
209
210 Ok(())
211 })
212 }
213
214 fn shrink_buffers(&mut self) {
215 self.inner.stream.shrink_buffers();
216 }
217
218 #[doc(hidden)]
219 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220 self.wait_until_ready().boxed()
221 }
222
223 #[doc(hidden)]
224 fn should_flush(&self) -> bool {
225 !self.inner.stream.write_buffer().is_empty()
226 }
227}
228
229impl AsMut<PgConnection> for PgConnection {
234 fn as_mut(&mut self) -> &mut PgConnection {
235 self
236 }
237}