Archived
1
0
Fork 0

refac: add migration manager

This commit is contained in:
Dmitriy Pleshevskiy 2021-02-15 13:06:09 +03:00
parent 1b16aff6e5
commit 109c9ce52f
6 changed files with 243 additions and 106 deletions

View file

@ -1,12 +1,14 @@
use crate::config::Config; use crate::config::Config;
use crate::database::PostgresConnection; use crate::database::PostgresConnection;
use crate::migration::{DatabaseMigrationManager, MigrationManager};
use crate::opts::ApplyCommandOpt; use crate::opts::ApplyCommandOpt;
use crate::path::PathBuilder; use crate::path::PathBuilder;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> { pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> {
let mut connection = PostgresConnection::try_from(&config)?; let connection = PostgresConnection::try_from(&config)?;
let mut manager = MigrationManager::new(connection);
let file_path = PathBuilder::from(config.directory_path()) let file_path = PathBuilder::from(config.directory_path())
.append(opts.file_name) .append(opts.file_name)
@ -15,7 +17,7 @@ pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()>
let content = std::fs::read_to_string(file_path)?; let content = std::fs::read_to_string(file_path)?;
match connection.apply_sql(&content) { match manager.apply_sql(&content) {
Ok(_) => { Ok(_) => {
println!("File was applied successfully"); println!("File was applied successfully");
} }

View file

@ -1,13 +1,14 @@
use crate::config::Config; use crate::config::Config;
use crate::database::PostgresConnection; use crate::database::PostgresConnection;
use crate::migration::Downgrade; use crate::migration::{DatabaseMigrationManager, MigrationManager};
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> { pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> {
let mut connection = PostgresConnection::try_from(&config)?; let connection = PostgresConnection::try_from(&config)?;
let mut manager = MigrationManager::new(connection);
let applied_migrations = connection.applied_migration_names()?; let applied_migrations = manager.applied_migration_names()?;
let migrations = config.migrations()?; let migrations = config.migrations()?;
if let Some(first_applied_migration) = applied_migrations.first() { if let Some(first_applied_migration) = applied_migrations.first() {
@ -16,7 +17,7 @@ pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> {
.find(|m| m.name() == first_applied_migration) .find(|m| m.name() == first_applied_migration)
{ {
println!("downgrade {}...", migration.name()); println!("downgrade {}...", migration.name());
migration.downgrade(&mut connection)?; manager.downgrade(&migration)?;
} }
} }

View file

@ -1,26 +1,23 @@
use crate::config::Config; use crate::config::Config;
use crate::database::PostgresConnection; use crate::database::{DatabaseConnection, PostgresConnection};
use crate::error::{ErrorKind, StdResult}; use crate::error::{ErrorKind, StdResult};
use crate::migration::{
filter_pending_migrations, DatabaseMigrationManager, Migration, MigrationManager,
};
const EM_DASH: char = '—'; const EM_DASH: char = '—';
pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> {
let applied_migrations = match config.database_connection_string() { let applied_migration_names = match config.database_connection_string() {
Ok(ref database_connection_string) => { Ok(ref database_connection_string) => {
let mut connection = PostgresConnection::open(database_connection_string)?; let connection = PostgresConnection::open(database_connection_string)?;
let applied_migrations = connection.applied_migration_names()?; let mut manager = MigrationManager::new(connection);
println!("Applied migrations:"); let applied_migration_names = manager.applied_migration_names()?;
if applied_migrations.is_empty() {
println!("{}", EM_DASH);
} else {
applied_migrations
.iter()
.rev()
.for_each(|name| println!("{}", name));
}
applied_migrations show_applied_migrations(&applied_migration_names);
applied_migration_names
} }
Err(e) if *e.kind() == ErrorKind::MissedEnvVar(String::new()) => { Err(e) if *e.kind() == ErrorKind::MissedEnvVar(String::new()) => {
println!("{}", e.kind()); println!("{}", e.kind());
@ -33,11 +30,26 @@ pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> {
println!(); println!();
let pending_migrations = config let pending_migrations =
.migrations()? filter_pending_migrations(config.migrations()?, &applied_migration_names);
.into_iter() show_pending_migrations(&pending_migrations);
.filter(|m| !applied_migrations.contains(m.name()))
.collect::<Vec<_>>(); Ok(())
}
fn show_applied_migrations(applied_migration_names: &[String]) {
println!("Applied migrations:");
if applied_migration_names.is_empty() {
println!("{}", EM_DASH);
} else {
applied_migration_names
.iter()
.rev()
.for_each(|name| println!("{}", name));
}
}
fn show_pending_migrations(pending_migrations: &[Migration]) {
println!("Pending migrations:"); println!("Pending migrations:");
if pending_migrations.is_empty() { if pending_migrations.is_empty() {
println!("{}", EM_DASH); println!("{}", EM_DASH);
@ -46,6 +58,4 @@ pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> {
println!("{}", m.name()); println!("{}", m.name());
}); });
} }
Ok(())
} }

View file

@ -1,24 +1,21 @@
use crate::database::PostgresConnection; use crate::database::{DatabaseConnection, PostgresConnection};
use crate::migration::{Migration, Upgrade}; use crate::migration::Migration;
use crate::migration::{filter_pending_migrations, DatabaseMigrationManager, MigrationManager};
use crate::Config; use crate::Config;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn upgrade_pending_migrations(config: Config) -> StdResult<()> { pub(crate) fn upgrade_pending_migrations(config: Config) -> StdResult<()> {
let mut connection = PostgresConnection::try_from(&config)?; let mut manager = MigrationManager::new(PostgresConnection::try_from(&config)?);
let applied_migration_names = connection.applied_migration_names()?; let applied_migration_names = manager.applied_migration_names()?;
let migrations = config.migrations()?; let migrations = config.migrations()?;
if is_up_to_date_migrations(&migrations, &applied_migration_names) { if is_up_to_date_migrations(&migrations, &applied_migration_names) {
println!("Up to date"); println!("Up to date");
} else { } else {
let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names); let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names);
upgrade_all_pending_migrations(manager, &pending_migrations)?;
for migration in pending_migrations.iter() {
println!("upgrade {}...", migration.name());
migration.upgrade(&mut connection)?;
}
} }
Ok(()) Ok(())
@ -28,12 +25,18 @@ fn is_up_to_date_migrations(migrations: &[Migration], applied_migration_names: &
migrations.is_empty() || migrations.last().map(|m| m.name()) == applied_migration_names.first() migrations.is_empty() || migrations.last().map(|m| m.name()) == applied_migration_names.first()
} }
fn filter_pending_migrations( fn upgrade_all_pending_migrations<Conn, ManagerT>(
migrations: Vec<Migration>, mut manager: ManagerT,
applied_migration_names: &[String], pending_migrations: &[Migration],
) -> Vec<Migration> { ) -> StdResult<()>
migrations where
.into_iter() Conn: DatabaseConnection,
.filter(|m| !applied_migration_names.contains(m.name())) ManagerT: Sized + DatabaseMigrationManager<Conn>,
.collect() {
for migration in pending_migrations.iter() {
println!("upgrade {}...", migration.name());
manager.upgrade(migration)?;
}
Ok(())
} }

View file

@ -1,8 +1,41 @@
use crate::config::Config; use crate::config::Config;
use crate::StdResult; use crate::StdResult;
use postgres::{Client, Error, NoTls}; use postgres::{Client, NoTls};
use std::convert::TryFrom; use std::convert::TryFrom;
pub trait ToSql {
fn to_sql(&self) -> String;
}
impl ToSql for &str {
fn to_sql(&self) -> String {
format!(r#""{}""#, self)
}
}
pub trait TryFromSql<QueryResultRow>: Sized {
fn try_from_sql(row: QueryResultRow) -> StdResult<Self>;
}
pub trait DatabaseConnection: Sized {
type QueryResultRow;
type QueryResult;
fn open(connection_string: &str) -> StdResult<Self>;
fn batch_execute(&mut self, query: &str) -> StdResult<()>;
fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult<u64>;
fn query<'b, OutputItem>(
&mut self,
query: &str,
params: &'b [&'b dyn ToSql],
) -> StdResult<Vec<OutputItem>>
where
OutputItem: ?Sized + TryFromSql<Self::QueryResultRow>;
}
pub struct PostgresConnection { pub struct PostgresConnection {
client: Client, client: Client,
} }
@ -15,54 +48,54 @@ impl TryFrom<&Config> for PostgresConnection {
} }
} }
impl PostgresConnection { impl DatabaseConnection for PostgresConnection {
pub fn open(connection_string: &str) -> StdResult<PostgresConnection> { type QueryResultRow = postgres::Row;
type QueryResult = Vec<Self::QueryResultRow>;
fn open(connection_string: &str) -> StdResult<Self> {
let client = Client::connect(connection_string, NoTls)?; let client = Client::connect(connection_string, NoTls)?;
Ok(PostgresConnection { client }) Ok(PostgresConnection { client })
} }
}
pub fn is_migrations_table_not_found(e: &Error) -> bool { fn batch_execute(&mut self, query: &str) -> StdResult<()> {
e.to_string() self.client.batch_execute(query)?;
.contains(r#"relation "migrations" does not exist"#) Ok(())
}
impl PostgresConnection {
pub fn apply_sql(&mut self, sql_content: &str) -> Result<(), Error> {
self.client.batch_execute(sql_content)
} }
pub fn applied_migration_names(&mut self) -> Result<Vec<String>, Error> { fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult<u64> {
let res = self let stmt = params
.client .iter()
.query("SELECT name FROM migrations ORDER BY id DESC", &[]) .enumerate()
.or_else(|e| { .fold(query.to_string(), |acc, (i, p)| {
if is_migrations_table_not_found(&e) { str::replace(&acc, &format!("${}", i), &p.to_sql())
Ok(Vec::new()) });
} else {
Err(e)
}
})?;
Ok(res.into_iter().map(|row| row.get(0)).collect()) let res = self.client.execute(stmt.as_str(), &[])?;
Ok(res)
} }
pub fn create_migrations_table(&mut self) -> Result<(), Error> { fn query<'b, OutputItem>(
self.apply_sql( &mut self,
r#"CREATE TABLE IF NOT EXISTS migrations ( query: &str,
id serial PRIMARY KEY, params: &'b [&'b dyn ToSql],
name text NOT NULL UNIQUE ) -> StdResult<Vec<OutputItem>>
)"#, where
) OutputItem: ?Sized + TryFromSql<Self::QueryResultRow>,
} {
let stmt = params
.iter()
.enumerate()
.fold(query.to_string(), |acc, (i, p)| {
str::replace(&acc, &format!("${}", i), &p.to_sql())
});
pub fn insert_migration_info(&mut self, name: &str) -> Result<u64, Error> { let res: Self::QueryResult = self.client.query(stmt.as_str(), &[])?;
self.client
.execute("INSERT INTO migrations (name) VALUES ($1)", &[&name])
}
pub fn delete_migration_info(&mut self, name: &str) -> Result<u64, Error> { let res = res
self.client .into_iter()
.execute("DELETE FROM migrations WHERE name = $1", &[&name]) .map(OutputItem::try_from_sql)
.collect::<Result<Vec<OutputItem>, _>>()?;
Ok(res)
} }
} }

View file

@ -1,17 +1,10 @@
use crate::database::PostgresConnection; use crate::database::TryFromSql;
use crate::database::{DatabaseConnection, PostgresConnection};
use crate::path::PathBuilder; use crate::path::PathBuilder;
use crate::StdResult; use crate::StdResult;
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
pub trait Upgrade {
fn upgrade(&self, connection: &mut PostgresConnection) -> StdResult<()>;
}
pub trait Downgrade {
fn downgrade(&self, connection: &mut PostgresConnection) -> StdResult<()>;
}
#[derive(Debug)] #[derive(Debug)]
pub struct Migration { pub struct Migration {
upgrade_sql: PathBuf, upgrade_sql: PathBuf,
@ -46,27 +39,122 @@ impl Migration {
pub fn name(&self) -> &String { pub fn name(&self) -> &String {
&self.name &self.name
} }
}
impl Upgrade for Migration { fn upgrade_sql_content(&self) -> StdResult<String> {
fn upgrade(&self, connection: &mut PostgresConnection) -> StdResult<()> {
let content = fs::read_to_string(&self.upgrade_sql)?; let content = fs::read_to_string(&self.upgrade_sql)?;
Ok(content)
connection.create_migrations_table()?;
connection.apply_sql(&content)?;
connection.insert_migration_info(self.name())?;
Ok(())
} }
}
impl Downgrade for Migration { fn downgrade_sql_content(&self) -> StdResult<String> {
fn downgrade(&self, connection: &mut PostgresConnection) -> StdResult<()> {
let content = fs::read_to_string(&self.downgrade_sql)?; let content = fs::read_to_string(&self.downgrade_sql)?;
Ok(content)
}
}
connection.apply_sql(&content)?; pub struct MigrationManager<Conn: DatabaseConnection> {
connection.delete_migration_info(self.name())?; conn: Conn,
}
impl<Conn: DatabaseConnection> MigrationManager<Conn> {
pub fn new(conn: Conn) -> Self {
MigrationManager { conn }
}
}
pub fn is_migrations_table_not_found<D: std::fmt::Display>(error: D) -> bool {
error
.to_string()
.contains(r#"relation "migrations" does not exist"#)
}
impl TryFromSql<postgres::Row> for String {
fn try_from_sql(row: postgres::Row) -> StdResult<Self> {
let res: String = row.get(0);
Ok(res)
}
}
pub trait DatabaseMigrationManager<Conn: DatabaseConnection> {
const CREATE_MIGRATIONS_STMT: &'static str = r#"
CREATE TABLE IF NOT EXISTS migrations (
id serial PRIMARY KEY,
name text NOT NULL UNIQUE
)
"#;
const INSERT_MIGRATION_STMT: &'static str = "INSERT INTO migrations (name) VALUES ($1)";
const DELETE_MIGRATION_STMT: &'static str = "DELETE FROM migrations WHERE name = $1";
fn apply_sql(&mut self, sql_content: &str) -> StdResult<()>;
fn applied_migration_names(&mut self) -> StdResult<Vec<String>>;
fn create_migrations_table(&mut self) -> StdResult<()>;
fn insert_migration_info(&mut self, name: &str) -> StdResult<u64>;
fn delete_migration_info(&mut self, name: &str) -> StdResult<u64>;
fn upgrade(&mut self, migration: &Migration) -> StdResult<()> {
let content = migration.upgrade_sql_content()?;
self.create_migrations_table()?;
self.apply_sql(&content)?;
self.insert_migration_info(migration.name())?;
Ok(())
}
fn downgrade(&mut self, migration: &Migration) -> StdResult<()> {
let content = migration.downgrade_sql_content()?;
self.apply_sql(&content)?;
self.delete_migration_info(migration.name())?;
Ok(()) Ok(())
} }
} }
impl DatabaseMigrationManager<PostgresConnection> for MigrationManager<PostgresConnection> {
fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> {
self.conn.batch_execute(sql_content)
}
fn applied_migration_names(&mut self) -> StdResult<Vec<String>> {
let res = self
.conn
.query("SELECT name FROM migrations ORDER BY id DESC", &[])
.or_else(|e| {
if is_migrations_table_not_found(&e) {
Ok(Vec::new())
} else {
Err(e)
}
})?;
Ok(res.into_iter().collect())
}
fn create_migrations_table(&mut self) -> StdResult<()> {
self.conn.batch_execute(Self::CREATE_MIGRATIONS_STMT)
}
fn insert_migration_info(&mut self, name: &str) -> StdResult<u64> {
self.conn.execute(Self::INSERT_MIGRATION_STMT, &[&name])
}
fn delete_migration_info(&mut self, name: &str) -> StdResult<u64> {
self.conn.execute(Self::DELETE_MIGRATION_STMT, &[&name])
}
}
pub fn filter_pending_migrations(
migrations: Vec<Migration>,
applied_migration_names: &[String],
) -> Vec<Migration> {
migrations
.into_iter()
.filter(|m| !applied_migration_names.contains(m.name()))
.collect()
}