refac: mysql client with core lib
This commit is contained in:
parent
1a59331ecf
commit
bb9be306d9
3 changed files with 88 additions and 38 deletions
|
@ -1,53 +1,98 @@
|
||||||
use crate::database::builder::merge_query_with_params;
|
use crate::error::MigraResult;
|
||||||
use crate::database::prelude::*;
|
use migra::managers::{ManageMigrations, ManageTransaction};
|
||||||
use crate::error::StdResult;
|
use migra::migration;
|
||||||
use mysql::prelude::*;
|
use mysql::prelude::*;
|
||||||
use mysql::{Pool, PooledConn};
|
use mysql::{Pool, PooledConn};
|
||||||
|
|
||||||
pub struct MySqlConnection {
|
pub struct MySqlClient {
|
||||||
conn: PooledConn,
|
conn: PooledConn,
|
||||||
|
migrations_table_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OpenDatabaseConnection for MySqlConnection {
|
impl MySqlClient {
|
||||||
fn open(connection_string: &str) -> StdResult<Self> {
|
fn new(connection_string: &str, migrations_table_name: &str) -> MigraResult<Self> {
|
||||||
let pool = Pool::new_manual(1, 1, connection_string)?;
|
let conn = Pool::new_manual(1, 1, connection_string)
|
||||||
let conn = pool.get_conn()?;
|
.and_then(|pool| pool.get_conn())
|
||||||
Ok(MySqlConnection { conn })
|
.map_err(|_| crate::Error::FailedDatabaseConnection)?;
|
||||||
|
|
||||||
|
Ok(MySqlClient {
|
||||||
|
conn,
|
||||||
|
migrations_table_name: migrations_table_name.to_owned(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DatabaseStatements for MySqlConnection {
|
impl ManageTransaction for MySqlClient {
|
||||||
fn create_migration_table_stmt(&self, migrations_table_name: &str) -> String {
|
fn begin_transaction(&mut self) -> migra::Result<()> {
|
||||||
format!(
|
self.conn
|
||||||
|
.query_drop("BEGIN")
|
||||||
|
.map_err(|_| migra::Error::FailedOpenTransaction)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rollback_transaction(&mut self) -> migra::Result<()> {
|
||||||
|
self.conn
|
||||||
|
.query_drop("ROLLBACK")
|
||||||
|
.map_err(|_| migra::Error::FailedRollbackTransaction)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commit_transaction(&mut self) -> migra::Result<()> {
|
||||||
|
self.conn
|
||||||
|
.query_drop("COMMIT")
|
||||||
|
.map_err(|_| migra::Error::FailedCommitTransaction)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ManageMigrations for MySqlClient {
|
||||||
|
fn apply_sql(&mut self, sql_content: &str) -> migra::Result<()> {
|
||||||
|
self.conn
|
||||||
|
.query_drop(sql_content)
|
||||||
|
.map_err(|_| migra::Error::FailedApplySql)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_migrations_table(&mut self) -> migra::Result<()> {
|
||||||
|
let stmt = format!(
|
||||||
r#"CREATE TABLE IF NOT EXISTS {} (
|
r#"CREATE TABLE IF NOT EXISTS {} (
|
||||||
id int AUTO_INCREMENT PRIMARY KEY,
|
id int AUTO_INCREMENT PRIMARY KEY,
|
||||||
name varchar(256) NOT NULL UNIQUE
|
name varchar(256) NOT NULL UNIQUE
|
||||||
)"#,
|
)"#,
|
||||||
migrations_table_name
|
&self.migrations_table_name
|
||||||
)
|
);
|
||||||
}
|
|
||||||
}
|
self.conn
|
||||||
|
.query_drop(stmt)
|
||||||
impl SupportsTransactionalDdl for MySqlConnection {}
|
.map_err(|_| migra::Error::FailedCreateMigrationsTable)
|
||||||
|
}
|
||||||
impl DatabaseConnection for MySqlConnection {
|
|
||||||
fn batch_execute(&mut self, query: &str) -> StdResult<()> {
|
fn insert_migration(&mut self, name: &str) -> migra::Result<u64> {
|
||||||
self.conn.query_drop(query)?;
|
let stmt = format!(
|
||||||
Ok(())
|
"INSERT INTO {} (name) VALUES ($1)",
|
||||||
}
|
&self.migrations_table_name
|
||||||
|
);
|
||||||
fn execute<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<u64> {
|
|
||||||
let stmt = merge_query_with_params(query, params);
|
self.conn
|
||||||
|
.exec_first(&stmt, (name,))
|
||||||
let res = self.conn.query_first(stmt)?.unwrap_or_default();
|
.map(|res| res.unwrap_or_default())
|
||||||
Ok(res)
|
.map_err(|_| migra::Error::FailedInsertMigration)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn query<'b>(&mut self, query: &str, params: ToSqlParams<'b>) -> StdResult<Vec<Vec<String>>> {
|
fn delete_migration(&mut self, name: &str) -> migra::Result<u64> {
|
||||||
let stmt = merge_query_with_params(query, params);
|
let stmt = format!(
|
||||||
|
"DELETE FROM {} WHERE name = $1",
|
||||||
let res = self.conn.query_map(stmt, |(column,)| vec![column])?;
|
&self.migrations_table_name
|
||||||
|
);
|
||||||
Ok(res)
|
|
||||||
|
self.conn
|
||||||
|
.exec_first(&stmt, (name,))
|
||||||
|
.map(|res| res.unwrap_or_default())
|
||||||
|
.map_err(|_| migra::Error::FailedDeleteMigration)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn applied_migrations(&mut self) -> migra::Result<migration::List> {
|
||||||
|
let stmt = format!("SELECT name FROM {}", &self.migrations_table_name);
|
||||||
|
|
||||||
|
self.conn
|
||||||
|
.query::<String, _>(stmt)
|
||||||
|
.map(From::from)
|
||||||
|
.map_err(|_| migra::Error::FailedGetAppliedMigrations)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,8 @@ pub type MigraResult<T> = result::Result<T, Error>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
FailedDatabaseConnection,
|
||||||
|
|
||||||
RootNotFound,
|
RootNotFound,
|
||||||
MissedEnvVar(String),
|
MissedEnvVar(String),
|
||||||
|
|
||||||
|
@ -22,6 +24,7 @@ impl fmt::Display for Error {
|
||||||
Error::MissedEnvVar(ref name) => {
|
Error::MissedEnvVar(ref name) => {
|
||||||
write!(fmt, r#"Missed "{}" environment variable"#, name)
|
write!(fmt, r#"Missed "{}" environment variable"#, name)
|
||||||
}
|
}
|
||||||
|
Error::FailedDatabaseConnection => fmt.write_str("Failed database connection"),
|
||||||
Error::Io(ref error) => write!(fmt, "{}", error),
|
Error::Io(ref error) => write!(fmt, "{}", error),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,8 @@ mod commands;
|
||||||
mod config;
|
mod config;
|
||||||
mod database;
|
mod database;
|
||||||
mod error;
|
mod error;
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
mod opts;
|
mod opts;
|
||||||
|
|
||||||
use crate::error::StdResult;
|
use crate::error::StdResult;
|
||||||
|
|
Reference in a new issue