feat: single transaction

I added a single transaction option for apply, upgrade, and
downgrade commands, which wraps all migrations into a single
transaction. This gives you the ability to safely roll up
migrations and, if some unforeseen situation occurs, roll them back.

Unfortunately if there is an error in syntax, mysql will not
rollback the migration and commits automatically :( I will
research this issue.

Closes #2
This commit is contained in:
Dmitriy Pleshevskiy 2021-04-24 01:58:16 +03:00
parent 56f4d190de
commit f98dd4f0c8
18 changed files with 284 additions and 48 deletions

View File

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::prelude::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::ApplyCommandOpt;
use crate::StdResult;
@ -14,6 +14,7 @@ pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> {
let file_contents = cmd_opts
.file_paths
.clone()
.into_iter()
.map(|file_path| {
let mut file_path = config.directory_path().join(file_path);
@ -25,12 +26,22 @@ pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> {
.map(std::fs::read_to_string)
.collect::<Result<Vec<_>, _>>()?;
with_transaction(conn, &mut |conn| {
file_contents
.iter()
.try_for_each(|content| migration_manager.apply_sql(conn, content))?;
Ok(())
})?;
maybe_with_transaction(
cmd_opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
file_contents
.iter()
.try_for_each(|content| {
maybe_with_transaction(
!cmd_opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.apply_sql(conn, content),
)
})
.map_err(From::from)
},
)?;
Ok(())
}

View File

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::prelude::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::DowngradeCommandOpt;
use crate::StdResult;
@ -21,14 +21,28 @@ pub(crate) fn rollback_applied_migrations(app: &App, opts: DowngradeCommandOpt)
cmp::min(opts.migrations_number, applied_migrations.len())
};
for migration_name in &applied_migrations[..rollback_migrations_number] {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) {
println!("downgrade {}...", migration.name());
with_transaction(conn, &mut |conn| {
migration_manager.downgrade(conn, &migration)
})?;
}
}
maybe_with_transaction(
opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
applied_migrations[..rollback_migrations_number]
.iter()
.try_for_each(|migration_name| {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name)
{
println!("downgrade {}...", migration.name());
maybe_with_transaction(
!opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.downgrade(conn, &migration),
)
} else {
Ok(())
}
})
.map_err(From::from)
},
)?;
Ok(())
}

View File

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::migration::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::DatabaseConnectionManager;
use crate::opts::UpgradeCommandOpt;
use crate::StdResult;
@ -18,17 +18,18 @@ pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) ->
let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names);
if pending_migrations.is_empty() {
println!("Up to date");
} else if let Some(migration_name) = opts.migration_name {
return Ok(());
}
let migrations: Vec<Migration> = if let Some(migration_name) = opts.migration_name.clone() {
let target_migration = pending_migrations
.iter()
.into_iter()
.find(|m| m.name() == &migration_name);
match target_migration {
Some(migration) => {
print_migration_info(migration);
with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
}
Some(migration) => vec![migration],
None => {
eprintln!(r#"Cannot find migration with "{}" name"#, migration_name);
return Ok(());
}
}
} else {
@ -36,11 +37,26 @@ pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) ->
.migrations_number
.unwrap_or_else(|| pending_migrations.len());
for migration in &pending_migrations[..upgrade_migrations_number] {
print_migration_info(migration);
with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
}
}
pending_migrations[..upgrade_migrations_number].to_vec()
};
maybe_with_transaction(
opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
migrations
.iter()
.try_for_each(|migration| {
print_migration_info(migration);
maybe_with_transaction(
!opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.upgrade(conn, migration),
)
})
.map_err(From::from)
},
)?;
Ok(())
}

View File

