Archived
1
0
Fork 0

Merge pull request #7 from pleshevskiy/task-2

feat: single transaction
This commit is contained in:
Dmitriy Pleshevskiy 2021-04-24 21:52:02 +02:00 committed by GitHub
commit 25ea001ec4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 239 additions and 50 deletions

View file

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::prelude::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::ApplyCommandOpt;
use crate::StdResult;
@ -14,6 +14,7 @@ pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> {
let file_contents = cmd_opts
.file_paths
.clone()
.into_iter()
.map(|file_path| {
let mut file_path = config.directory_path().join(file_path);
@ -25,12 +26,22 @@ pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> {
.map(std::fs::read_to_string)
.collect::<Result<Vec<_>, _>>()?;
with_transaction(conn, &mut |conn| {
maybe_with_transaction(
cmd_opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
file_contents
.iter()
.try_for_each(|content| migration_manager.apply_sql(conn, content))?;
Ok(())
})?;
.try_for_each(|content| {
maybe_with_transaction(
!cmd_opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.apply_sql(conn, content),
)
})
.map_err(From::from)
},
)?;
Ok(())
}

View file

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::prelude::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::DowngradeCommandOpt;
use crate::StdResult;
@ -21,14 +21,28 @@ pub(crate) fn rollback_applied_migrations(app: &App, opts: DowngradeCommandOpt)
cmp::min(opts.migrations_number, applied_migrations.len())
};
for migration_name in &applied_migrations[..rollback_migrations_number] {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) {
maybe_with_transaction(
opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
applied_migrations[..rollback_migrations_number]
.iter()
.try_for_each(|migration_name| {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name)
{
println!("downgrade {}...", migration.name());
with_transaction(conn, &mut |conn| {
migration_manager.downgrade(conn, &migration)
})?;
}
maybe_with_transaction(
!opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.downgrade(conn, &migration),
)
} else {
Ok(())
}
})
.map_err(From::from)
},
)?;
Ok(())
}

View file

@ -1,6 +1,6 @@
use crate::app::App;
use crate::database::migration::*;
use crate::database::transaction::with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::DatabaseConnectionManager;
use crate::opts::UpgradeCommandOpt;
use crate::StdResult;
@ -18,17 +18,18 @@ pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) ->
let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names);
if pending_migrations.is_empty() {
println!("Up to date");
} else if let Some(migration_name) = opts.migration_name {
return Ok(());
}
let migrations: Vec<Migration> = if let Some(migration_name) = opts.migration_name.clone() {
let target_migration = pending_migrations
.iter()
.into_iter()
.find(|m| m.name() == &migration_name);
match target_migration {
Some(migration) => {
print_migration_info(migration);
with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
}
Some(migration) => vec![migration],
None => {
eprintln!(r#"Cannot find migration with "{}" name"#, migration_name);
return Ok(());
}
}
} else {
@ -36,11 +37,26 @@ pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) ->
.migrations_number
.unwrap_or_else(|| pending_migrations.len());
for migration in &pending_migrations[..upgrade_migrations_number] {
pending_migrations[..upgrade_migrations_number].to_vec()
};
maybe_with_transaction(
opts.transaction_opts.single_transaction,
conn,
&mut |conn| {
migrations
.iter()
.try_for_each(|migration| {
print_migration_info(migration);
with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
}
}
maybe_with_transaction(
!opts.transaction_opts.single_transaction,
conn,
&mut |conn| migration_manager.upgrade(conn, migration),
)
})
.map_err(From::from)
},
)?;
Ok(())
}

View file

@ -5,20 +5,14 @@ use mysql::prelude::*;
use mysql::{Pool, PooledConn};
pub struct MySqlConnection {
pool: Pool,
}
impl MySqlConnection {
fn client(&self) -> StdResult<PooledConn> {
let conn = self.pool.get_conn()?;
Ok(conn)
}
conn: PooledConn,
}
impl OpenDatabaseConnection for MySqlConnection {
fn open(connection_string: &str) -> StdResult<Self> {
let pool = Pool::new(connection_string)?;
Ok(MySqlConnection { pool })
let pool = Pool::new_manual(1, 1, connection_string)?;
let conn = pool.get_conn()?;
Ok(MySqlConnection { conn })
}
}
@ -31,23 +25,25 @@ impl DatabaseStatements for MySqlConnection {
}
}
impl SupportsTransactionalDdl for MySqlConnection {}
impl DatabaseConnection for MySqlConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.client()?.query_drop(query)?;
self.conn.query_drop(query)?;
Ok(())
}
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
let stmt = merge_query_with_params(query, params);
let res = self.client()?.query_first(stmt)?.unwrap_or_default();
let res = self.conn.query_first(stmt)?.unwrap_or_default();
Ok(res)
}
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
let stmt = merge_query_with_params(query, params);
let res = self.client()?.query_map(stmt, |(column,)| vec![column])?;
let res = self.conn.query_map(stmt, |(column,)| vec![column])?;
Ok(res)
}

View file

@ -23,6 +23,13 @@ impl DatabaseStatements for PostgresConnection {
}
}
impl SupportsTransactionalDdl for PostgresConnection {
#[inline]
fn supports_transactional_ddl(&self) -> bool {
true
}
}
impl DatabaseConnection for PostgresConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.client.batch_execute(query)?;

View file

