Archived
1
0
Fork 0

feat: add transaction manager

now we change database only in transaction
This commit is contained in:
Dmitriy Pleshevskiy 2021-03-02 00:44:54 +03:00
parent 7c8ff199cc
commit 11c374e7b0
8 changed files with 131 additions and 87 deletions

View file

@ -1,12 +1,15 @@
use crate::config::Config; use crate::config::Config;
use crate::database::prelude::*; use crate::database::prelude::*;
use crate::database::MigrationManager; use crate::database::transaction::with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::ApplyCommandOpt; use crate::opts::ApplyCommandOpt;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom;
pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> { pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> {
let mut manager = MigrationManager::try_from(&config)?; let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?;
let conn = connection_manager.connection();
let migration_manager = MigrationManager::new();
let file_path = { let file_path = {
let mut file_path = config.directory_path().join(opts.file_name); let mut file_path = config.directory_path().join(opts.file_name);
@ -17,15 +20,9 @@ pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()>
}; };
let content = std::fs::read_to_string(file_path)?; let content = std::fs::read_to_string(file_path)?;
with_transaction(conn, &mut |conn| {
match manager.apply_sql(&content) { migration_manager.apply_sql(conn, &content)
Ok(_) => { })?;
println!("File was applied successfully");
}
Err(err) => {
println!("{}", err);
}
}
Ok(()) Ok(())
} }

View file

