1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
use crate::acquire::Acquire;
use crate::migrate::{AppliedMigration, Migrate, MigrateError, Migration, MigrationSource};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::slice;
#[derive(Debug)]
pub struct Migrator {
pub migrations: Cow<'static, [Migration]>,
pub ignore_missing: bool,
pub locking: bool,
}
fn validate_applied_migrations(
applied_migrations: &[AppliedMigration],
migrator: &Migrator,
) -> Result<(), MigrateError> {
if migrator.ignore_missing {
return Ok(());
}
let migrations: HashSet<_> = migrator.iter().map(|m| m.version).collect();
for applied_migration in applied_migrations {
if !migrations.contains(&applied_migration.version) {
return Err(MigrateError::VersionMissing(applied_migration.version));
}
}
Ok(())
}
impl Migrator {
/// Creates a new instance with the given source.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// use std::path::Path;
///
/// // Read migrations from a local folder: ./migrations
/// let m = Migrator::new(Path::new("./migrations")).await?;
/// # Ok(())
/// # })
/// # }
/// ```
/// See [MigrationSource] for details on structure of the `./migrations` directory.
pub async fn new<'s, S>(source: S) -> Result<Self, MigrateError>
where
S: MigrationSource<'s>,
{
Ok(Self {
migrations: Cow::Owned(source.resolve().await.map_err(MigrateError::Source)?),
ignore_missing: false,
locking: true,
})
}
/// Specify whether applied migrations that are missing from the resolved migrations should be ignored.
pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Self {
self.ignore_missing = ignore_missing;
self
}
/// Specify whether or not to lock database during migration. Defaults to `true`.
///
/// ### Warning
/// Disabling locking can lead to errors or data loss if multiple clients attempt to apply migrations simultaneously
/// without some sort of mutual exclusion.
///
/// This should only be used if the database does not support locking, e.g. CockroachDB which talks the Postgres
/// protocol but does not support advisory locks used by SQLx's migrations support for Postgres.
pub fn set_locking(&mut self, locking: bool) -> &Self {
self.locking = locking;
self
}
/// Get an iterator over all known migrations.
pub fn iter(&self) -> slice::Iter<'_, Migration> {
self.migrations.iter()
}
/// Run any pending migrations against the database; and, validate previously applied migrations
/// against the current migration source to detect accidental changes in previously-applied migrations.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # #[cfg(feature = "sqlite")]
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
/// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
/// m.run(&pool).await
/// # })
/// # }
/// ```
pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError>
where
A: Acquire<'a>,
<A::Connection as Deref>::Target: Migrate,
{
let mut conn = migrator.acquire().await?;
self.run_direct(&mut *conn).await
}
// Getting around the annoying "implementation of `Acquire` is not general enough" error
#[doc(hidden)]
pub async fn run_direct<C>(&self, conn: &mut C) -> Result<(), MigrateError>
where
C: Migrate,
{
// lock the database for exclusive access by the migrator
if self.locking {
conn.lock().await?;
}
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let version = conn.dirty_version().await?;
if let Some(version) = version {
return Err(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations().await?;
validate_applied_migrations(&applied_migrations, self)?;
let applied_migrations: HashMap<_, _> = applied_migrations
.into_iter()
.map(|m| (m.version, m))
.collect();
for migration in self.iter() {
if migration.migration_type.is_down_migration() {
continue;
}
match applied_migrations.get(&migration.version) {
Some(applied_migration) => {
if migration.checksum != applied_migration.checksum {
return Err(MigrateError::VersionMismatch(migration.version));
}
}
None => {
conn.apply(migration).await?;
}
}
}
// unlock the migrator to allow other migrators to run
// but do nothing as we already migrated
if self.locking {
conn.unlock().await?;
}
Ok(())
}
/// Run down migrations against the database until a specific version.
///
/// # Examples
///
/// ```rust,no_run
/// # use sqlx_core::migrate::MigrateError;
/// # #[cfg(feature = "sqlite")]
/// # fn main() -> Result<(), MigrateError> {
/// # sqlx_rt::block_on(async move {
/// # use sqlx_core::migrate::Migrator;
/// let m = Migrator::new(std::path::Path::new("./migrations")).await?;
/// let pool = sqlx_core::sqlite::SqlitePoolOptions::new().connect("sqlite::memory:").await?;
/// m.undo(&pool, 4).await
/// # })
/// # }
/// ```
pub async fn undo<'a, A>(&self, migrator: A, target: i64) -> Result<(), MigrateError>
where
A: Acquire<'a>,
<A::Connection as Deref>::Target: Migrate,
{
let mut conn = migrator.acquire().await?;
// lock the database for exclusive access by the migrator
if self.locking {
conn.lock().await?;
}
// creates [_migrations] table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let version = conn.dirty_version().await?;
if let Some(version) = version {
return Err(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations().await?;
validate_applied_migrations(&applied_migrations, self)?;
let applied_migrations: HashMap<_, _> = applied_migrations
.into_iter()
.map(|m| (m.version, m))
.collect();
for migration in self
.iter()
.rev()
.filter(|m| m.migration_type.is_down_migration())
.filter(|m| applied_migrations.contains_key(&m.version))
.filter(|m| m.version > target)
{
conn.revert(migration).await?;
}
// unlock the migrator to allow other migrators to run
// but do nothing as we already migrated
if self.locking {
conn.unlock().await?;
}
Ok(())
}
}