Archived
1
0
Fork 0

feat(clients): move clients to separate dir

refac(core): add batch exec trait
refac(core): smarter managers
refac(cli): removed adapter, builder
This commit is contained in:
Dmitriy Pleshevskiy 2021-06-03 00:22:59 +03:00
parent bb9be306d9
commit c71e8fe33f
60 changed files with 291 additions and 250 deletions

View file

@ -1,5 +1,6 @@
[workspace] [workspace]
members = [ members = [
"migra", "migra",
"migra-cli" "migra_clients",
# "migra_cli"
] ]

View file

@ -1,17 +0,0 @@
use crate::error::StdResult;
pub trait ToSql {
fn to_sql(&self) -> String;
}
pub type ToSqlParams<'a> = &'a [&'a dyn ToSql];
impl ToSql for &str {
fn to_sql(&self) -> String {
format!("'{}'", self)
}
}
pub trait TryFromSql<QueryResultRow>: Sized {
fn try_from_sql(row: QueryResultRow) -> StdResult<Self>;
}

View file

@ -1,39 +0,0 @@
use super::prelude::*;
pub(crate) fn merge_query_with_params(query: &str, params: ToSqlParams) -> String {
params
.iter()
.enumerate()
.fold(query.to_string(), |acc, (i, p)| {
str::replace(&acc, &format!("${}", i + 1), &p.to_sql())
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn replace_one_param_in_query() {
assert_eq!(
merge_query_with_params("SELECT $1", &[&"foo"]),
"SELECT 'foo'"
);
}
#[test]
fn replace_two_params_in_query() {
assert_eq!(
merge_query_with_params("SELECT $1, $2", &[&"foo", &"bar"]),
"SELECT 'foo', 'bar'"
);
}
#[test]
fn replace_all_bonds_in_query_with_first_param() {
assert_eq!(
merge_query_with_params("SELECT $1, $1", &[&"foo"]),
"SELECT 'foo', 'foo'"
);
}
}

View file

@ -1,20 +0,0 @@
cfg_if! {
if #[cfg(feature = "postgres")] {
mod postgres;
pub use self::postgres::*;
}
}
cfg_if! {
if #[cfg(feature = "mysql")] {
mod mysql;
pub use self::mysql::*;
}
}
cfg_if! {
if #[cfg(feature = "sqlite")] {
mod sqlite;
pub use self::sqlite::*;
}
}

View file

@ -1,64 +0,0 @@
use crate::database::builder::merge_query_with_params;
use crate::database::prelude::*;
use crate::error::StdResult;
use postgres::{Client, NoTls};
pub struct PostgresConnection {
client: Client,
}
impl OpenDatabaseConnection for PostgresConnection {
fn open(connection_string: &str) -> StdResult<Self> {
let client = Client::connect(connection_string, NoTls)?;
Ok(PostgresConnection { client })
}
}
impl DatabaseStatements for PostgresConnection {
fn create_migration_table_stmt(&self, migrations_table_name: &str) -> String {
format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id serial PRIMARY KEY,
name text NOT NULL UNIQUE
)"#,
migrations_table_name
)
}
}
impl SupportsTransactionalDdl for PostgresConnection {
#[inline]
fn supports_transactional_ddl(&self) -> bool {
true
}
}
impl DatabaseConnection for PostgresConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.client.batch_execute(query)?;
Ok(())
}
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
let stmt = merge_query_with_params(query, params);
let res = self.client.execute(stmt.as_str(), &[])?;
Ok(res)
}
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
let stmt = merge_query_with_params(query, params);
let res = self.client.query(stmt.as_str(), &[])?;
let res = res
.into_iter()
.map(|row| {
let column: String = row.get(0);
vec![column]
})
.collect::<Vec<_>>();
Ok(res)
}
}

View file

@ -1,60 +0,0 @@
use crate::database::builder::merge_query_with_params;
use crate::database::prelude::*;
use crate::error::StdResult;
use rusqlite::Connection;
pub struct SqliteConnection {
conn: Connection,
}
impl OpenDatabaseConnection for SqliteConnection {
fn open(connection_string: &str) -> StdResult<Self> {
let conn = Connection::open(connection_string)?;
Ok(SqliteConnection { conn })
}
}
impl DatabaseStatements for SqliteConnection {
fn create_migration_table_stmt(&self, migrations_table_name: &str) -> String {
format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id int AUTO_INCREMENT PRIMARY KEY,
name varchar(256) NOT NULL UNIQUE
)"#,
migrations_table_name
)
}
}
impl SupportsTransactionalDdl for SqliteConnection {
#[inline]
fn supports_transactional_ddl(&self) -> bool {
true
}
}
impl DatabaseConnection for SqliteConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.conn.execute_batch(query)?;
Ok(())
}
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
let stmt = merge_query_with_params(query, params);
let res = self.conn.execute(&stmt, [])?;
Ok(res as u64)
}
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
let stmt = merge_query_with_params(query, params);
let mut stmt = self.conn.prepare(&stmt)?;
let res = stmt
.query_map([], |row| Ok(vec![row.get(0)?]))?
.collect::<Result<_, _>>()?;
Ok(res)
}
}