@ -1,18 +1,20 @@
use crate::config::Config; use crate::config::Config;
use crate::database::prelude::*; use crate::database::prelude::*;
use crate::database::MigrationManager; use crate::database::transaction::with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::DowngradeCommandOpt; use crate::opts::DowngradeCommandOpt;
use crate::StdResult; use crate::StdResult;
use std::cmp; use std::cmp;
use std::convert::TryFrom;
pub(crate) fn rollback_applied_migrations( pub(crate) fn rollback_applied_migrations(
config: Config, config: Config,
opts: DowngradeCommandOpt, opts: DowngradeCommandOpt,
) -> StdResult<()> { ) -> StdResult<()> {
let mut manager = MigrationManager::try_from(&config)?; let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?;
let conn = connection_manager.connection();
let migration_manager = MigrationManager::new();
let applied_migrations = manager.applied_migration_names()?; let applied_migrations = migration_manager.applied_migration_names(conn)?;
let migrations = config.migrations()?; let migrations = config.migrations()?;
let rollback_migrations_number = if opts.all_migrations { let rollback_migrations_number = if opts.all_migrations {
@ -24,7 +26,9 @@ pub(crate) fn rollback_applied_migrations(
for migration_name in &applied_migrations[..rollback_migrations_number] { for migration_name in &applied_migrations[..rollback_migrations_number] {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) { if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) {
println!("downgrade {}...", migration.name()); println!("downgrade {}...", migration.name());
manager.downgrade(&migration)?; with_transaction(conn, &mut |conn| {
migration_manager.downgrade(conn, &migration)
})?;
} }
} }

View file

@ -9,11 +9,14 @@ const EM_DASH: char = '—';
pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> {
let applied_migration_names = match config.database.connection_string() { let applied_migration_names = match config.database.connection_string() {
Ok(ref database_connection_string) => { Ok(ref database_connection_string) => {
let connection_manager = DatabaseConnectionManager::new(&config.database); let mut connection_manager = DatabaseConnectionManager::connect_with_string(
let conn = connection_manager.connect_with_string(database_connection_string)?; &config.database,
let mut manager = MigrationManager::new(conn); database_connection_string,
)?;
let conn = connection_manager.connection();
let applied_migration_names = manager.applied_migration_names()?; let migration_manager = MigrationManager::new();
let applied_migration_names = migration_manager.applied_migration_names(conn)?;
show_applied_migrations(&applied_migration_names); show_applied_migrations(&applied_migration_names);

View file

@ -1,13 +1,17 @@
use crate::database::migration::*; use crate::database::migration::*;
use crate::database::transaction::with_transaction;
use crate::database::DatabaseConnectionManager;
use crate::opts::UpgradeCommandOpt; use crate::opts::UpgradeCommandOpt;
use crate::Config; use crate::Config;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom;
pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt) -> StdResult<()> { pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt) -> StdResult<()> {
let mut manager = MigrationManager::try_from(&config)?; let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?;
let conn = connection_manager.connection();
let applied_migration_names = manager.applied_migration_names()?; let migration_manager = MigrationManager::new();
let applied_migration_names = migration_manager.applied_migration_names(conn)?;
let migrations = config.migrations()?; let migrations = config.migrations()?;
let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names); let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names);
@ -20,7 +24,7 @@ pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt
match target_migration { match target_migration {
Some(migration) => { Some(migration) => {
print_migration_info(migration); print_migration_info(migration);
manager.upgrade(migration)?; with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
} }
None => { None => {
eprintln!(r#"Cannot find migration with "{}" name"#, migration_name); eprintln!(r#"Cannot find migration with "{}" name"#, migration_name);
@ -33,7 +37,7 @@ pub(crate) fn upgrade_pending_migrations(config: Config, opts: UpgradeCommandOpt
for migration in &pending_migrations[..upgrade_migrations_number] { for migration in &pending_migrations[..upgrade_migrations_number] {
print_migration_info(migration); print_migration_info(migration);
manager.upgrade(migration)?; with_transaction(conn, &mut |conn| migration_manager.upgrade(conn, migration))?;
} }
} }

View file

@ -3,6 +3,8 @@ use super::clients::*;
use crate::config::{DatabaseConfig, SupportedDatabaseClient}; use crate::config::{DatabaseConfig, SupportedDatabaseClient};
use crate::error::StdResult; use crate::error::StdResult;
pub type AnyConnection = Box<dyn DatabaseConnection>;
pub trait OpenDatabaseConnection: Sized { pub trait OpenDatabaseConnection: Sized {
fn open(connection_string: &str) -> StdResult<Self>; fn open(connection_string: &str) -> StdResult<Self>;
} }
@ -16,29 +18,29 @@ pub trait DatabaseConnection {
} }
pub(crate) struct DatabaseConnectionManager { pub(crate) struct DatabaseConnectionManager {
config: DatabaseConfig, conn: AnyConnection,
} }
impl DatabaseConnectionManager { impl DatabaseConnectionManager {
pub fn new(config: &DatabaseConfig) -> Self {
Self {
config: config.clone(),
}
}
pub fn connect_with_string( pub fn connect_with_string(
&self, config: &DatabaseConfig,
connection_string: &str, connection_string: &str,
) -> StdResult<Box<dyn DatabaseConnection>> { ) -> StdResult<Self> {
let conn = match self.config.client()? { let conn = match config.client()? {
SupportedDatabaseClient::Postgres => PostgresConnection::open(&connection_string)?, SupportedDatabaseClient::Postgres => PostgresConnection::open(&connection_string)?,
}; };
Ok(Box::new(conn)) Ok(DatabaseConnectionManager {
conn: Box::new(conn),
})
} }
pub fn connect(&self) -> StdResult<Box<dyn DatabaseConnection>> { pub fn connect(config: &DatabaseConfig) -> StdResult<Self> {
let connection_string = self.config.connection_string()?; let connection_string = config.connection_string()?;
self.connect_with_string(&connection_string) Self::connect_with_string(config, &connection_string)
}
pub fn connection(&mut self) -> &mut AnyConnection {
&mut self.conn
} }
} }

View file

@ -1,7 +1,5 @@
use super::connection::{DatabaseConnection, DatabaseConnectionManager}; use super::connection::AnyConnection;
use crate::config::Config;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -51,23 +49,12 @@ impl Migration {
} }
} }
pub struct MigrationManager { #[derive(Debug)]
pub(crate) conn: Box<dyn DatabaseConnection>, pub struct MigrationManager;
}
impl MigrationManager { impl MigrationManager {
pub fn new(conn: Box<dyn DatabaseConnection>) -> Self { pub fn new() -> Self {
MigrationManager { conn } MigrationManager
}
}
impl TryFrom<&Config> for MigrationManager {
type Error = Box<dyn std::error::Error>;
fn try_from(config: &Config) -> Result<Self, Self::Error> {
let connection_manager = DatabaseConnectionManager::new(&config.database);
let conn = connection_manager.connect()?;
Ok(Self { conn })
} }
} }
@ -77,44 +64,44 @@ pub fn is_migrations_table_not_found<D: std::fmt::Display>(error: D) -> bool {
.contains(r#"relation "migrations" does not exist"#) .contains(r#"relation "migrations" does not exist"#)
} }
pub trait DatabaseMigrationManager { pub trait ManageMigration {
fn apply_sql(&mut self, sql_content: &str) -> StdResult<()>; fn apply_sql(&self, conn: &mut AnyConnection, sql_content: &str) -> StdResult<()>;
fn create_migrations_table(&mut self) -> StdResult<()>; fn create_migrations_table(&self, conn: &mut AnyConnection) -> StdResult<()>;
fn insert_migration_info(&mut self, name: &str) -> StdResult<u64>; fn insert_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult<u64>;
fn delete_migration_info(&mut self, name: &str) -> StdResult<u64>; fn delete_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult<u64>;
fn applied_migration_names(&mut self) -> StdResult<Vec<String>>; fn applied_migration_names(&self, conn: &mut AnyConnection) -> StdResult<Vec<String>>;
fn upgrade(&mut self, migration: &Migration) -> StdResult<()> { fn upgrade(&self, conn: &mut AnyConnection, migration: &Migration) -> StdResult<()> {
let content = migration.upgrade_sql_content()?; let content = migration.upgrade_sql_content()?;
self.create_migrations_table()?; self.create_migrations_table(conn)?;
self.apply_sql(&content)?; self.apply_sql(conn, &content)?;
self.insert_migration_info(migration.name())?; self.insert_migration_info(conn, migration.name())?;
Ok(()) Ok(())
} }
fn downgrade(&mut self, migration: &Migration) -> StdResult<()> { fn downgrade(&self, conn: &mut AnyConnection, migration: &Migration) -> StdResult<()> {
let content = migration.downgrade_sql_content()?; let content = migration.downgrade_sql_content()?;
self.apply_sql(&content)?; self.apply_sql(conn, &content)?;
self.delete_migration_info(migration.name())?; self.delete_migration_info(conn, migration.name())?;
Ok(()) Ok(())
} }
} }
impl DatabaseMigrationManager for MigrationManager { impl ManageMigration for MigrationManager {
fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> { fn apply_sql(&self, conn: &mut AnyConnection, sql_content: &str) -> StdResult<()> {
self.conn.batch_execute(sql_content) conn.batch_execute(sql_content)
} }
fn create_migrations_table(&mut self) -> StdResult<()> { fn create_migrations_table(&self, conn: &mut AnyConnection) -> StdResult<()> {
self.conn.batch_execute( conn.batch_execute(
r#"CREATE TABLE IF NOT EXISTS migrations ( r#"CREATE TABLE IF NOT EXISTS migrations (
id serial PRIMARY KEY, id serial PRIMARY KEY,
name text NOT NULL UNIQUE name text NOT NULL UNIQUE
@ -122,19 +109,16 @@ impl DatabaseMigrationManager for MigrationManager {
) )
} }
fn insert_migration_info(&mut self, name: &str) -> StdResult<u64> { fn insert_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult<u64> {
self.conn conn.execute("INSERT INTO migrations (name) VALUES ($1)", &[&name])
.execute("INSERT INTO migrations (name) VALUES ($1)", &[&name])
} }
fn delete_migration_info(&mut self, name: &str) -> StdResult<u64> { fn delete_migration_info(&self, conn: &mut AnyConnection, name: &str) -> StdResult<u64> {
self.conn conn.execute("DELETE FROM migrations WHERE name = $1", &[&name])
.execute("DELETE FROM migrations WHERE name = $1", &[&name])
} }
fn applied_migration_names(&mut self) -> StdResult<Vec<String>> { fn applied_migration_names(&self, conn: &mut AnyConnection) -> StdResult<Vec<String>> {
let res = self let res = conn
.conn
.query("SELECT name FROM migrations ORDER BY id DESC", &[]) .query("SELECT name FROM migrations ORDER BY id DESC", &[])
.or_else(|e| { .or_else(|e| {
if is_migrations_table_not_found(&e) { if is_migrations_table_not_found(&e) {

View file

@ -3,11 +3,13 @@ pub(crate) mod builder;
pub(crate) mod clients; pub(crate) mod clients;
pub(crate) mod connection; pub(crate) mod connection;
pub(crate) mod migration; pub(crate) mod migration;
pub(crate) mod transaction;
pub mod prelude { pub mod prelude {
pub use super::adapter::{ToSql, ToSqlParams, TryFromSql}; pub use super::adapter::{ToSql, ToSqlParams, TryFromSql};
pub use super::connection::{DatabaseConnection, OpenDatabaseConnection}; pub use super::connection::{AnyConnection, DatabaseConnection, OpenDatabaseConnection};
pub use super::migration::DatabaseMigrationManager; pub use super::migration::ManageMigration;
pub use super::transaction::ManageTransaction;
} }
pub(crate) use connection::DatabaseConnectionManager; pub(crate) use connection::DatabaseConnectionManager;

View file

@ -0,0 +1,48 @@
use super::connection::AnyConnection;
use crate::error::StdResult;
pub trait ManageTransaction {
fn begin_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>;
fn rollback_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>;
fn commit_transaction(&self, conn: &mut AnyConnection) -> StdResult<()>;
}
#[derive(Debug)]
pub struct TransactionManager;
impl TransactionManager {
pub fn new() -> Self {
TransactionManager
}
}
impl ManageTransaction for TransactionManager {
fn begin_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> {
conn.batch_execute("BEGIN")
}
fn rollback_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> {
conn.batch_execute("ROLLBACK")
}
fn commit_transaction(&self, conn: &mut AnyConnection) -> StdResult<()> {
conn.batch_execute("COMMIT")
}
}
pub fn with_transaction<TrxFnMut, Res>(
conn: &mut AnyConnection,
trx_fn: &mut TrxFnMut,
) -> StdResult<Res>
where
TrxFnMut: FnMut(&mut AnyConnection) -> StdResult<Res>,
{
let transaction_manager = TransactionManager::new();
transaction_manager
.begin_transaction(conn)
.and_then(|_| trx_fn(conn))
.and_then(|res| transaction_manager.commit_transaction(conn).and(Ok(res)))
.or_else(|err| transaction_manager.rollback_transaction(conn).and(Err(err)))
}