@ -13,7 +13,14 @@ pub trait DatabaseStatements {
fn create_migration_table_stmt(&self) -> &'static str;
}
pub trait DatabaseConnection: DatabaseStatements {
pub trait SupportsTransactionalDdl {
#[inline]
fn supports_transactional_ddl(&self) -> bool {
false
}
}
pub trait DatabaseConnection: DatabaseStatements + SupportsTransactionalDdl {
fn batch_execute(&mut self, query: &str) -> StdResult<()>;
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64>;

View file

@ -3,7 +3,7 @@ use crate::StdResult;
use std::fs;
use std::path::{Path, PathBuf};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Migration {
upgrade_sql_file_path: PathBuf,
downgrade_sql_file_path: PathBuf,

View file

@ -9,6 +9,7 @@ pub mod prelude {
pub use super::adapter::{ToSql, ToSqlParams, TryFromSql};
pub use super::connection::{
AnyConnection, DatabaseConnection, DatabaseStatements, OpenDatabaseConnection,
SupportsTransactionalDdl,
};
pub use super::migration::ManageMigration;
pub use super::transaction::ManageTransaction;

View file

@ -46,3 +46,18 @@ where
.and_then(|res| transaction_manager.commit_transaction(conn).and(Ok(res)))
.or_else(|err| transaction_manager.rollback_transaction(conn).and(Err(err)))
}
pub fn maybe_with_transaction<TrxFnMut, Res>(
is_needed: bool,
conn: &mut AnyConnection,
trx_fn: &mut TrxFnMut,
) -> StdResult<Res>
where
TrxFnMut: FnMut(&mut AnyConnection) -> StdResult<Res>,
{
if is_needed && conn.supports_transactional_ddl() {
with_transaction(conn, trx_fn)
} else {
trx_fn(conn)
}
}

View file

@ -32,10 +32,19 @@ pub(crate) enum Command {
Completions(CompletionsShell),
}
#[derive(Debug, StructOpt, Clone)]
pub(crate) struct TransactionOpts {
#[structopt(long = "single-transaction")]
pub single_transaction: bool,
}
#[derive(Debug, StructOpt, Clone)]
pub(crate) struct ApplyCommandOpt {
#[structopt(parse(from_os_str), required = true)]
pub file_paths: Vec<PathBuf>,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]
@ -55,6 +64,9 @@ pub(crate) struct UpgradeCommandOpt {
/// How many existing migrations do we have to update.
#[structopt(long = "number", short = "n")]
pub migrations_number: Option<usize>,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]
@ -66,6 +78,9 @@ pub(crate) struct DowngradeCommandOpt {
/// Rolls back all applied migrations. Ignores --number option.
#[structopt(long = "all")]
pub all_migrations: bool,
#[structopt(flatten)]
pub transaction_opts: TransactionOpts,
}
#[derive(Debug, StructOpt, Clone)]

View file

@ -403,13 +403,7 @@ mod upgrade {
.arg("-c")
.arg(&manifest_path)
.arg("down")
.assert()
.success();
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("down")
.arg("--all")
.assert()
.success();
@ -447,6 +441,84 @@ mod upgrade {
Ok(())
}
#[test]
fn partial_applied_invalid_migrations() -> TestResult {
fn inner<ValidateFn>(database_name: &'static str, validate: ValidateFn) -> TestResult
where
ValidateFn: Fn() -> TestResult,
{
let manifest_path = database_manifest_path(database_name);
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("up")
.assert()
.failure();
validate()?;
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("down")
.assert()
.success();
Ok(())
}
#[cfg(feature = "postgres")]
inner("postgres_invalid", || {
let mut conn = postgres::Client::connect(POSTGRES_URL, postgres::NoTls)?;
let articles_res = conn.query("SELECT a.id FROM articles AS a", &[]);
let persons_res = conn.query("SELECT p.id FROM persons AS p", &[]);
assert!(articles_res.is_ok());
assert!(persons_res.is_err());
Ok(())
})?;
Ok(())
}
#[test]
fn cannot_applied_invalid_migrations_in_single_transaction() -> TestResult {
fn inner<ValidateFn>(database_name: &'static str, validate: ValidateFn) -> TestResult
where
ValidateFn: Fn() -> TestResult,
{
let manifest_path = database_manifest_path(database_name);
Command::cargo_bin("migra")?
.arg("-c")
.arg(&manifest_path)
.arg("up")
.arg("--single-transaction")
.assert()
.failure();
validate()?;
Ok(())
}
#[cfg(feature = "postgres")]
inner("postgres_invalid", || {
let mut conn = postgres::Client::connect(POSTGRES_URL, postgres::NoTls)?;
let articles_res = conn.query("SELECT a.id FROM articles AS a", &[]);
let persons_res = conn.query("SELECT p.id FROM persons AS p", &[]);
assert!(articles_res.is_err());
assert!(persons_res.is_err());
Ok(())
})?;
Ok(())
}
}
mod apply {

View file

@ -0,0 +1,4 @@
root = "./postgres_invalid"
[database]
connection = "postgres://postgres:postgres@localhost:6000/migra_tests"

View file

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE articles;

View file

@ -0,0 +1,8 @@
-- Your SQL goes here
CREATE TABLE articles (
id serial PRIMARY KEY,
title text NOT NULL CHECK (length(title) > 0),
content text NOT NULL,
created_at timestamp NOT NULL DEFAULT current_timestamp
);

View file

@ -0,0 +1,6 @@
-- This file should undo anything in `up.sql`
ALTER TABLE articles
DROP COLUMN author_person_id;
DROP TABLE persons;

View file

@ -0,0 +1,14 @@
-- Your SQL goes here
CREATE TABLE persons (
id SERIAL PRIMARY KEY,
email text NOT NULL UNIQUE,
display_name text NOT NULL,
created_at timestamp NOT NULL DEFAULT current_timestamp
);
/* This table doesn't exist
*/
ALTER TABLE recipes
ADD COLUMN author_person_id int NULL
REFERENCES persons (id) ON UPDATE CASCADE ON DELETE CASCADE;