sqlx_mysql/connection/
mod.rs1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3
4use futures_core::future::BoxFuture;
5use futures_util::FutureExt;
6pub(crate) use sqlx_core::connection::*;
7pub(crate) use stream::{MySqlStream, Waiting};
8
9use crate::common::StatementCache;
10use crate::error::Error;
11use crate::protocol::response::Status;
12use crate::protocol::statement::StmtClose;
13use crate::protocol::text::{Ping, Quit};
14use crate::statement::MySqlStatementMetadata;
15use crate::transaction::Transaction;
16use crate::{MySql, MySqlConnectOptions};
17
18mod auth;
19mod establish;
20mod executor;
21mod stream;
22mod tls;
23
/// Packet-size limit shared by this module: 1024 (2^10).
///
/// NOTE(review): nothing in the visible part of this file reads the constant;
/// presumably a submodule (e.g. `establish`) consumes it and the unit is
/// bytes — confirm against the use site.
const MAX_PACKET_SIZE: u32 = 1 << 10;
25
26pub struct MySqlConnection {
28 pub(crate) inner: Box<MySqlConnectionInner>,
29}
30
31pub(crate) struct MySqlConnectionInner {
32 pub(crate) stream: MySqlStream,
36
37 pub(crate) transaction_depth: usize,
39 status_flags: Status,
40
41 cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
43
44 log_settings: LogSettings,
45}
46
47impl MySqlConnection {
48 pub(crate) fn in_transaction(&self) -> bool {
49 self.inner
50 .status_flags
51 .intersects(Status::SERVER_STATUS_IN_TRANS)
52 }
53}
54
55impl Debug for MySqlConnection {
56 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
57 f.debug_struct("MySqlConnection").finish()
58 }
59}
60
61impl Connection for MySqlConnection {
62 type Database = MySql;
63
64 type Options = MySqlConnectOptions;
65
66 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
67 Box::pin(async move {
68 self.inner.stream.send_packet(Quit).await?;
69 self.inner.stream.shutdown().await?;
70
71 Ok(())
72 })
73 }
74
75 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
76 Box::pin(async move {
77 self.inner.stream.shutdown().await?;
78 Ok(())
79 })
80 }
81
82 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
83 Box::pin(async move {
84 self.inner.stream.wait_until_ready().await?;
85 self.inner.stream.send_packet(Ping).await?;
86 self.inner.stream.recv_ok().await?;
87
88 Ok(())
89 })
90 }
91
92 #[doc(hidden)]
93 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
94 self.inner.stream.wait_until_ready().boxed()
95 }
96
97 fn cached_statements_size(&self) -> usize {
98 self.inner.cache_statement.len()
99 }
100
101 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
102 Box::pin(async move {
103 while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
104 self.inner
105 .stream
106 .send_packet(StmtClose {
107 statement: statement_id,
108 })
109 .await?;
110 }
111
112 Ok(())
113 })
114 }
115
116 #[doc(hidden)]
117 fn should_flush(&self) -> bool {
118 !self.inner.stream.write_buffer().is_empty()
119 }
120
121 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
122 where
123 Self: Sized,
124 {
125 Transaction::begin(self, None)
126 }
127
128 fn begin_with(
129 &mut self,
130 statement: impl Into<Cow<'static, str>>,
131 ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
132 where
133 Self: Sized,
134 {
135 Transaction::begin(self, Some(statement.into()))
136 }
137
138 fn shrink_buffers(&mut self) {
139 self.inner.stream.shrink_buffers();
140 }
141}