sqlx_mysql/connection/
mod.rs

1use std::fmt::{self, Debug, Formatter};
2
3use futures_core::future::BoxFuture;
4use futures_util::FutureExt;
5pub(crate) use sqlx_core::connection::*;
6pub(crate) use stream::{MySqlStream, Waiting};
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::protocol::statement::StmtClose;
11use crate::protocol::text::{Ping, Quit};
12use crate::statement::MySqlStatementMetadata;
13use crate::transaction::Transaction;
14use crate::{MySql, MySqlConnectOptions};
15
16mod auth;
17mod establish;
18mod executor;
19mod stream;
20mod tls;
21
/// Packet-size constant with the value 1024 (`1 << 10`).
///
/// NOTE(review): nothing in the visible part of this module reads it;
/// presumably a submodule consumes it via `super::` and the unit is bytes —
/// TODO confirm both.
const MAX_PACKET_SIZE: u32 = 1 << 10;
23
24/// A connection to a MySQL database.
25pub struct MySqlConnection {
26    pub(crate) inner: Box<MySqlConnectionInner>,
27}
28
29pub(crate) struct MySqlConnectionInner {
30    // underlying TCP stream,
31    // wrapped in a potentially TLS stream,
32    // wrapped in a buffered stream
33    pub(crate) stream: MySqlStream,
34
35    // transaction status
36    pub(crate) transaction_depth: usize,
37
38    // cache by query string to the statement id and metadata
39    cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
40
41    log_settings: LogSettings,
42}
43
44impl Debug for MySqlConnection {
45    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
46        f.debug_struct("MySqlConnection").finish()
47    }
48}
49
50impl Connection for MySqlConnection {
51    type Database = MySql;
52
53    type Options = MySqlConnectOptions;
54
55    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
56        Box::pin(async move {
57            self.inner.stream.send_packet(Quit).await?;
58            self.inner.stream.shutdown().await?;
59
60            Ok(())
61        })
62    }
63
64    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
65        Box::pin(async move {
66            self.inner.stream.shutdown().await?;
67            Ok(())
68        })
69    }
70
71    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
72        Box::pin(async move {
73            self.inner.stream.wait_until_ready().await?;
74            self.inner.stream.send_packet(Ping).await?;
75            self.inner.stream.recv_ok().await?;
76
77            Ok(())
78        })
79    }
80
81    #[doc(hidden)]
82    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
83        self.inner.stream.wait_until_ready().boxed()
84    }
85
86    fn cached_statements_size(&self) -> usize {
87        self.inner.cache_statement.len()
88    }
89
90    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
91        Box::pin(async move {
92            while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
93                self.inner
94                    .stream
95                    .send_packet(StmtClose {
96                        statement: statement_id,
97                    })
98                    .await?;
99            }
100
101            Ok(())
102        })
103    }
104
105    #[doc(hidden)]
106    fn should_flush(&self) -> bool {
107        !self.inner.stream.write_buffer().is_empty()
108    }
109
110    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
111    where
112        Self: Sized,
113    {
114        Transaction::begin(self)
115    }
116
117    fn shrink_buffers(&mut self) {
118        self.inner.stream.shrink_buffers();
119    }
120}