@ -5,20 +5,14 @@ use mysql::prelude::*;
use mysql::{Pool, PooledConn};
pub struct MySqlConnection {
pool: Pool,
}
impl MySqlConnection {
fn client(&self) -> StdResult<PooledConn> {
let conn = self.pool.get_conn()?;
Ok(conn)
}
conn: PooledConn,
}
impl OpenDatabaseConnection for MySqlConnection {
fn open(connection_string: &str) -> StdResult<Self> {
let pool = Pool::new(connection_string)?;
Ok(MySqlConnection { pool })
let conn = pool.get_conn()?;
Ok(MySqlConnection { conn })
}
}
@ -33,21 +27,21 @@ impl DatabaseStatements for MySqlConnection {
impl DatabaseConnection for MySqlConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.client()?.query_drop(query)?;
self.conn.query_drop(query)?;
Ok(())
}
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
let stmt = merge_query_with_params(query, params);
let res = self.client()?.query_first(stmt)?.unwrap_or_default();
let res = self.conn.query_first(stmt)?.unwrap_or_default();
Ok(res)
}
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
let stmt = merge_query_with_params(query, params);
let res = self.client()?.query_map(stmt, |(column,)| vec![column])?;
let res = self.conn.query_map(stmt, |(column,)| vec![column])?;
Ok(res)
}

View File

@ -3,7 +3,7 @@ use crate::StdResult;
use std::fs;
use std::path::{Path, PathBuf};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Migration {
upgrade_sql_file_path: PathBuf,
downgrade_sql_file_path: PathBuf,

View File

@ -46,3 +46,18 @@ where
.and_then(|res| transaction_manager.commit_transaction(conn).and(Ok(res)))
.or_else(|err| transaction_manager.rollback_transaction(conn).and(Err(err)))
}
pub fn maybe_with_transaction<TrxFnMut, Res>(
with: bool,
conn: &mut AnyConnection,
trx_fn: &mut TrxFnMut,
) -> StdResult<Res>
where
TrxFnMut: FnMut(&mut AnyConnection) -> StdResult<Res>,
{
if with {
with_transaction(conn, trx_fn)
} else {
trx_fn(conn)
}
}

View File

@ -32,10 +32,19 @@ pub(crate) enum Command {
Completions(CompletionsShell),
}
#[derive(Debug, StructOpt, Clone)]
pub(crate) struct TransactionOpts {
#[structopt(long = "single-transaction")]
pub single_transaction: bool,
}
#[derive(Debug, StructOpt, Clone)]
pub(crate) struct ApplyCommandOpt {
#[structopt(parse(from_os_str), required = true)]
pub file_paths: Vec<PathBuf>,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]
@ -55,6 +64,9 @@ pub(crate) struct UpgradeCommandOpt {
/// How many existing migrations do we have to update.
#[structopt(long = "number", short = "n")]
pub migrations_number: Option<usize>,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]
@ -66,6 +78,9 @@ pub(crate) struct DowngradeCommandOpt {
/// Rolls back all applied migrations. Ignores --number option.
#[structopt(long = "all")]
pub all_migrations: bool,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]

View File

