From 11c374e7b097404018b92a35abb53a1eda5c65b5 Mon Sep 17 00:00:00 2001 From: Dmitriy Pleshevskiy Date: Tue, 2 Mar 2021 00:44:54 +0300 Subject: [PATCH] feat: add transaction manager now we change database only in transaction --- migra-cli/src/commands/apply.rs | 21 ++++---- migra-cli/src/commands/downgrade.rs | 14 +++-- migra-cli/src/commands/list.rs | 11 ++-- migra-cli/src/commands/upgrade.rs | 14 +++-- migra-cli/src/database/connection.rs | 30 ++++++----- migra-cli/src/database/migration.rs | 74 +++++++++++---------------- migra-cli/src/database/mod.rs | 6 ++- migra-cli/src/database/transaction.rs | 48 +++++++++++++++++ 8 files changed, 131 insertions(+), 87 deletions(-) create mode 100644 migra-cli/src/database/transaction.rs diff --git a/migra-cli/src/commands/apply.rs b/migra-cli/src/commands/apply.rs index 4c04fc2..1b793ea 100644 --- a/migra-cli/src/commands/apply.rs +++ b/migra-cli/src/commands/apply.rs @@ -1,12 +1,15 @@ use crate::config::Config; use crate::database::prelude::*; -use crate::database::MigrationManager; +use crate::database::transaction::with_transaction; +use crate::database::{DatabaseConnectionManager, MigrationManager}; use crate::opts::ApplyCommandOpt; use crate::StdResult; -use std::convert::TryFrom; pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> { - let mut manager = MigrationManager::try_from(&config)?; + let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; + let conn = connection_manager.connection(); + + let migration_manager = MigrationManager::new(); let file_path = { let mut file_path = config.directory_path().join(opts.file_name); @@ -17,15 +20,9 @@ pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> }; let content = std::fs::read_to_string(file_path)?; - - match manager.apply_sql(&content) { - Ok(_) => { - println!("File was applied successfully"); - } - Err(err) => { - println!("{}", err); - } - } + with_transaction(conn, &mut |conn| { + migration_manager.apply_sql(conn, &content) + })?; Ok(()) } diff --git a/migra-cli/src/commands/downgrade.rs b/migra-cli/src/commands/downgrade.rs index 6201494..4e7f6ca 100644 --- a/migra-cli/src/commands/downgrade.rs +++ b/migra-cli/src/commands/downgrade.rs @@ -1,18 +1,20 @@ use crate::config::Config; use crate::database::prelude::*; -use crate::database::MigrationManager; +use crate::database::transaction::with_transaction; +use crate::database::{DatabaseConnectionManager, MigrationManager}; use crate::opts::DowngradeCommandOpt; use crate::StdResult; use std::cmp; -use std::convert::TryFrom; pub(crate) fn rollback_applied_migrations( config: Config, opts: DowngradeCommandOpt, ) -> StdResult<()> { - let mut manager = MigrationManager::try_from(&config)?; + let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; + let conn = connection_manager.connection(); + let migration_manager = MigrationManager::new(); - let applied_migrations = manager.applied_migration_names()?; + let applied_migrations = migration_manager.applied_migration_names(conn)?; let migrations = config.migrations()?; let rollback_migrations_number = if opts.all_migrations { @@ -24,7 +26,9 @@ pub(crate) fn rollback_applied_migrations( for migration_name in &applied_migrations[..rollback_migrations_number] { if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) { println!("downgrade {}...", migration.name()); - manager.downgrade(&migration)?; + with_transaction(conn, &mut |conn| { + migration_manager.downgrade(conn, &migration) + })?; } } diff --git a/migra-cli/src/commands/list.rs b/migra-cli/src/commands/list.rs index 8087835..3e087d6 100644 --- a/migra-cli/src/commands/list.rs +++ b/migra-cli/src/commands/list.rs @@ -9,11 +9,14 @@ const EM_DASH: char = '—'; pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { let applied_migration_names = match config.database.connection_string() { Ok(ref database_connection_string) => { - let connection_manager = DatabaseConnectionManager::new(&config.database); - let conn = connection_manager.connect_with_string(database_connection_string)?; - let mut manager = MigrationManager::new(conn); + let mut connection_manager = DatabaseConnectionManager::connect_with_string( + &config.database, + database_connection_string, + )?; + let conn = connection_manager.connection(); - let applied_migration_names = manager.applied_migration_names()?; + let migration_manager = MigrationManager::new(); + let applied_migration_names = migration_manager.applied_migration_names(conn)?; show_applied_migrations(&applied_migration_names); diff --git a/migra-cli/src/commands/upgrade.rs b/migra-cli/src/commands/upgrade.rs index 0d166f5..bc6b549 100644 --- a/migra-cli/src/commands/upgrade.rs +++ b/migra-cli/src/commands/upgrade.rs @@ -1,13 +1,17 @@ use crate::database::migration::*; +use crate::database::transaction::with_transaction; +use crate::database::DatabaseConnectionManager; use crate::opts::UpgradeCommandOpt; use crate::Config; use crate::StdResult; -use std::convert::TryFrom; pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt) -> StdResult<()> { - let mut manager = MigrationManager::try_from(&config)?; + let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; + let conn = connection_manager.connection(); - let applied_migration_names = manager.applied_migration_names()?; + let migration_manager = MigrationManager::new(); + + let applied_migration_names = migration_manager.applied_migration_names(conn)?; let migrations = config.migrations()?; let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names); @@ -20,7 +24,7 @@ pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt match target_migration { Some(migration) => { print_migration_info(migration); - manager.upgrade(migration)?; + with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?; } None => { eprintln!(r#"Cannot find migration with "{}" name"#, migration_name); @@ -33,7 +37,7 @@ pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt for migration in &pending_migrations[..upgrade_migrations_number] { print_migration_info(migration); - manager.upgrade(migration)?; + with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?; } } diff --git a/migra-cli/src/database/connection.rs b/migra-cli/src/database/connection.rs index c9ef533..04cdace 100644 --- a/migra-cli/src/database/connection.rs +++ b/migra-cli/src/database/connection.rs @@ -3,6 +3,8 @@ use super::clients::*; use crate::config::{DatabaseConfig, SupportedDatabaseClient}; use crate::error::StdResult; +pub type AnyConnection = Box; + pub trait OpenDatabaseConnection: Sized { fn open(connection_string: &str) -> StdResult; } @@ -16,29 +18,29 @@ pub trait DatabaseConnection { } pub(crate) struct DatabaseConnectionManager { - config: DatabaseConfig, + conn: AnyConnection, } impl DatabaseConnectionManager { - pub fn new(config: &DatabaseConfig) -> Self { - Self { - config: config.clone(), - } - } - pub fn connect_with_string( - &self, + config: &DatabaseConfig, connection_string: &str, - ) -> StdResult> { - let conn = match self.config.client()? { + ) -> StdResult { + let conn = match config.client()? { SupportedDatabaseClient::Postgres => PostgresConnection::open(&connection_string)?, }; - Ok(Box::new(conn)) + Ok(DatabaseConnectionManager { + conn: Box::new(conn), + }) } - pub fn connect(&self) -> StdResult> { - let connection_string = self.config.connection_string()?; - self.connect_with_string(&connection_string) + pub fn connect(config: &DatabaseConfig) -> StdResult { + let connection_string = config.connection_string()?; + Self::connect_with_string(config, &connection_string) + } + + pub fn connection(&mut self) -> &mut AnyConnection { + &mut self.conn } } diff --git a/migra-cli/src/database/migration.rs b/migra-cli/src/database/migration.rs index 3ef6331..90f9426 100644 --- a/migra-cli/src/database/migration.rs +++ b/migra-cli/src/database/migration.rs @@ -1,7 +1,5 @@ -use super::connection::{DatabaseConnection, DatabaseConnectionManager}; -use crate::config::Config; +use super::connection::AnyConnection; use crate::StdResult; -use std::convert::TryFrom; use std::fs; use std::path::{Path, PathBuf}; @@ -51,23 +49,12 @@ impl Migration { } } -pub struct MigrationManager { - pub(crate) conn: Box, -} +#[derive(Debug)] +pub struct MigrationManager; impl MigrationManager { - pub fn new(conn: Box) -> Self { - MigrationManager { conn } - } -} - -impl TryFrom<&Config> for MigrationManager { - type Error = Box; - - fn try_from(config: &Config) -> Result { - let connection_manager = DatabaseConnectionManager::new(&config.database); - let conn = connection_manager.connect()?; - Ok(Self { conn }) + pub fn new() -> Self { + MigrationManager } } @@ -77,44 +64,44 @@ pub fn is_migrations_table_not_found(error: D) -> bool { .contains(r#"relation "migrations" does not exist"#) } -pub trait DatabaseMigrationManager { - fn apply_sql(&mut self, sql_content: &str) -> StdResult<()>; +pub trait ManageMigration { + fn apply_sql(&self, conn: &mut AnyConnection, sql_content: &str) -> StdResult<()>; - fn create_migrations_table(&mut self) -> StdResult<()>; + fn create_migrations_table(&self, conn: &mut AnyConnection) -> StdResult<()>; - fn insert_migration_info(&mut self, name: &str) -> StdResult; + fn insert_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult; - fn delete_migration_info(&mut self, name: &str) -> StdResult; + fn delete_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult; - fn applied_migration_names(&mut self) -> StdResult>; + fn applied_migration_names(&self, conn: &mut AnyConnection) -> StdResult>; - fn upgrade(&mut self, migration: &Migration) -> StdResult<()> { + fn upgrade(&self, conn: &mut AnyConnection, migration: &Migration) -> StdResult<()> { let content = migration.upgrade_sql_content()?; - self.create_migrations_table()?; - self.apply_sql(&content)?; - self.insert_migration_info(migration.name())?; + self.create_migrations_table(conn)?; + self.apply_sql(conn, &content)?; + self.insert_migration_info(conn, migration.name())?; Ok(()) } - fn downgrade(&mut self, migration: &Migration) -> StdResult<()> { + fn downgrade(&self, conn: &mut AnyConnection, migration: &Migration) -> StdResult<()> { let content = migration.downgrade_sql_content()?; - self.apply_sql(&content)?; - self.delete_migration_info(migration.name())?; + self.apply_sql(conn, &content)?; + self.delete_migration_info(conn, migration.name())?; Ok(()) } } -impl DatabaseMigrationManager for MigrationManager { - fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> { - self.conn.batch_execute(sql_content) +impl ManageMigration for MigrationManager { + fn apply_sql(&self, conn: &mut AnyConnection, sql_content: &str) -> StdResult<()> { + conn.batch_execute(sql_content) } - fn create_migrations_table(&mut self) -> StdResult<()> { - self.conn.batch_execute( + fn create_migrations_table(&self, conn: &mut AnyConnection) -> StdResult<()> { + conn.batch_execute( r#"CREATE TABLE IF NOT EXISTS migrations ( id serial PRIMARY KEY, name text NOT NULL UNIQUE @@ -122,19 +109,16 @@ impl DatabaseMigrationManager for MigrationManager { ) } - fn insert_migration_info(&mut self, name: &str) -> StdResult { - self.conn - .execute("INSERT INTO migrations (name) VALUES ($1)", &[&name]) + fn insert_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult { + conn.execute("INSERT INTO migrations (name) VALUES ($1)", &[&name]) } - fn delete_migration_info(&mut self, name: &str) -> StdResult { - self.conn - .execute("DELETE FROM migrations WHERE name = $1", &[&name]) + fn delete_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult { + conn.execute("DELETE FROM migrations WHERE name = $1", &[&name]) } - fn applied_migration_names(&mut self) -> StdResult> { - let res = self - .conn + fn applied_migration_names(&self, conn: &mut AnyConnection) -> StdResult> { + let res = conn .query("SELECT name FROM migrations ORDER BY id DESC", &[]) .or_else(|e| { if is_migrations_table_not_found(&e) { diff --git a/migra-cli/src/database/mod.rs b/migra-cli/src/database/mod.rs index fe4a41c..b4d0edb 100644 --- a/migra-cli/src/database/mod.rs +++ b/migra-cli/src/database/mod.rs @@ -3,11 +3,13 @@ pub(crate) mod builder; pub(crate) mod clients; pub(crate) mod connection; pub(crate) mod migration; +pub(crate) mod transaction; pub mod prelude { pub use super::adapter::{ToSql, ToSqlParams, TryFromSql}; - pub use super::connection::{DatabaseConnection, OpenDatabaseConnection}; - pub use super::migration::DatabaseMigrationManager; + pub use super::connection::{AnyConnection, DatabaseConnection, OpenDatabaseConnection}; + pub use super::migration::ManageMigration; + pub use super::transaction::ManageTransaction; } pub(crate) use connection::DatabaseConnectionManager; diff --git a/migra-cli/src/database/transaction.rs b/migra-cli/src/database/transaction.rs new file mode 100644 index 0000000..b625cf1 --- /dev/null +++ b/migra-cli/src/database/transaction.rs @@ -0,0 +1,48 @@ +use super::connection::AnyConnection; +use crate::error::StdResult; + +pub trait ManageTransaction { + fn begin_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>; + + fn rollback_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>; + + fn commit_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>; +} + +#[derive(Debug)] +pub struct TransactionManager; + +impl TransactionManager { + pub fn new() -> Self { + TransactionManager + } +} + +impl ManageTransaction for TransactionManager { + fn begin_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> { + conn.batch_execute("BEGIN") + } + + fn rollback_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> { + conn.batch_execute("ROLLBACK") + } + + fn commit_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> { + conn.batch_execute("COMMIT") + } +} + +pub fn with_transaction( + conn: &mut AnyConnection, + trx_fn: &mut TrxFnMut, +) -> StdResult +where + TrxFnMut: FnMut(&mut AnyConnection) -> StdResult, +{ + let transaction_manager = TransactionManager::new(); + transaction_manager + .begin_transaction(conn) + .and_then(|_| trx_fn(conn)) + .and_then(|res| transaction_manager.commit_transaction(conn).and(Ok(res))) + .or_else(|err| transaction_manager.rollback_transaction(conn).and(Err(err))) +}