From 109c9ce52f779dfc859aa0093961dcdb93a452b5 Mon Sep 17 00:00:00 2001 From: Dmitriy Pleshevskiy Date: Mon, 15 Feb 2021 13:06:09 +0300 Subject: [PATCH] refac: add migration manager --- migra-cli/src/commands/apply.rs | 6 +- migra-cli/src/commands/downgrade.rs | 9 +- migra-cli/src/commands/list.rs | 52 ++++++----- migra-cli/src/commands/upgrade.rs | 37 ++++---- migra-cli/src/database.rs | 111 +++++++++++++++-------- migra-cli/src/migration.rs | 134 +++++++++++++++++++++++----- 6 files changed, 243 insertions(+), 106 deletions(-) diff --git a/migra-cli/src/commands/apply.rs b/migra-cli/src/commands/apply.rs index b64f52c..f873db3 100644 --- a/migra-cli/src/commands/apply.rs +++ b/migra-cli/src/commands/apply.rs @@ -1,12 +1,14 @@ use crate::config::Config; use crate::database::PostgresConnection; +use crate::migration::{DatabaseMigrationManager, MigrationManager}; use crate::opts::ApplyCommandOpt; use crate::path::PathBuilder; use crate::StdResult; use std::convert::TryFrom; pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> { - let mut connection = PostgresConnection::try_from(&config)?; + let connection = PostgresConnection::try_from(&config)?; + let mut manager = MigrationManager::new(connection); let file_path = PathBuilder::from(config.directory_path()) .append(opts.file_name) @@ -15,7 +17,7 @@ pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> let content = std::fs::read_to_string(file_path)?; - match connection.apply_sql(&content) { + match manager.apply_sql(&content) { Ok(_) => { println!("File was applied successfully"); } diff --git a/migra-cli/src/commands/downgrade.rs b/migra-cli/src/commands/downgrade.rs index d50377e..a8267ad 100644 --- a/migra-cli/src/commands/downgrade.rs +++ b/migra-cli/src/commands/downgrade.rs @@ -1,13 +1,14 @@ use crate::config::Config; use crate::database::PostgresConnection; -use crate::migration::Downgrade; +use crate::migration::{DatabaseMigrationManager, MigrationManager}; use crate::StdResult; use std::convert::TryFrom; pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> { - let mut connection = PostgresConnection::try_from(&config)?; + let connection = PostgresConnection::try_from(&config)?; + let mut manager = MigrationManager::new(connection); - let applied_migrations = connection.applied_migration_names()?; + let applied_migrations = manager.applied_migration_names()?; let migrations = config.migrations()?; if let Some(first_applied_migration) = applied_migrations.first() { @@ -16,7 +17,7 @@ pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> { .find(|m| m.name() == first_applied_migration) { println!("downgrade {}...", migration.name()); - migration.downgrade(&mut connection)?; + manager.downgrade(&migration)?; } } diff --git a/migra-cli/src/commands/list.rs b/migra-cli/src/commands/list.rs index ae3fdcf..d0fa8ec 100644 --- a/migra-cli/src/commands/list.rs +++ b/migra-cli/src/commands/list.rs @@ -1,26 +1,23 @@ use crate::config::Config; -use crate::database::PostgresConnection; +use crate::database::{DatabaseConnection, PostgresConnection}; use crate::error::{ErrorKind, StdResult}; +use crate::migration::{ + filter_pending_migrations, DatabaseMigrationManager, Migration, MigrationManager, +}; const EM_DASH: char = '—'; pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { - let applied_migrations = match config.database_connection_string() { + let applied_migration_names = match config.database_connection_string() { Ok(ref database_connection_string) => { - let mut connection = PostgresConnection::open(database_connection_string)?; - let applied_migrations = connection.applied_migration_names()?; + let connection = PostgresConnection::open(database_connection_string)?; + let mut manager = MigrationManager::new(connection); - println!("Applied migrations:"); - if applied_migrations.is_empty() { - println!("{}", EM_DASH); - } else { - applied_migrations - .iter() - .rev() - .for_each(|name| println!("{}", name)); - } + let applied_migration_names = manager.applied_migration_names()?; - applied_migrations + show_applied_migrations(&applied_migration_names); + + applied_migration_names } Err(e) if *e.kind() == ErrorKind::MissedEnvVar(String::new()) => { println!("{}", e.kind()); @@ -33,11 +30,26 @@ pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { println!(); - let pending_migrations = config - .migrations()? - .into_iter() - .filter(|m| !applied_migrations.contains(m.name())) - .collect::>(); + let pending_migrations = + filter_pending_migrations(config.migrations()?, &applied_migration_names); + show_pending_migrations(&pending_migrations); + + Ok(()) +} + +fn show_applied_migrations(applied_migration_names: &[String]) { + println!("Applied migrations:"); + if applied_migration_names.is_empty() { + println!("{}", EM_DASH); + } else { + applied_migration_names + .iter() + .rev() + .for_each(|name| println!("{}", name)); + } +} + +fn show_pending_migrations(pending_migrations: &[Migration]) { println!("Pending migrations:"); if pending_migrations.is_empty() { println!("{}", EM_DASH); @@ -46,6 +58,4 @@ pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { println!("{}", m.name()); }); } - - Ok(()) } diff --git a/migra-cli/src/commands/upgrade.rs b/migra-cli/src/commands/upgrade.rs index 17fe2d8..16c6ff5 100644 --- a/migra-cli/src/commands/upgrade.rs +++ b/migra-cli/src/commands/upgrade.rs @@ -1,24 +1,21 @@ -use crate::database::PostgresConnection; -use crate::migration::{Migration, Upgrade}; +use crate::database::{DatabaseConnection, PostgresConnection}; +use crate::migration::Migration; +use crate::migration::{filter_pending_migrations, DatabaseMigrationManager, MigrationManager}; use crate::Config; use crate::StdResult; use std::convert::TryFrom; pub(crate) fn upgrade_pending_migrations(config: Config) -> StdResult<()> { - let mut connection = PostgresConnection::try_from(&config)?; + let mut manager = MigrationManager::new(PostgresConnection::try_from(&config)?); - let applied_migration_names = connection.applied_migration_names()?; + let applied_migration_names = manager.applied_migration_names()?; let migrations = config.migrations()?; if is_up_to_date_migrations(&migrations, &applied_migration_names) { println!("Up to date"); } else { let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names); - - for migration in pending_migrations.iter() { - println!("upgrade {}...", migration.name()); - migration.upgrade(&mut connection)?; - } + upgrade_all_pending_migrations(manager, &pending_migrations)?; } Ok(()) @@ -28,12 +25,18 @@ fn is_up_to_date_migrations(migrations: &[Migration], applied_migration_names: & migrations.is_empty() || migrations.last().map(|m| m.name()) == applied_migration_names.first() } -fn filter_pending_migrations( - migrations: Vec, - applied_migration_names: &[String], -) -> Vec { - migrations - .into_iter() - .filter(|m| !applied_migration_names.contains(m.name())) - .collect() +fn upgrade_all_pending_migrations( + mut manager: ManagerT, + pending_migrations: &[Migration], +) -> StdResult<()> +where + Conn: DatabaseConnection, + ManagerT: Sized + DatabaseMigrationManager, +{ + for migration in pending_migrations.iter() { + println!("upgrade {}...", migration.name()); + manager.upgrade(migration)?; + } + + Ok(()) } diff --git a/migra-cli/src/database.rs b/migra-cli/src/database.rs index e66332a..e508aeb 100644 --- a/migra-cli/src/database.rs +++ b/migra-cli/src/database.rs @@ -1,8 +1,41 @@ use crate::config::Config; use crate::StdResult; -use postgres::{Client, Error, NoTls}; +use postgres::{Client, NoTls}; use std::convert::TryFrom; +pub trait ToSql { + fn to_sql(&self) -> String; +} + +impl ToSql for &str { + fn to_sql(&self) -> String { + format!(r#""{}""#, self) + } +} + +pub trait TryFromSql: Sized { + fn try_from_sql(row: QueryResultRow) -> StdResult; +} + +pub trait DatabaseConnection: Sized { + type QueryResultRow; + type QueryResult; + + fn open(connection_string: &str) -> StdResult; + + fn batch_execute(&mut self, query: &str) -> StdResult<()>; + + fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult; + + fn query<'b, OutputItem>( + &mut self, + query: &str, + params: &'b [&'b dyn ToSql], + ) -> StdResult> + where + OutputItem: ?Sized + TryFromSql; +} + pub struct PostgresConnection { client: Client, } @@ -15,54 +48,54 @@ impl TryFrom<&Config> for PostgresConnection { } } -impl PostgresConnection { - pub fn open(connection_string: &str) -> StdResult { +impl DatabaseConnection for PostgresConnection { + type QueryResultRow = postgres::Row; + type QueryResult = Vec; + + fn open(connection_string: &str) -> StdResult { let client = Client::connect(connection_string, NoTls)?; Ok(PostgresConnection { client }) } -} -pub fn is_migrations_table_not_found(e: &Error) -> bool { - e.to_string() - .contains(r#"relation "migrations" does not exist"#) -} - -impl PostgresConnection { - pub fn apply_sql(&mut self, sql_content: &str) -> Result<(), Error> { - self.client.batch_execute(sql_content) + fn batch_execute(&mut self, query: &str) -> StdResult<()> { + self.client.batch_execute(query)?; + Ok(()) } - pub fn applied_migration_names(&mut self) -> Result, Error> { - let res = self - .client - .query("SELECT name FROM migrations ORDER BY id DESC", &[]) - .or_else(|e| { - if is_migrations_table_not_found(&e) { - Ok(Vec::new()) - } else { - Err(e) - } - })?; + fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult { + let stmt = params + .iter() + .enumerate() + .fold(query.to_string(), |acc, (i, p)| { + str::replace(&acc, &format!("${}", i), &p.to_sql()) + }); - Ok(res.into_iter().map(|row| row.get(0)).collect()) + let res = self.client.execute(stmt.as_str(), &[])?; + Ok(res) } - pub fn create_migrations_table(&mut self) -> Result<(), Error> { - self.apply_sql( - r#"CREATE TABLE IF NOT EXISTS migrations ( - id serial PRIMARY KEY, - name text NOT NULL UNIQUE - )"#, - ) - } + fn query<'b, OutputItem>( + &mut self, + query: &str, + params: &'b [&'b dyn ToSql], + ) -> StdResult> + where + OutputItem: ?Sized + TryFromSql, + { + let stmt = params + .iter() + .enumerate() + .fold(query.to_string(), |acc, (i, p)| { + str::replace(&acc, &format!("${}", i), &p.to_sql()) + }); - pub fn insert_migration_info(&mut self, name: &str) -> Result { - self.client - .execute("INSERT INTO migrations (name) VALUES ($1)", &[&name]) - } + let res: Self::QueryResult = self.client.query(stmt.as_str(), &[])?; - pub fn delete_migration_info(&mut self, name: &str) -> Result { - self.client - .execute("DELETE FROM migrations WHERE name = $1", &[&name]) + let res = res + .into_iter() + .map(OutputItem::try_from_sql) + .collect::, _>>()?; + + Ok(res) } } diff --git a/migra-cli/src/migration.rs b/migra-cli/src/migration.rs index ff53640..a509826 100644 --- a/migra-cli/src/migration.rs +++ b/migra-cli/src/migration.rs @@ -1,17 +1,10 @@ -use crate::database::PostgresConnection; +use crate::database::TryFromSql; +use crate::database::{DatabaseConnection, PostgresConnection}; use crate::path::PathBuilder; use crate::StdResult; use std::fs; use std::path::PathBuf; -pub trait Upgrade { - fn upgrade(&self, connection: &mut PostgresConnection) -> StdResult<()>; -} - -pub trait Downgrade { - fn downgrade(&self, connection: &mut PostgresConnection) -> StdResult<()>; -} - #[derive(Debug)] pub struct Migration { upgrade_sql: PathBuf, @@ -46,27 +39,122 @@ impl Migration { pub fn name(&self) -> &String { &self.name } -} -impl Upgrade for Migration { - fn upgrade(&self, connection: &mut PostgresConnection) -> StdResult<()> { + fn upgrade_sql_content(&self) -> StdResult { let content = fs::read_to_string(&self.upgrade_sql)?; - - connection.create_migrations_table()?; - connection.apply_sql(&content)?; - connection.insert_migration_info(self.name())?; - - Ok(()) + Ok(content) } -} -impl Downgrade for Migration { - fn downgrade(&self, connection: &mut PostgresConnection) -> StdResult<()> { + fn downgrade_sql_content(&self) -> StdResult { let content = fs::read_to_string(&self.downgrade_sql)?; + Ok(content) + } +} - connection.apply_sql(&content)?; - connection.delete_migration_info(self.name())?; +pub struct MigrationManager { + conn: Conn, +} + +impl MigrationManager { + pub fn new(conn: Conn) -> Self { + MigrationManager { conn } + } +} + +pub fn is_migrations_table_not_found(error: D) -> bool { + error + .to_string() + .contains(r#"relation "migrations" does not exist"#) +} + +impl TryFromSql for String { + fn try_from_sql(row: postgres::Row) -> StdResult { + let res: String = row.get(0); + Ok(res) + } +} + +pub trait DatabaseMigrationManager { + const CREATE_MIGRATIONS_STMT: &'static str = r#" + CREATE TABLE IF NOT EXISTS migrations ( + id serial PRIMARY KEY, + name text NOT NULL UNIQUE + ) + "#; + + const INSERT_MIGRATION_STMT: &'static str = "INSERT INTO migrations (name) VALUES ($1)"; + + const DELETE_MIGRATION_STMT: &'static str = "DELETE FROM migrations WHERE name = $1"; + + fn apply_sql(&mut self, sql_content: &str) -> StdResult<()>; + + fn applied_migration_names(&mut self) -> StdResult>; + + fn create_migrations_table(&mut self) -> StdResult<()>; + + fn insert_migration_info(&mut self, name: &str) -> StdResult; + + fn delete_migration_info(&mut self, name: &str) -> StdResult; + + fn upgrade(&mut self, migration: &Migration) -> StdResult<()> { + let content = migration.upgrade_sql_content()?; + + self.create_migrations_table()?; + self.apply_sql(&content)?; + self.insert_migration_info(migration.name())?; + + Ok(()) + } + + fn downgrade(&mut self, migration: &Migration) -> StdResult<()> { + let content = migration.downgrade_sql_content()?; + + self.apply_sql(&content)?; + self.delete_migration_info(migration.name())?; Ok(()) } } + +impl DatabaseMigrationManager for MigrationManager { + fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> { + self.conn.batch_execute(sql_content) + } + + fn applied_migration_names(&mut self) -> StdResult> { + let res = self + .conn + .query("SELECT name FROM migrations ORDER BY id DESC", &[]) + .or_else(|e| { + if is_migrations_table_not_found(&e) { + Ok(Vec::new()) + } else { + Err(e) + } + })?; + + Ok(res.into_iter().collect()) + } + + fn create_migrations_table(&mut self) -> StdResult<()> { + self.conn.batch_execute(Self::CREATE_MIGRATIONS_STMT) + } + + fn insert_migration_info(&mut self, name: &str) -> StdResult { + self.conn.execute(Self::INSERT_MIGRATION_STMT, &[&name]) + } + + fn delete_migration_info(&mut self, name: &str) -> StdResult { + self.conn.execute(Self::DELETE_MIGRATION_STMT, &[&name]) + } +} + +pub fn filter_pending_migrations( + migrations: Vec, + applied_migration_names: &[String], +) -> Vec { + migrations + .into_iter() + .filter(|m| !applied_migration_names.contains(m.name())) + .collect() +}