Archived
1
0
Fork 0

refac(cli): use migra core for cli

This commit is contained in:
Dmitriy Pleshevskiy 2021-06-06 01:25:45 +03:00
parent c71e8fe33f
commit 44f7ca30e8
27 changed files with 409 additions and 519 deletions

View file

@ -1,6 +1,5 @@
[workspace] [workspace]
members = [ members = [
"migra", "migra",
"migra_clients", "migra_cli",
# "migra_cli"
] ]

View file

@ -6,4 +6,11 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["postgres"]
sqlite = ["rusqlite"]
[dependencies] [dependencies]
postgres = { version = "0.19", optional = true }
mysql = { version = "20.1", optional = true }
rusqlite = { version = "0.25", optional = true }

37
migra/src/clients/mod.rs Normal file
View file

@ -0,0 +1,37 @@
// #![deny(missing_debug_implementations)]
// #![deny(clippy::all, clippy::pedantic)]
// #![allow(clippy::module_name_repetitions)]
// #![allow(clippy::missing_errors_doc)]
use crate::error::MigraResult;
use crate::managers::{ManageMigrations, ManageTransaction};
pub trait OpenDatabaseConnection
where
Self: Sized,
{
fn new(connection_string: &str) -> MigraResult<Self> {
Self::manual(connection_string, "migrations")
}
fn manual(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self>;
}
pub trait Client: ManageMigrations + ManageTransaction {}
pub type AnyClient = Box<dyn Client>;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "postgres")]
pub use self::postgres::Client as PostgresClient;
#[cfg(feature = "mysql")]
pub mod mysql;
#[cfg(feature = "mysql")]
pub use self::mysql::Client as MysqlClient;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "sqlite")]
pub use self::sqlite::Client as SqliteClient;

View file