View file

@ -1,10 +1,13 @@
use std::fmt; use std::fmt;
use std::io; use std::io;
pub type StdResult<T> = Result<T, Box<dyn std::error::Error + 'static + Sync + Send>>;
pub type MigraResult<T> = Result<T, Error>; pub type MigraResult<T> = Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
FailedDatabaseConnection,
FailedOpenTransaction, FailedOpenTransaction,
FailedCommitTransaction, FailedCommitTransaction,
FailedRollbackTransaction, FailedRollbackTransaction,
@ -21,6 +24,7 @@ pub enum Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Error::FailedDatabaseConnection => fmt.write_str("Failed database connection"),
Error::FailedOpenTransaction => fmt.write_str("Failed to open a transaction"), Error::FailedOpenTransaction => fmt.write_str("Failed to open a transaction"),
Error::FailedCommitTransaction => fmt.write_str("Failed to commit a transaction"), Error::FailedCommitTransaction => fmt.write_str("Failed to commit a transaction"),
Error::FailedRollbackTransaction => fmt.write_str("Failed to rollback a transaction"), Error::FailedRollbackTransaction => fmt.write_str("Failed to rollback a transaction"),

View file

@ -7,7 +7,7 @@ pub mod managers;
pub mod migration; pub mod migration;
mod error; mod error;
pub use error::{Error, MigraResult as Result}; pub use error::{Error, MigraResult as Result, StdResult};
pub use migration::Migration; pub use migration::Migration;

View file

