sqlx_mysql/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2
3use futures_core::future::BoxFuture;
4use futures_util::FutureExt;
5pub(crate) use sqlx_core::connection::*;
6pub(crate) use stream::{MySqlStream, Waiting};
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::protocol::statement::StmtClose;
11use crate::protocol::text::{Ping, Quit};
12use crate::statement::MySqlStatementMetadata;
13use crate::transaction::Transaction;
14use crate::{MySql, MySqlConnectOptions};
15
16mod auth;
17mod establish;
18mod executor;
19mod stream;
20mod tls;
21
/// Packet-size limit shared by this connection module: 1024 (`1 << 10`).
///
/// NOTE(review): nothing in this file reads the constant; it is presumably consumed by a
/// submodule (e.g. while establishing the connection) — confirm at the use site.
const MAX_PACKET_SIZE: u32 = 1 << 10;
23
24pub struct MySqlConnection {
26 pub(crate) inner: Box<MySqlConnectionInner>,
27}
28
29pub(crate) struct MySqlConnectionInner {
30 pub(crate) stream: MySqlStream,
34
35 pub(crate) transaction_depth: usize,
37
38 cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
40
41 log_settings: LogSettings,
42}
43
44impl Debug for MySqlConnection {
45 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46 f.debug_struct("MySqlConnection").finish()
47 }
48}
49
50impl Connection for MySqlConnection {
51 type Database = MySql;
52
53 type Options = MySqlConnectOptions;
54
55 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
56 Box::pin(async move {
57 self.inner.stream.send_packet(Quit).await?;
58 self.inner.stream.shutdown().await?;
59
60 Ok(())
61 })
62 }
63
64 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
65 Box::pin(async move {
66 self.inner.stream.shutdown().await?;
67 Ok(())
68 })
69 }
70
71 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
72 Box::pin(async move {
73 self.inner.stream.wait_until_ready().await?;
74 self.inner.stream.send_packet(Ping).await?;
75 self.inner.stream.recv_ok().await?;
76
77 Ok(())
78 })
79 }
80
81 #[doc(hidden)]
82 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
83 self.inner.stream.wait_until_ready().boxed()
84 }
85
86 fn cached_statements_size(&self) -> usize {
87 self.inner.cache_statement.len()
88 }
89
90 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
91 Box::pin(async move {
92 while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
93 self.inner
94 .stream
95 .send_packet(StmtClose {
96 statement: statement_id,
97 })
98 .await?;
99 }
100
101 Ok(())
102 })
103 }
104
105 #[doc(hidden)]
106 fn should_flush(&self) -> bool {
107 !self.inner.stream.write_buffer().is_empty()
108 }
109
110 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
111 where
112 Self: Sized,
113 {
114 Transaction::begin(self)
115 }
116
117 fn shrink_buffers(&mut self) {
118 self.inner.stream.shrink_buffers();
119 }
120}