@ -403,13 +403,7 @@ mod upgrade {
.arg("-c")
.arg(&manifest_path)
.arg("down")
.assert()
.success();
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("down")
.arg("--all")
.assert()
.success();
@ -447,6 +441,117 @@ mod upgrade {
Ok(())
}
#[test]
fn partial_applied_invalid_migrations() -> TestResult {
fn inner<ValidateFn>(database_name: &'static str, validate: ValidateFn) -> TestResult
where
ValidateFn: Fn() -> TestResult,
{
let manifest_path = database_manifest_path(database_name);
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("up")
.assert()
.failure();
validate()?;
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("down")
.assert()
.success();
Ok(())
}
#[cfg(feature = "postgres")]
inner("postgres_invalid", || {
let mut conn = postgres::Client::connect(POSTGRES_URL, postgres::NoTls)?;
let articles_res = conn.query("SELECT a.id FROM articles AS a", &[]);
let persons_res = conn.query("SELECT p.id FROM persons AS p", &[]);
assert!(articles_res.is_ok());
assert!(persons_res.is_err());
Ok(())
})?;
#[cfg(feature = "mysql")]
inner("mysql_invalid", || {
use mysql::prelude::*;
let pool = mysql::Pool::new(MYSQL_URL)?;
let mut conn = pool.get_conn()?;
let articles_res = conn.query_drop("SELECT a.id FROM articles AS a");
let persons_res = conn.query_drop("SELECT p.id FROM persons AS p");
assert!(articles_res.is_ok());
assert!(persons_res.is_err());
Ok(())
})?;
Ok(())
}
#[test]
fn cannot_applied_invalid_migrations_in_single_transaction() -> TestResult {
fn inner<ValidateFn>(database_name: &'static str, validate: ValidateFn) -> TestResult
where
ValidateFn: Fn() -> TestResult,
{
let manifest_path = database_manifest_path(database_name);
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("up")
.arg("--single-transaction")
.assert()
.failure();
validate()?;
Ok(())
}
#[cfg(feature = "postgres")]
inner("postgres_invalid", || {
let mut conn = postgres::Client::connect(POSTGRES_URL, postgres::NoTls)?;
let articles_res = conn.query("SELECT a.id FROM articles AS a", &[]);
let persons_res = conn.query("SELECT p.id FROM persons AS p", &[]);
assert!(articles_res.is_err());
assert!(persons_res.is_err());
Ok(())
})?;
// TODO: Need to investigate how fix single transaction for Mysql
// #[cfg(feature = "mysql")]
// inner("mysql_invalid", || {
// use mysql::prelude::*;
// let pool = mysql::Pool::new(MYSQL_URL)?;
// let mut conn = pool.get_conn()?;
// let articles_res = conn.query_drop("SELECT a.id FROM articles AS a");
// let persons_res = conn.query_drop("SELECT p.id FROM persons AS p");
// assert!(articles_res.is_err());
// assert!(persons_res.is_err());
// Ok(())
// })?;
Ok(())
}
}
mod apply {

View File

@ -0,0 +1,4 @@
root = "./mysql_invalid"
[database]
connection = "mysql://mysql:mysql@localhost:6001/migra_tests"

View File

@ -0,0 +1,4 @@
root = "./postgres_invalid"
[database]
connection = "postgres://postgres:postgres@localhost:6000/migra_tests"

View File

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE articles;

View File

@ -0,0 +1,8 @@
-- Your SQL goes here
CREATE TABLE articles (
id int AUTO_INCREMENT PRIMARY KEY,
title text NOT NULL CHECK (length(title) > 0),
content text NOT NULL,
created_at timestamp NOT NULL DEFAULT current_timestamp
);

View File

@ -0,0 +1,6 @@
-- This file should undo anything in `up.sql`
ALTER TABLE articles
DROP COLUMN author_person_id;
DROP TABLE persons;

View File

@ -0,0 +1,12 @@
-- Your SQL goes here
CREATE TABLE persons (
id int AUTO_INCREMENT PRIMARY KEY
email varchar(256) NOT NULL UNIQUE
display_name text NOT NULL
created_at timestamp NOT NULL DEFAULT current_timestamp
);
ALTER TABLE articles
ADD COLUMN author_person_id int NULL
REFERENCES persons (id) ON UPDATE CASCADE ON DELETE CASCADE;

View File

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE articles;

View File

@ -0,0 +1,8 @@
-- Your SQL goes here
CREATE TABLE articles (
id serial PRIMARY KEY,
title text NOT NULL CHECK (length(title) > 0),
content text NOT NULL,
created_at timestamp NOT NULL DEFAULT current_timestamp
);

View File

@ -0,0 +1,6 @@
-- This file should undo anything in `up.sql`
ALTER TABLE articles
DROP COLUMN author_person_id;
DROP TABLE persons;

View File

@ -0,0 +1,12 @@
-- Your SQL goes here
CREATE TABLE persons (
id SERIAL PRIMARY KEY
email text NOT NULL UNIQUE
display_name text NOT NULL
created_at timestamp NOT NULL DEFAULT current_timestamp
);
ALTER TABLE articles
ADD COLUMN author_person_id int NULL
REFERENCES persons (id) ON UPDATE CASCADE ON DELETE CASCADE;