@ -1,38 +1,39 @@
use crate::OpenDatabaseConnection; use super::OpenDatabaseConnection;
use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction}; use crate::error::{Error, MigraResult, StdResult};
use migra::migration; use crate::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use crate::migration;
use mysql::prelude::*; use mysql::prelude::*;
use mysql::{Pool, PooledConn}; use mysql::{Pool, PooledConn};
#[derive(Debug)] #[derive(Debug)]
pub struct MySqlClient { pub struct Client {
conn: PooledConn, conn: PooledConn,
migrations_table_name: String, migrations_table_name: String,
} }
impl OpenDatabaseConnection for MySqlClient { impl OpenDatabaseConnection for Client {
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> { fn manual(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self> {
let conn = Pool::new_manual(1, 1, connection_string) let conn = Pool::new_manual(1, 1, connection_string)
.and_then(|pool| pool.get_conn()) .and_then(|pool| pool.get_conn())
.map_err(|_| migra::Error::FailedDatabaseConnection)?; .map_err(|_| Error::FailedDatabaseConnection)?;
Ok(MySqlClient { Ok(Client {
conn, conn,
migrations_table_name: migrations_table_name.to_owned(), migrations_table_name: migrations_table_name.to_owned(),
}) })
} }
} }
impl BatchExecute for MySqlClient { impl BatchExecute for Client {
fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> { fn batch_execute(&mut self, sql: &str) -> StdResult<()> {
self.conn.query_drop(sql).map_err(From::from) self.conn.query_drop(sql).map_err(From::from)
} }
} }
impl ManageTransaction for MySqlClient {} impl ManageTransaction for Client {}
impl ManageMigrations for MySqlClient { impl ManageMigrations for Client {
fn create_migrations_table(&mut self) -> migra::Result<()> { fn create_migrations_table(&mut self) -> MigraResult<()> {
let stmt = format!( let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} ( r#"CREATE TABLE IF NOT EXISTS {} (
id int AUTO_INCREMENT PRIMARY KEY, id int AUTO_INCREMENT PRIMARY KEY,
@ -42,10 +43,10 @@ impl ManageMigrations for MySqlClient {
); );
self.batch_execute(&stmt) self.batch_execute(&stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable) .map_err(|_| Error::FailedCreateMigrationsTable)
} }
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> { fn insert_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!( let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)", "INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name &self.migrations_table_name
@ -54,10 +55,10 @@ impl ManageMigrations for MySqlClient {
self.conn self.conn
.exec_first(&stmt, (name,)) .exec_first(&stmt, (name,))
.map(Option::unwrap_or_default) .map(Option::unwrap_or_default)
.map_err(|_| migra::Error::FailedInsertMigration) .map_err(|_| Error::FailedInsertMigration)
} }
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> { fn delete_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!( let stmt = format!(
"DELETE FROM {} WHERE name = $1", "DELETE FROM {} WHERE name = $1",
&self.migrations_table_name &self.migrations_table_name
@ -66,15 +67,17 @@ impl ManageMigrations for MySqlClient {
self.conn self.conn
.exec_first(&stmt, (name,)) .exec_first(&stmt, (name,))
.map(Option::unwrap_or_default) .map(Option::unwrap_or_default)
.map_err(|_| migra::Error::FailedDeleteMigration) .map_err(|_| Error::FailedDeleteMigration)
} }
fn applied_migrations(&mut self) -> migra::Result<migration::List> { fn applied_migrations(&mut self) -> MigraResult<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name); let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.conn self.conn
.query::<String, _>(stmt) .query::<String, _>(stmt)
.map(From::from) .map(From::from)
.map_err(|_| migra::Error::FailedGetAppliedMigrations) .map_err(|_| Error::FailedGetAppliedMigrations)
} }
} }
impl super::Client for Client {}

View file

@ -1,43 +1,44 @@
use crate::OpenDatabaseConnection; use super::OpenDatabaseConnection;
use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction}; use crate::error::{Error, MigraResult, StdResult};
use migra::migration; use crate::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use postgres::{Client, NoTls}; use crate::migration;
use postgres::{Client as PostgresClient, NoTls};
use std::fmt; use std::fmt;
pub struct PostgresClient { pub struct Client {
client: Client, client: PostgresClient,
migrations_table_name: String, migrations_table_name: String,
} }
impl fmt::Debug for PostgresClient { impl fmt::Debug for Client {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("PostgresClient") fmt.debug_struct("Client")
.field("migrations_table_name", &self.migrations_table_name) .field("migrations_table_name", &self.migrations_table_name)
.finish() .finish()
} }
} }
impl OpenDatabaseConnection for PostgresClient { impl OpenDatabaseConnection for Client {
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> { fn manual(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self> {
let client = Client::connect(connection_string, NoTls) let client = PostgresClient::connect(connection_string, NoTls)
.map_err(|_| migra::Error::FailedDatabaseConnection)?; .map_err(|_| Error::FailedDatabaseConnection)?;
Ok(PostgresClient { Ok(Client {
client, client,
migrations_table_name: migrations_table_name.to_owned(), migrations_table_name: migrations_table_name.to_owned(),
}) })
} }
} }
impl BatchExecute for PostgresClient { impl BatchExecute for Client {
fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> { fn batch_execute(&mut self, sql: &str) -> StdResult<()> {
self.client.batch_execute(sql).map_err(From::from) self.client.batch_execute(sql).map_err(From::from)
} }
} }
impl ManageTransaction for PostgresClient {} impl ManageTransaction for Client {}
impl ManageMigrations for PostgresClient { impl ManageMigrations for Client {
fn create_migrations_table(&mut self) -> migra::Result<()> { fn create_migrations_table(&mut self) -> MigraResult<()> {
let stmt = format!( let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} ( r#"CREATE TABLE IF NOT EXISTS {} (
id serial PRIMARY KEY, id serial PRIMARY KEY,
@ -47,10 +48,10 @@ impl ManageMigrations for PostgresClient {
); );
self.batch_execute(&stmt) self.batch_execute(&stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable) .map_err(|_| Error::FailedCreateMigrationsTable)
} }
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> { fn insert_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!( let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)", "INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name &self.migrations_table_name
@ -58,10 +59,10 @@ impl ManageMigrations for PostgresClient {
self.client self.client
.execute(stmt.as_str(), &[&name]) .execute(stmt.as_str(), &[&name])
.map_err(|_| migra::Error::FailedInsertMigration) .map_err(|_| Error::FailedInsertMigration)
} }
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> { fn delete_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!( let stmt = format!(
"DELETE FROM {} WHERE name = $1", "DELETE FROM {} WHERE name = $1",
&self.migrations_table_name &self.migrations_table_name
@ -69,10 +70,10 @@ impl ManageMigrations for PostgresClient {
self.client self.client
.execute(stmt.as_str(), &[&name]) .execute(stmt.as_str(), &[&name])
.map_err(|_| migra::Error::FailedDeleteMigration) .map_err(|_| Error::FailedDeleteMigration)
} }
fn applied_migrations(&mut self) -> migra::Result<migration::List> { fn applied_migrations(&mut self) -> MigraResult<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name); let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.client self.client
@ -83,6 +84,8 @@ impl ManageMigrations for PostgresClient {
.collect::<Result<Vec<String>, _>>() .collect::<Result<Vec<String>, _>>()
}) })
.map(From::from) .map(From::from)
.map_err(|_| migra::Error::FailedGetAppliedMigrations) .map_err(|_| Error::FailedGetAppliedMigrations)
} }
} }
impl super::Client for Client {}

View file

@ -0,0 +1,84 @@
use super::OpenDatabaseConnection;
use crate::error::{Error, MigraResult, StdResult};
use crate::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use crate::migration;
use rusqlite::Connection;
#[derive(Debug)]
pub struct Client {
conn: Connection,
migrations_table_name: String,
}
impl OpenDatabaseConnection for Client {
fn manual(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self> {
let conn =
Connection::open(connection_string).map_err(|_| Error::FailedDatabaseConnection)?;
Ok(Client {
conn,
migrations_table_name: migrations_table_name.to_owned(),
})
}
}
impl BatchExecute for Client {
fn batch_execute(&mut self, sql: &str) -> StdResult<()> {
self.conn.execute_batch(sql).map_err(From::from)
}
}
impl ManageTransaction for Client {}
impl ManageMigrations for Client {
fn create_migrations_table(&mut self) -> MigraResult<()> {
let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id int AUTO_INCREMENT PRIMARY KEY,
name varchar(256) NOT NULL UNIQUE
)"#,
&self.migrations_table_name
);
self.batch_execute(&stmt)
.map_err(|_| Error::FailedCreateMigrationsTable)
}
fn insert_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| Error::FailedInsertMigration)
}
fn delete_migration(&mut self, name: &str) -> MigraResult<u64> {
let stmt = format!(
"DELETE FROM {} WHERE name = $1",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| Error::FailedDeleteMigration)
}
fn applied_migrations(&mut self) -> MigraResult<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.conn
.prepare(&stmt)
.and_then(|mut stmt| {
stmt.query_map([], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()
})
.map(From::from)
.map_err(|_| Error::FailedGetAppliedMigrations)
}
}
impl super::Client for Client {}

