midas_core/sequel/
mysql.rsuse indoc::{formatdoc, indoc};
use log::trace;
use mysql::{params, prelude::Queryable, Pool, PooledConn};
use super::{AnyhowResult, Driver as SequelDriver, VecSerial};
pub struct Mysql {
conn: PooledConn,
}
impl Mysql {
pub fn new(database_url: &str) -> AnyhowResult<Self> {
let pool = Pool::new(database_url)?;
let conn = pool.get_conn()?;
let mut db = Mysql { conn };
db.ensure_midas_schema()?;
Ok(db)
}
}
impl SequelDriver for Mysql {
fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
let payload = indoc! {"
CREATE TABLE IF NOT EXISTS __schema_migrations (
id INT NOT NULL AUTO_INCREMENT,
migration BIGINT,
PRIMARY KEY (id)
) AUTO_INCREMENT = 100;
"};
self.conn.query_drop(payload)?;
Ok(())
}
fn drop_migration_table(&mut self) -> AnyhowResult<()> {
let payload = "DROP TABLE __schema_migrations";
self.conn.query_drop(payload)?;
Ok(())
}
fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
let payload = formatdoc! {"
DROP DATABASE IF EXISTS `{db_name}`;
CREATE DATABASE `{db_name}`;
", db_name = db_name};
self.conn.exec_drop(payload, ())?;
Ok(())
}
fn count_migrations(&mut self) -> AnyhowResult<i64> {
trace!("Retrieving migrations count");
let payload = "SELECT COUNT(*) as count FROM __schema_migrations";
let row: Option<i64> = self.conn.query_first(payload)?;
let result = row.unwrap();
Ok(result)
}
fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
trace!("Retrieving all completed migrations");
let payload =
"SELECT migration FROM __schema_migrations ORDER BY id ASC";
let result: VecSerial = self.conn.query(payload)?;
Ok(result)
}
fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
trace!("Checking and retrieving the last migration stored on migrations table");
let payload = "SELECT migration FROM __schema_migrations ORDER BY id DESC LIMIT 1";
let row: Option<i64> = self.conn.query_first(payload)?;
let result = row.unwrap();
Ok(result)
}
fn add_completed_migration(
&mut self,
migration_number: i64,
) -> AnyhowResult<()> {
trace!("Adding migration to migrations table");
let payload =
"INSERT INTO __schema_migrations (migration) VALUES (:migration_number)";
self.conn.exec_drop(
payload,
params! { "migration_number" => migration_number },
)?;
Ok(())
}
fn delete_completed_migration(
&mut self,
migration_number: i64,
) -> AnyhowResult<()> {
trace!("Removing a migration in the migrations table");
let payload =
"DELETE FROM __schema_migrations WHERE migration = :migration_number";
self.conn.exec_drop(
payload,
params! { "migration_number" => migration_number },
)?;
Ok(())
}
fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
let payload =
"DELETE FROM __schema_migrations WHERE id=(SELECT MAX(id) FROM __schema_migrations);";
self.conn.query_drop(payload)?;
Ok(())
}
fn migrate(&mut self, query: &str) -> AnyhowResult<()> {
self.conn.query_drop(query)?;
Ok(())
}
fn db_name(&self) -> &str {
""
}
}