@ -1,16 +1,31 @@
use crate::error::MigraResult; use crate::error::{Error, MigraResult, StdResult};
use crate::migration::{self, Migration}; use crate::migration::{self, Migration};
pub trait ManageTransaction { pub trait BatchExecute {
fn begin_transaction(&mut self) -> MigraResult<()>; fn batch_execute(&mut self, sql: &str) -> StdResult<()>;
fn rollback_transaction(&mut self) -> MigraResult<()>;
fn commit_transaction(&mut self) -> MigraResult<()>;
} }
pub trait ManageMigrations { pub trait ManageTransaction: BatchExecute {
fn apply_sql(&mut self, sql_content: &str) -> MigraResult<()>; fn begin_transaction(&mut self) -> MigraResult<()> {
self.batch_execute("BEGIN")
.map_err(|_| Error::FailedOpenTransaction)
}
fn rollback_transaction(&mut self) -> MigraResult<()> {
self.batch_execute("ROLLBACK")
.map_err(|_| Error::FailedRollbackTransaction)
}
fn commit_transaction(&mut self) -> MigraResult<()> {
self.batch_execute("COMMIT")
.map_err(|_| Error::FailedCommitTransaction)
}
}
pub trait ManageMigrations: BatchExecute {
fn apply_sql(&mut self, sql: &str) -> MigraResult<()> {
self.batch_execute(sql).map_err(|_| Error::FailedApplySql)
}
fn create_migrations_table(&mut self) -> MigraResult<()>; fn create_migrations_table(&mut self) -> MigraResult<()>;

View file

@ -13,21 +13,21 @@ readme = "../README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["postgres"]
postgres = ["migra-clients/postgres"]
sqlite = ["migra-clients/sqlite"]
mysql = ["migra-clients/mysql"]
[dependencies] [dependencies]
migra = { version = "0", path = "../migra" } migra = { version = "0", path = "../migra" }
migra-clients = { version = "0", path = "../migra_clients" }
cfg-if = "1.0" cfg-if = "1.0"
structopt = "0.3" structopt = "0.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
toml = "0.5" toml = "0.5"
chrono = "0.4" chrono = "0.4"
dotenv = { version = "0.15", optional = true } dotenv = { version = "0.15", optional = true }
postgres = { version = "0.19", optional = true }
mysql = { version = "20.1", optional = true }
rusqlite = { version = "0.25", optional = true }
[features]
default = ["postgres"]
sqlite = ["rusqlite"]
[badges] [badges]
maintenance = { status = "actively-developed" } maintenance = { status = "actively-developed" }

17
migra_clients/Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[package]
name = "migra-clients"
version = "0.1.0"
authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["postgres"]
sqlite = ["rusqlite"]
[dependencies]
migra = { version = "0", path = "../migra" }
postgres = { version = "0.19", optional = true }
mysql = { version = "20.1", optional = true }
rusqlite = { version = "0.25", optional = true }

27
migra_clients/src/lib.rs Normal file
View file

@ -0,0 +1,27 @@
#![deny(missing_debug_implementations)]
#![deny(clippy::all, clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::missing_errors_doc)]
trait OpenDatabaseConnection: Sized {
fn new(connection_string: &str) -> migra::Result<Self> {
Self::manual(connection_string, "migrations")
}
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self>;
}
#[cfg(feature = "postgres")]
mod postgres;
#[cfg(feature = "postgres")]
pub use self::postgres::*;
#[cfg(feature = "mysql")]
mod mysql;
#[cfg(feature = "mysql")]
pub use self::mysql::*;
#[cfg(feature = "sqlite")]
mod sqlite;
#[cfg(feature = "sqlite")]
pub use self::sqlite::*;

View file

@ -1,19 +1,20 @@
use crate::error::MigraResult; use crate::OpenDatabaseConnection;
use migra::managers::{ManageMigrations, ManageTransaction}; use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use migra::migration; use migra::migration;
use mysql::prelude::*; use mysql::prelude::*;
use mysql::{Pool, PooledConn}; use mysql::{Pool, PooledConn};
#[derive(Debug)]
pub struct MySqlClient { pub struct MySqlClient {
conn: PooledConn, conn: PooledConn,
migrations_table_name: String, migrations_table_name: String,
} }
impl MySqlClient { impl OpenDatabaseConnection for MySqlClient {
fn new(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self> { fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> {
let conn = Pool::new_manual(1, 1, connection_string) let conn = Pool::new_manual(1, 1, connection_string)
.and_then(|pool| pool.get_conn()) .and_then(|pool| pool.get_conn())
.map_err(|_| crate::Error::FailedDatabaseConnection)?; .map_err(|_| migra::Error::FailedDatabaseConnection)?;
Ok(MySqlClient { Ok(MySqlClient {
conn, conn,
@ -22,33 +23,15 @@ impl MySqlClient {
} }
} }
impl ManageTransaction for MySqlClient { impl BatchExecute for MySqlClient {
fn begin_transaction(&mut self) -> migra::Result<()> { fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> {
self.conn self.conn.query_drop(sql).map_err(From::from)
.query_drop("BEGIN")
.map_err(|_| migra::Error::FailedOpenTransaction)
}
fn rollback_transaction(&mut self) -> migra::Result<()> {
self.conn
.query_drop("ROLLBACK")
.map_err(|_| migra::Error::FailedRollbackTransaction)
}
fn commit_transaction(&mut self) -> migra::Result<()> {
self.conn
.query_drop("COMMIT")
.map_err(|_| migra::Error::FailedCommitTransaction)
} }
} }
impl ManageMigrations for MySqlClient { impl ManageTransaction for MySqlClient {}
fn apply_sql(&mut self, sql_content: &str) -> migra::Result<()> {
self.conn
.query_drop(sql_content)
.map_err(|_| migra::Error::FailedApplySql)
}
impl ManageMigrations for MySqlClient {
fn create_migrations_table(&mut self) -> migra::Result<()> { fn create_migrations_table(&mut self) -> migra::Result<()> {
let stmt = format!( let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} ( r#"CREATE TABLE IF NOT EXISTS {} (
@ -58,8 +41,7 @@ impl ManageMigrations for MySqlClient {
&self.migrations_table_name &self.migrations_table_name
); );
self.conn self.batch_execute(&stmt)
.query_drop(stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable) .map_err(|_| migra::Error::FailedCreateMigrationsTable)
} }
@ -71,7 +53,7 @@ impl ManageMigrations for MySqlClient {
self.conn self.conn
.exec_first(&stmt, (name,)) .exec_first(&stmt, (name,))
.map(|res| res.unwrap_or_default()) .map(Option::unwrap_or_default)
.map_err(|_| migra::Error::FailedInsertMigration) .map_err(|_| migra::Error::FailedInsertMigration)
} }
@ -83,7 +65,7 @@ impl ManageMigrations for MySqlClient {
self.conn self.conn
.exec_first(&stmt, (name,)) .exec_first(&stmt, (name,))
.map(|res| res.unwrap_or_default()) .map(Option::unwrap_or_default)
.map_err(|_| migra::Error::FailedDeleteMigration) .map_err(|_| migra::Error::FailedDeleteMigration)
} }

View file

@ -0,0 +1,88 @@
use crate::OpenDatabaseConnection;
use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use migra::migration;
use postgres::{Client, NoTls};
use std::fmt;
pub struct PostgresClient {
client: Client,
migrations_table_name: String,
}
impl fmt::Debug for PostgresClient {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("PostgresClient")
.field("migrations_table_name", &self.migrations_table_name)
.finish()
}
}
impl OpenDatabaseConnection for PostgresClient {
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> {
let client = Client::connect(connection_string, NoTls)
.map_err(|_| migra::Error::FailedDatabaseConnection)?;
Ok(PostgresClient {
client,
migrations_table_name: migrations_table_name.to_owned(),
})
}
}
impl BatchExecute for PostgresClient {
fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> {
self.client.batch_execute(sql).map_err(From::from)
}
}
impl ManageTransaction for PostgresClient {}
impl ManageMigrations for PostgresClient {
fn create_migrations_table(&mut self) -> migra::Result<()> {
let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id serial PRIMARY KEY,
name text NOT NULL UNIQUE
)"#,
&self.migrations_table_name
);
self.batch_execute(&stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable)
}
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name
);
self.client
.execute(stmt.as_str(), &[&name])
.map_err(|_| migra::Error::FailedInsertMigration)
}
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"DELETE FROM {} WHERE name = $1",
&self.migrations_table_name
);
self.client
.execute(stmt.as_str(), &[&name])
.map_err(|_| migra::Error::FailedDeleteMigration)
}
fn applied_migrations(&mut self) -> migra::Result<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.client
.query(stmt.as_str(), &[])
.and_then(|res| {
res.into_iter()
.map(|row| row.try_get(0))
.collect::<Result<Vec<String>, _>>()
})
.map(From::from)
.map_err(|_| migra::Error::FailedGetAppliedMigrations)
}
}