View file

@ -33,3 +33,15 @@ pub fn get_all_migrations(dir_path: &Path) -> MigraResult<migration::List> {
Ok(migration::List::from(file_names)) Ok(migration::List::from(file_names))
} }
#[must_use]
pub fn filter_pending_migrations(
all_migrations: &migration::List,
applied_migrations: &migration::List,
) -> migration::List {
all_migrations
.clone()
.iter()
.filter(|m| !applied_migrations.contains(m))
.collect()
}

View file

@ -2,6 +2,8 @@
#![deny(clippy::all, clippy::pedantic)] #![deny(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)] #![allow(clippy::missing_errors_doc)]
pub mod clients;
pub mod fs; pub mod fs;
pub mod managers; pub mod managers;
pub mod migration; pub mod migration;
@ -10,31 +12,3 @@ mod error;
pub use error::{Error, MigraResult as Result, StdResult}; pub use error::{Error, MigraResult as Result, StdResult};
pub use migration::Migration; pub use migration::Migration;
/*
# list
fs::get_all_migrations()
db::get_applied_migrations()
utils::filter_pending_migrations(all_migrations, applied_migrations)
show_migrations(applied_migrations)
show_migrations(pending_migrations)
# upgrade
fs::get_all_migrations()
db::get_applied_migrations()
utils::filter_pending_migrations(all_migrations, applied_migrations)
db::upgrade_migration()
# downgrade
*/

View file

