// zino_connector/mysql.rs

use super::{sqlx_row::SerializeRow, Connector, DataSource, DataSourceConnector::MySql};
use crate::helper;
use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use sqlx::mysql::{MySqlPool, MySqlPoolOptions};
use std::time::Duration;
use toml::Table;
use zino_core::{error::Error, extension::TomlTableExt, state::State, AvroValue, Map, Record};

impl Connector for MySqlPool {
    fn try_new_data_source(config: &Table) -> Result<DataSource, Error> {
        let name = config.get_str("name").unwrap_or("mysql");
        let database = config.get_str("database").unwrap_or(name);
        let authority = State::format_authority(config, Some(3306));
        let dsn = format!("mysql://{authority}/{database}");

        let max_connections = config.get_u32("max-connections").unwrap_or(16);
        let min_connections = config.get_u32("min-connections").unwrap_or(2);
        let max_lifetime = config
            .get_duration("max-lifetime")
            .unwrap_or_else(|| Duration::from_secs(60 * 60));
        let idle_timeout = config
            .get_duration("idle-timeout")
            .unwrap_or_else(|| Duration::from_secs(10 * 60));
        let acquire_timeout = config
            .get_duration("acquire-timeout")
            .unwrap_or_else(|| Duration::from_secs(30));
        let pool_options = MySqlPoolOptions::new()
            .max_connections(max_connections)
            .min_connections(min_connections)
            .max_lifetime(max_lifetime)
            .idle_timeout(idle_timeout)
            .acquire_timeout(acquire_timeout);
        let pool = pool_options.connect_lazy(&dsn)?;
        let data_source = DataSource::new("mysql", None, name, database, MySql(pool));
        Ok(data_source)
    }

    async fn execute(&self, query: &str, params: Option<&Map>) -> Result<Option<u64>, Error> {
        let (sql, values) = helper::prepare_sql_query(query, params, '?');
        let mut query = sqlx::query(&sql);
        for value in values {
            query = query.bind(value.to_string());
        }

        let query_result = query.execute(self).await?;
        Ok(Some(query_result.rows_affected()))
    }

    async fn query(&self, query: &str, params: Option<&Map>) -> Result<Vec<Record>, Error> {
        let (sql, values) = helper::prepare_sql_query(query, params, '?');
        let mut query = sqlx::query(&sql);
        for value in values {
            query = query.bind(value.to_string());
        }

        let mut rows = query.fetch(self);
        let mut records = Vec::new();
        while let Some(row) = rows.try_next().await? {
            let value = apache_avro::to_value(&SerializeRow(row))?;
            if let AvroValue::Record(record) = value {
                records.push(record);
            }
        }
        Ok(records)
    }

    async fn query_as<T: DeserializeOwned>(
        &self,
        query: &str,
        params: Option<&Map>,
    ) -> Result<Vec<T>, Error> {
        let (sql, values) = helper::prepare_sql_query(query, params, '?');
        let mut query = sqlx::query(&sql);
        for value in values {
            query = query.bind(value.to_string());
        }

        let mut rows = query.fetch(self);
        let mut data = Vec::new();
        while let Some(row) = rows.try_next().await? {
            let json_value = serde_json::to_value(&SerializeRow(row))?;
            let value = serde_json::from_value(json_value)?;
            data.push(value);
        }
        Ok(data)
    }

    async fn query_one(&self, query: &str, params: Option<&Map>) -> Result<Option<Record>, Error> {
        let (sql, values) = helper::prepare_sql_query(query, params, '?');
        let mut query = sqlx::query(&sql);
        for value in values {
            query = query.bind(value.to_string());
        }

        let data = if let Some(row) = query.fetch_optional(self).await? {
            let value = apache_avro::to_value(&SerializeRow(row))?;
            if let AvroValue::Record(record) = value {
                Some(record)
            } else {
                None
            }
        } else {
            None
        };
        Ok(data)
    }

    async fn query_one_as<T: DeserializeOwned>(
        &self,
        query: &str,
        params: Option<&Map>,
    ) -> Result<Option<T>, Error> {
        let (sql, values) = helper::prepare_sql_query(query, params, '?');
        let mut query = sqlx::query(&sql);
        for value in values {
            query = query.bind(value.to_string());
        }

        if let Some(row) = query.fetch_optional(self).await? {
            let json_value = serde_json::to_value(&SerializeRow(row))?;
            serde_json::from_value(json_value).map_err(Error::from)
        } else {
            Ok(None)
        }
    }
}