107
migra_clients/src/sqlite.rs Normal file
View file

@ -0,0 +1,107 @@
use crate::OpenDatabaseConnection;
use migra::managers::{BatchExecute, ManageMigrations, ManageTransaction};
use migra::migration;
use rusqlite::Connection;
#[derive(Debug)]
pub struct SqliteClient {
conn: Connection,
migrations_table_name: String,
}
impl OpenDatabaseConnection for SqliteClient {
fn manual(connection_string: &str, migrations_table_name: &str) -> migra::Result<Self> {
let conn = Connection::open(connection_string)
.map_err(|_| migra::Error::FailedDatabaseConnection)?;
Ok(SqliteClient {
conn,
migrations_table_name: migrations_table_name.to_owned(),
})
}
}
impl BatchExecute for SqliteClient {
fn batch_execute(&mut self, sql: &str) -> migra::StdResult<()> {
self.conn.execute_batch(sql).map_err(From::from)
}
}
impl ManageTransaction for SqliteClient {}
impl ManageMigrations for SqliteClient {
fn create_migrations_table(&mut self) -> migra::Result<()> {
let stmt = format!(
r#"CREATE TABLE IF NOT EXISTS {} (
id int AUTO_INCREMENT PRIMARY KEY,
name varchar(256) NOT NULL UNIQUE
)"#,
&self.migrations_table_name
);
self.batch_execute(&stmt)
.map_err(|_| migra::Error::FailedCreateMigrationsTable)
}
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"INSERT INTO {} (name) VALUES ($1)",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| migra::Error::FailedInsertMigration)
}
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> {
let stmt = format!(
"DELETE FROM {} WHERE name = $1",
&self.migrations_table_name
);
self.conn
.execute(&stmt, [name])
.map(|res| res as u64)
.map_err(|_| migra::Error::FailedDeleteMigration)
}
fn applied_migrations(&mut self) -> migra::Result<migration::List> {
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
self.conn
.prepare(&stmt)
.and_then(|mut stmt| {
stmt.query_map([], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()
})
.map(From::from)
.map_err(|_| migra::Error::FailedGetAppliedMigrations)
}
}
// impl DatabaseConnection for SqliteConnection {
// fn batch_execute(&mut self, query: &str) -> StdResult<()> {
// self.conn.execute_batch(query)?;
// Ok(())
// }
// fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
// let stmt = merge_query_with_params(query, params);
// let res = self.conn.execute(&stmt, [])?;
// Ok(res as u64)
// }
// fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
// let stmt = merge_query_with_params(query, params);
// let mut stmt = self.conn.prepare(&stmt)?;
// let res = stmt
// .query_map([], |row| Ok(vec![row.get(0)?]))?
// .collect::<Result<_, _>>()?;
// Ok(res)
// }
// }