@ -52,6 +52,36 @@ impl<T: AsRef<Path>> From<Vec<T>> for List {
} }
} }
impl From<Vec<Migration>> for List {
fn from(list: Vec<Migration>) -> Self {
List { inner: list }
}
}
impl FromIterator<Migration> for List {
fn from_iter<I: IntoIterator<Item = Migration>>(iter: I) -> Self {
let mut list = List::new();
for item in iter {
list.push(item);
}
list
}
}
impl<'a> FromIterator<&'a Migration> for List {
fn from_iter<I: IntoIterator<Item = &'a Migration>>(iter: I) -> Self {
let mut list = List::new();
for item in iter {
list.push(item.clone());
}
list
}
}
impl std::ops::Deref for List { impl std::ops::Deref for List {
type Target = Vec<Migration>; type Target = Vec<Migration>;
@ -112,30 +142,6 @@ impl List {
} }
} }
impl FromIterator<Migration> for List {
fn from_iter<I: IntoIterator<Item = Migration>>(iter: I) -> Self {
let mut list = List::new();
for item in iter {
list.push(item);
}
list
}
}
impl<'a> FromIterator<&'a Migration> for List {
fn from_iter<I: IntoIterator<Item = &'a Migration>>(iter: I) -> Self {
let mut list = List::new();
for item in iter {
list.push(item.clone());
}
list
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -15,13 +15,12 @@ readme = "../README.md"
[features] [features]
default = ["postgres"] default = ["postgres"]
postgres = ["migra-clients/postgres"] postgres = ["migra/postgres"]
sqlite = ["migra-clients/sqlite"] sqlite = ["migra/sqlite"]
mysql = ["migra-clients/mysql"] mysql = ["migra/mysql"]
[dependencies] [dependencies]
migra = { version = "0", path = "../migra" } migra = { version = "0", path = "../migra" }
migra-clients = { version = "0", path = "../migra_clients" }
cfg-if = "1.0" cfg-if = "1.0"
structopt = "0.3" structopt = "0.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View file

@ -1,5 +1,5 @@
use crate::commands; use crate::commands;
use crate::error::*; use crate::error::{MigraResult, StdResult};
use crate::opts::Command; use crate::opts::Command;
use crate::AppOpt; use crate::AppOpt;
use crate::Config; use crate::Config;
@ -29,19 +29,19 @@ impl App {
Command::Init => { Command::Init => {
commands::initialize_migra_manifest(self)?; commands::initialize_migra_manifest(self)?;
} }
Command::Apply(cmd_opts) => { Command::Apply(ref cmd_opts) => {
commands::apply_sql(self, cmd_opts)?; commands::apply_sql(self, cmd_opts)?;
} }
Command::Make(cmd_opts) => { Command::Make(ref cmd_opts) => {
commands::make_migration(self, cmd_opts)?; commands::make_migration(self, cmd_opts)?;
} }
Command::List => { Command::List => {
commands::print_migration_lists(self)?; commands::print_migration_lists(self)?;
} }
Command::Upgrade(cmd_opts) => { Command::Upgrade(ref cmd_opts) => {
commands::upgrade_pending_migrations(self, cmd_opts)?; commands::upgrade_pending_migrations(self, cmd_opts)?;
} }
Command::Downgrade(cmd_opts) => { Command::Downgrade(ref cmd_opts) => {
commands::rollback_applied_migrations(self, cmd_opts)?; commands::rollback_applied_migrations(self, cmd_opts)?;
} }
Command::Completions(cmd_opts) => { Command::Completions(cmd_opts) => {

53
migra_cli/src/client.rs Normal file
View file

@ -0,0 +1,53 @@
use crate::config::SupportedDatabaseClient;
#[cfg(feature = "mysql")]
use migra::clients::MysqlClient;
#[cfg(feature = "postgres")]
use migra::clients::PostgresClient;
#[cfg(feature = "sqlite")]
use migra::clients::SqliteClient;
use migra::clients::{AnyClient, OpenDatabaseConnection};
pub fn create(
client_kind: &SupportedDatabaseClient,
connection_string: &str,
) -> migra::Result<AnyClient> {
let client: AnyClient = match client_kind {
#[cfg(feature = "postgres")]
SupportedDatabaseClient::Postgres => Box::new(PostgresClient::new(&connection_string)?),
#[cfg(feature = "mysql")]
SupportedDatabaseClient::Mysql => Box::new(MysqlClient::new(&connection_string)?),
#[cfg(feature = "sqlite")]
SupportedDatabaseClient::Sqlite => Box::new(SqliteClient::new(&connection_string)?),
};
Ok(client)
}
pub fn with_transaction<TrxFnMut, Res>(
client: &mut AnyClient,
trx_fn: &mut TrxFnMut,
) -> migra::Result<Res>
where
TrxFnMut: FnMut(&mut AnyClient) -> migra::Result<Res>,
{
client
.begin_transaction()
.and_then(|_| trx_fn(client))
.and_then(|res| client.commit_transaction().and(Ok(res)))
.or_else(|err| client.rollback_transaction().and(Err(err)))
}
pub fn maybe_with_transaction<TrxFnMut, Res>(
is_needed: bool,
client: &mut AnyClient,
trx_fn: &mut TrxFnMut,
) -> migra::Result<Res>
where
TrxFnMut: FnMut(&mut AnyClient) -> migra::Result<Res>,
{
if is_needed {
with_transaction(client, trx_fn)
} else {
trx_fn(client)
}
}

View file

@ -1,16 +1,14 @@
use crate::app::App; use crate::app::App;
use crate::database::prelude::*; use crate::client::maybe_with_transaction;
use crate::database::transaction::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::ApplyCommandOpt; use crate::opts::ApplyCommandOpt;
use crate::StdResult; use crate::StdResult;
pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> { pub(crate) fn apply_sql(app: &App, cmd_opts: &ApplyCommandOpt) -> StdResult<()> {
let config = app.config()?; let config = app.config()?;
let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; let mut client = crate::client::create(
let conn = connection_manager.connection(); &config.database.client(),
&config.database.connection_string()?,
let migration_manager = MigrationManager::from(&config); )?;
let file_contents = cmd_opts let file_contents = cmd_opts
.file_paths .file_paths
@ -28,15 +26,15 @@ pub(crate) fn apply_sql(app: &App, cmd_opts: ApplyCommandOpt) -> StdResult<()> {
maybe_with_transaction( maybe_with_transaction(
cmd_opts.transaction_opts.single_transaction, cmd_opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| { &mut |mut client| {
file_contents file_contents
.iter() .iter()
.try_for_each(|content| { .try_for_each(|content| {
maybe_with_transaction( maybe_with_transaction(
!cmd_opts.transaction_opts.single_transaction, !cmd_opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| migration_manager.apply_sql(conn, content), &mut |client| client.apply_sql(content),
) )
}) })
.map_err(From::from) .map_err(From::from)

View file

@ -1,19 +1,19 @@
use crate::app::App; use crate::app::App;
use crate::database::prelude::*; use crate::client;
use crate::database::transaction::maybe_with_transaction; use crate::client::maybe_with_transaction;
use crate::database::{DatabaseConnectionManager, MigrationManager};
use crate::opts::DowngradeCommandOpt; use crate::opts::DowngradeCommandOpt;
use crate::StdResult; use crate::StdResult;
use std::cmp; use std::cmp;
pub(crate) fn rollback_applied_migrations(app: &App, opts: DowngradeCommandOpt) -> StdResult<()> { pub(crate) fn rollback_applied_migrations(app: &App, opts: &DowngradeCommandOpt) -> StdResult<()> {
let config = app.config()?; let config = app.config()?;
let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; let mut client = client::create(
let conn = connection_manager.connection(); &config.database.client(),
let migration_manager = MigrationManager::from(&config); &config.database.connection_string()?,
)?;
let applied_migrations = migration_manager.applied_migration_names(conn)?; let applied_migrations = client.applied_migrations()?;
let migrations = config.migrations()?; let all_migrations = migra::fs::get_all_migrations(&config.migration_dir_path())?;
let rollback_migrations_number = if opts.all_migrations { let rollback_migrations_number = if opts.all_migrations {
applied_migrations.len() applied_migrations.len()
@ -23,18 +23,17 @@ pub(crate) fn rollback_applied_migrations(app: &App, opts: DowngradeCommandOpt)
maybe_with_transaction( maybe_with_transaction(
opts.transaction_opts.single_transaction, opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| { &mut |mut client| {
applied_migrations[..rollback_migrations_number] applied_migrations[..rollback_migrations_number]
.iter() .iter()
.try_for_each(|migration_name| { .try_for_each(|applied_migration| {
if let Some(migration) = migrations.iter().find(|m| m.name() == migration_name) if all_migrations.contains(applied_migration) {
{ println!("downgrade {}...", applied_migration.name());
println!("downgrade {}...", migration.name());
maybe_with_transaction( maybe_with_transaction(
!opts.transaction_opts.single_transaction, !opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| migration_manager.downgrade(conn, &migration), &mut |client| client.apply_downgrade_migration(&applied_migration),
) )
} else { } else {
Ok(()) Ok(())

View file

@ -4,18 +4,17 @@ use crate::StdResult;
use std::path::PathBuf; use std::path::PathBuf;
pub(crate) fn initialize_migra_manifest(app: &App) -> StdResult<()> { pub(crate) fn initialize_migra_manifest(app: &App) -> StdResult<()> {
let config_path = app let config_path = app.config_path().cloned().map_or_else(
.config_path() || PathBuf::from(MIGRA_TOML_FILENAME),
.cloned() |mut config_path| {
.map(|mut config_path| {
let ext = config_path.extension(); let ext = config_path.extension();
if config_path.is_dir() || ext.is_none() { if config_path.is_dir() || ext.is_none() {
config_path.push(MIGRA_TOML_FILENAME); config_path.push(MIGRA_TOML_FILENAME);
} }
config_path config_path
}) },
.unwrap_or_else(|| PathBuf::from(MIGRA_TOML_FILENAME)); );
if config_path.exists() { if config_path.exists() {
println!("{} already exists", config_path.to_str().unwrap()); println!("{} already exists", config_path.to_str().unwrap());

View file

@ -1,65 +1,60 @@
use crate::app::App; use crate::app::App;
use crate::database::migration::filter_pending_migrations; use crate::client;
use crate::database::prelude::*;
use crate::database::{DatabaseConnectionManager, Migration, MigrationManager};
use crate::error::{Error, StdResult}; use crate::error::{Error, StdResult};
use migra::migration;
const EM_DASH: char = '—'; const EM_DASH: char = '—';
pub(crate) fn print_migration_lists(app: &App) -> StdResult<()> { pub(crate) fn print_migration_lists(app: &App) -> StdResult<()> {
let config = app.config()?; let config = app.config()?;
let applied_migration_names = match config.database.connection_string() { let applied_migrations = match config.database.connection_string() {
Ok(ref database_connection_string) => { Ok(ref database_connection_string) => {
let mut connection_manager = DatabaseConnectionManager::connect_with_string( let mut client = client::create(&config.database.client(), database_connection_string)?;
&config.database, let applied_migrations = client.applied_migrations()?;
database_connection_string,
)?;
let conn = connection_manager.connection();
let migration_manager = MigrationManager::from(&config); show_applied_migrations(&applied_migrations);
let applied_migration_names = migration_manager.applied_migration_names(conn)?;
show_applied_migrations(&applied_migration_names); applied_migrations
applied_migration_names
} }
Err(e) if e == Error::MissedEnvVar(String::new()) => { Err(e) if e == Error::MissedEnvVar(String::new()) => {
eprintln!("WARNING: {}", e); eprintln!("WARNING: {}", e);
eprintln!("WARNING: No connection to database"); eprintln!("WARNING: No connection to database");
Vec::new() migration::List::new()
} }
Err(e) => panic!("{}", e), Err(e) => panic!("{}", e),
}; };
println!(); println!();
let all_migrations = migra::fs::get_all_migrations(&config.migration_dir_path())?;
let pending_migrations = let pending_migrations =
filter_pending_migrations(config.migrations()?, &applied_migration_names); migra::fs::filter_pending_migrations(&all_migrations, &applied_migrations);
show_pending_migrations(&pending_migrations); show_pending_migrations(&pending_migrations);
Ok(()) Ok(())
} }
fn show_applied_migrations(applied_migration_names: &[String]) { fn show_applied_migrations(applied_migrations: &migration::List) {
println!("Applied migrations:"); println!("Applied migrations:");
if applied_migration_names.is_empty() { if applied_migrations.is_empty() {
println!("{}", EM_DASH); println!("{}", EM_DASH);
} else { } else {
applied_migration_names applied_migrations
.iter() .iter()
.rev() .rev()
.for_each(|name| println!("{}", name)); .for_each(|migration| println!("{}", migration.name()));
} }
} }
fn show_pending_migrations(pending_migrations: &[Migration]) { fn show_pending_migrations(pending_migrations: &migration::List) {
println!("Pending migrations:"); println!("Pending migrations:");
if pending_migrations.is_empty() { if pending_migrations.is_empty() {
println!("{}", EM_DASH); println!("{}", EM_DASH);
} else { } else {
pending_migrations.iter().for_each(|m| { pending_migrations.iter().for_each(|migration| {
println!("{}", m.name()); println!("{}", migration.name());
}); });
} }
} }

View file

@ -4,7 +4,7 @@ use crate::StdResult;
use chrono::Local; use chrono::Local;
use std::fs; use std::fs;
pub(crate) fn make_migration(app: &App, opts: MakeCommandOpt) -> StdResult<()> { pub(crate) fn make_migration(app: &App, opts: &MakeCommandOpt) -> StdResult<()> {
let config = app.config()?; let config = app.config()?;
let date_format = config.migrations.date_format(); let date_format = config.migrations.date_format();
let formatted_current_timestamp = Local::now().format(&date_format); let formatted_current_timestamp = Local::now().format(&date_format);

View file

@ -1,57 +1,60 @@
use crate::app::App; use crate::app::App;
use crate::database::migration::*; use crate::client;
use crate::database::transaction::maybe_with_transaction; use crate::client::maybe_with_transaction;
use crate::database::DatabaseConnectionManager;
use crate::opts::UpgradeCommandOpt; use crate::opts::UpgradeCommandOpt;
use crate::StdResult; use crate::StdResult;
use migra::migration;
pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) -> StdResult<()> { pub(crate) fn upgrade_pending_migrations(app: &App, opts: &UpgradeCommandOpt) -> StdResult<()> {
let config = app.config()?; let config = app.config()?;
let mut connection_manager = DatabaseConnectionManager::connect(&config.database)?; let mut client = client::create(
let conn = connection_manager.connection(); &config.database.client(),
&config.database.connection_string()?,
)?;
let migration_manager = MigrationManager::from(&config); let applied_migration_names = client.applied_migrations()?;
let all_migrations = migra::fs::get_all_migrations(&config.migration_dir_path())?;
let applied_migration_names = migration_manager.applied_migration_names(conn)?; let pending_migrations =
let migrations = config.migrations()?; migra::fs::filter_pending_migrations(&all_migrations, &applied_migration_names);
let pending_migrations = filter_pending_migrations(migrations, &applied_migration_names);
if pending_migrations.is_empty() { if pending_migrations.is_empty() {
println!("Up to date"); println!("Up to date");
return Ok(()); return Ok(());
} }
let migrations: Vec<Migration> = if let Some(migration_name) = opts.migration_name.clone() { let migrations: migration::List = if let Some(migration_name) = opts.migration_name.clone() {
let target_migration = pending_migrations let target_migration = (*pending_migrations)
.clone()
.into_iter() .into_iter()
.find(|m| m.name() == &migration_name); .find(|m| m.name() == &migration_name);
match target_migration { if let Some(migration) = target_migration.clone() {
Some(migration) => vec![migration], vec![migration].into()
None => { } else {
eprintln!(r#"Cannot find migration with "{}" name"#, migration_name); eprintln!(r#"Cannot find migration with "{}" name"#, migration_name);
return Ok(()); return Ok(());
}
} }
} else { } else {
let upgrade_migrations_number = opts let upgrade_migrations_number = opts
.migrations_number .migrations_number
.unwrap_or_else(|| pending_migrations.len()); .unwrap_or_else(|| pending_migrations.len());
pending_migrations[..upgrade_migrations_number].to_vec() pending_migrations[..upgrade_migrations_number]
.to_vec()
.into()
}; };
maybe_with_transaction( maybe_with_transaction(
opts.transaction_opts.single_transaction, opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| { &mut |mut client| {
migrations migrations
.iter() .iter()
.try_for_each(|migration| { .try_for_each(|migration| {
print_migration_info(migration); print_migration_info(migration);
maybe_with_transaction( maybe_with_transaction(
!opts.transaction_opts.single_transaction, !opts.transaction_opts.single_transaction,
conn, &mut client,
&mut |conn| migration_manager.upgrade(conn, migration), &mut |client| client.apply_upgrade_migration(migration),
) )
}) })
.map_err(From::from) .map_err(From::from)
@ -61,6 +64,6 @@ pub(crate) fn upgrade_pending_migrations(app: &App, opts: UpgradeCommandOpt) ->
Ok(()) Ok(())
} }
fn print_migration_info(migration: &Migration) { fn print_migration_info(migration: &migra::Migration) {
println!("upgrade {}...", migration.name()); println!("upgrade {}...", migration.name());
} }

View file

@ -1,8 +1,7 @@
use crate::database::migration::Migration;
use crate::error::{Error, MigraResult}; use crate::error::{Error, MigraResult};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::{env, fs, io}; use std::{env, fs};
//===========================================================================// //===========================================================================//
// Internal Config Utils / Macros // // Internal Config Utils / Macros //
@ -46,13 +45,21 @@ cargo install migra-cli --features ${database_name}"#,
// Database config // // Database config //
//===========================================================================// //===========================================================================//
fn is_sqlite_database_file(filename: &str) -> bool {
filename
.rsplit('.')
.next()
.map(|ext| ext.eq_ignore_ascii_case("db"))
== Some(true)
}
fn default_database_connection_env() -> String { fn default_database_connection_env() -> String {
String::from("$DATABASE_URL") String::from("$DATABASE_URL")
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub(crate) enum SupportedDatabaseClient { pub enum SupportedDatabaseClient {
#[cfg(feature = "postgres")] #[cfg(feature = "postgres")]
Postgres, Postgres,
#[cfg(feature = "mysql")] #[cfg(feature = "mysql")]
@ -114,7 +121,7 @@ impl DatabaseConfig {
please_install_with!(feature "mysql") please_install_with!(feature "mysql")
} }
} }
} else if connection_string.ends_with(".db") { } else if is_sqlite_database_file(&connection_string) {
cfg_if! { cfg_if! {
if #[cfg(feature = "sqlite")] { if #[cfg(feature = "sqlite")] {
Some(SupportedDatabaseClient::Sqlite) Some(SupportedDatabaseClient::Sqlite)
@ -131,11 +138,13 @@ impl DatabaseConfig {
} }
pub fn connection_string(&self) -> MigraResult<String> { pub fn connection_string(&self) -> MigraResult<String> {
if let Some(connection_env) = self.connection.strip_prefix("$") { self.connection.strip_prefix("$").map_or_else(
env::var(connection_env).map_err(|_| Error::MissedEnvVar(connection_env.to_string())) || Ok(self.connection.clone()),
} else { |connection_env| {
Ok(self.connection.clone()) env::var(connection_env)
} .map_err(|_| Error::MissedEnvVar(connection_env.to_string()))
},
)
} }
} }
@ -174,33 +183,35 @@ impl Default for MigrationsConfig {
impl MigrationsConfig { impl MigrationsConfig {
pub fn directory(&self) -> String { pub fn directory(&self) -> String {
if let Some(directory_env) = self.directory.strip_prefix("$") { self.directory.strip_prefix("$").map_or_else(
env::var(directory_env).unwrap_or_else(|_| { || self.directory.clone(),
println!( |directory_env| {
"WARN: Cannot read {} variable and use {} directory by default", env::var(directory_env).unwrap_or_else(|_| {
directory_env, println!(
"WARN: Cannot read {} variable and use {} directory by default",
directory_env,
default_migrations_directory()
);
default_migrations_directory() default_migrations_directory()
); })
default_migrations_directory() },
}) )
} else {
self.directory.clone()
}
} }
pub fn table_name(&self) -> String { pub fn table_name(&self) -> String {
if let Some(table_name_env) = self.table_name.strip_prefix("$") { self.table_name.strip_prefix("$").map_or_else(
env::var(table_name_env).unwrap_or_else(|_| { || self.table_name.clone(),
println!( |table_name_env| {
"WARN: Cannot read {} variable and use {} table_name by default", env::var(table_name_env).unwrap_or_else(|_| {
table_name_env, println!(
"WARN: Cannot read {} variable and use {} table_name by default",
table_name_env,
default_migrations_table_name()
);
default_migrations_table_name() default_migrations_table_name()
); })
default_migrations_table_name() },
}) )
} else {
self.table_name.clone()
}
} }
pub fn date_format(&self) -> String { pub fn date_format(&self) -> String {
@ -276,26 +287,4 @@ impl Config {
pub fn migration_dir_path(&self) -> PathBuf { pub fn migration_dir_path(&self) -> PathBuf {
self.directory_path().join(self.migrations.directory()) self.directory_path().join(self.migrations.directory())
} }
pub fn migrations(&self) -> MigraResult<Vec<Migration>> {
let mut entries = match self.migration_dir_path().read_dir() {
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
entries => entries?
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, _>>()?,
};
if entries.is_empty() {
return Ok(vec![]);
}
entries.sort();
let migrations = entries
.iter()
.filter_map(|path| Migration::new(&path))
.collect::<Vec<_>>();
Ok(migrations)
}
} }

View file

@ -1,64 +1 @@
use super::adapter::ToSqlParams;
use super::clients::*;
use crate::config::{DatabaseConfig, SupportedDatabaseClient};
use crate::error::StdResult;
pub type AnyConnection = Box<dyn DatabaseConnection>;
pub trait OpenDatabaseConnection: Sized {
fn open(connection_string: &str) -> StdResult<Self>;
}
pub trait DatabaseStatements {
fn create_migration_table_stmt(&self, migrations_table_name: &str) -> String;
}
pub trait SupportsTransactionalDdl {
#[inline]
fn supports_transactional_ddl(&self) -> bool {
false
}
}
pub trait DatabaseConnection: DatabaseStatements + SupportsTransactionalDdl {
fn batch_execute(&mut self, query: &str) -> StdResult<()>;
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64>;
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>>;
}
pub(crate) struct DatabaseConnectionManager {
conn: AnyConnection,
}
impl DatabaseConnectionManager {
pub fn connect_with_string(
config: &DatabaseConfig,
connection_string: &str,
) -> StdResult<Self> {
let conn: AnyConnection = match config.client() {
#[cfg(feature = "postgres")]
SupportedDatabaseClient::Postgres => {
Box::new(PostgresConnection::open(&connection_string)?)
}
#[cfg(feature = "mysql")]
SupportedDatabaseClient::Mysql => Box::new(MySqlConnection::open(&connection_string)?),
#[cfg(feature = "sqlite")]
SupportedDatabaseClient::Sqlite => {
Box::new(SqliteConnection::open(&connection_string)?)
}
};
Ok(DatabaseConnectionManager { conn })
}
pub fn connect(config: &DatabaseConfig) -> StdResult<Self> {
let connection_string = config.connection_string()?;
Self::connect_with_string(config, &connection_string)
}
pub fn connection(&mut self) -> &mut AnyConnection {
&mut self.conn
}
}

View file

@ -175,13 +175,3 @@ impl ManageMigration for MigrationManager {
Ok(applied_migration_names) Ok(applied_migration_names)
} }
} }
pub fn filter_pending_migrations(
migrations: Vec<Migration>,
applied_migration_names: &[String],
) -> Vec<Migration> {
migrations
.into_iter()
.filter(|m| !applied_migration_names.contains(m.name()))
.collect()
}

View file

@ -1,19 +1,3 @@
pub(crate) mod adapter;
pub(crate) mod builder;
pub(crate) mod clients;
pub(crate) mod connection; pub(crate) mod connection;
pub(crate) mod migration; // pub(crate) mod migration;
pub(crate) mod transaction; // pub(crate) mod transaction;
pub mod prelude {
pub use super::adapter::{ToSql, ToSqlParams, TryFromSql};
pub use super::connection::{
AnyConnection, DatabaseConnection, DatabaseStatements, OpenDatabaseConnection,
SupportsTransactionalDdl,
};
pub use super::migration::ManageMigration;
pub use super::transaction::ManageTransaction;
}
pub(crate) use connection::DatabaseConnectionManager;
pub(crate) use migration::{Migration, MigrationManager};

View file

@ -31,33 +31,3 @@ impl ManageTransaction for TransactionManager {
conn.batch_execute("COMMIT") conn.batch_execute("COMMIT")
} }
} }
pub fn with_transaction<TrxFnMut, Res>(
conn: &mut AnyConnection,
trx_fn: &mut TrxFnMut,
) -> StdResult<Res>
where
TrxFnMut: FnMut(&mut AnyConnection) -> StdResult<Res>,
{
let transaction_manager = TransactionManager::new();
transaction_manager
.begin_transaction(conn)
.and_then(|_| trx_fn(conn))
.and_then(|res| transaction_manager.commit_transaction(conn).and(Ok(res)))
.or_else(|err| transaction_manager.rollback_transaction(conn).and(Err(err)))
}
pub fn maybe_with_transaction<TrxFnMut, Res>(
is_needed: bool,
conn: &mut AnyConnection,
trx_fn: &mut TrxFnMut,
) -> StdResult<Res>
where
TrxFnMut: FnMut(&mut AnyConnection) -> StdResult<Res>,
{
if is_needed && conn.supports_transactional_ddl() {
with_transaction(conn, trx_fn)
} else {
trx_fn(conn)
}
}

View file

@ -1,4 +1,4 @@
#![deny(clippy::all)] #![deny(clippy::all, clippy::pedantic)]
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
#[macro_use] #[macro_use]
@ -8,9 +8,9 @@ extern crate cfg_if;
compile_error!(r#"Either features "postgres" or "mysql" must be enabled for "migra" crate"#); compile_error!(r#"Either features "postgres" or "mysql" must be enabled for "migra" crate"#);
mod app; mod app;
mod client;
mod commands; mod commands;
mod config; mod config;
mod database;
mod error; mod error;
pub use error::Error; pub use error::Error;

View file

@ -1,17 +0,0 @@
[package]
name = "migra-clients"
version = "0.1.0"
authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["postgres"]
sqlite = ["rusqlite"]
[dependencies]
migra = { version = "0", path = "../migra" }
postgres = { version = "0.19", optional = true }
mysql = { version = "20.1", optional = true }
rusqlite = { version = "0.25", optional = true }

View file

@ -1,27 +0,0 @@
#![deny(missing_debug_implementations)]
#![deny(clippy::all, clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::missing_errors_doc)]
trait OpenDatabaseConnection: Sized {
fn new(connection_string: &str) -> migra::Result<Self> {
Self::manual(connection_string, "migrations")
}
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self>;
}
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "postgres")]
pub use self::postgres::*;
#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "mysql")]
pub use self::mysql::*;
#[cfg(feature = "sqlite")]
mod sqlite;
#[cfg(feature = "sqlite")]
pub use self::sqlite::*;

View file

@ -1,107 +0,0 @@
use crate::OpenDatabaseConnection;
use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use migra::migration;
use rusqlite::Connection;
#[derive(Debug)]
pub struct SqliteClient {
conn: Connection,
migrations_table_name: String,
}
impl OpenDatabaseConnection for SqliteClient {
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> {
let conn = Connection::open(connection_string)
.map_err(|_| migra::Error::FailedDatabaseConnection)?;
Ok(SqliteClient {
conn,
migrations_table_name: migrations_table_name.to_owned(),
})
}
}
impl BatchExecute for SqliteClient {
fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> {
self.conn.execute_batch(sql).map_err(From::from)
}
}
impl ManageTransaction for SqliteClient {}
impl ManageMigrations for SqliteClient {
fn create_migrations_table(&mut self) -> migra::Result<()> {
let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id int AUTO_INCREMENT PRIMARY KEY,
name varchar(256) NOT NULL UNIQUE
)"#,
&self.migrations_table_name
);
self.batch_execute(&stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable)
}
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| migra::Error::FailedInsertMigration)
}
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"DELETE FROM {} WHERE name = $1",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| migra::Error::FailedDeleteMigration)
}
fn applied_migrations(&mut self) -> migra::Result<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.conn
.prepare(&stmt)
.and_then(|mut stmt| {
stmt.query_map([], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()
})
.map(From::from)
.map_err(|_| migra::Error::FailedGetAppliedMigrations)
}
}
// impl DatabaseConnection for SqliteConnection {
// fn batch_execute(&mut self, query: &str) -> StdResult<()> {
// self.conn.execute_batch(query)?;
// Ok(())
// }
// fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
// let stmt = merge_query_with_params(query, params);
// let res = self.conn.execute(&stmt, [])?;
// Ok(res as u64)
// }
// fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
// let stmt = merge_query_with_params(query, params);
// let mut stmt = self.conn.prepare(&stmt)?;
// let res = stmt
// .query_map([], |row| Ok(vec![row.get(0)?]))?
// .collect::<Result<_, _>>()?;
// Ok(res)
// }
// }