Archived
1
0
Fork 0

refac: add database connection manager

feat: add supported client to manifest
This commit is contained in:
Dmitriy Pleshevskiy 2021-02-18 12:29:09 +03:00
parent d4106c50e6
commit a7ab5572df
9 changed files with 120 additions and 101 deletions

View file

@ -1,5 +1,4 @@
use crate::config::Config; use crate::config::Config;
use crate::databases::*;
use crate::migration::{DatabaseMigrationManager, MigrationManager}; use crate::migration::{DatabaseMigrationManager, MigrationManager};
use crate::opts::ApplyCommandOpt; use crate::opts::ApplyCommandOpt;
use crate::path::PathBuilder; use crate::path::PathBuilder;
@ -7,8 +6,7 @@ use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> { pub(crate) fn apply_sql(config: Config, opts: ApplyCommandOpt) -> StdResult<()> {
let connection = PostgresConnection::try_from(&config)?; let mut manager = MigrationManager::try_from(&config)?;
let mut manager = MigrationManager::new(connection);
let file_path = PathBuilder::from(config.directory_path()) let file_path = PathBuilder::from(config.directory_path())
.append(opts.file_name) .append(opts.file_name)

View file

@ -1,12 +1,10 @@
use crate::config::Config; use crate::config::Config;
use crate::migration::{DatabaseMigrationManager, MigrationManager, MigrationNames}; use crate::migration::{DatabaseMigrationManager, MigrationManager};
use crate::databases::*;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> { pub(crate) fn downgrade_applied_migrations(config: Config) -> StdResult<()> {
let connection = PostgresConnection::try_from(&config)?; let mut manager = MigrationManager::try_from(&config)?;
let mut manager = MigrationManager::new(connection);
let applied_migrations = manager.applied_migration_names()?; let applied_migrations = manager.applied_migration_names()?;
let migrations = config.migrations()?; let migrations = config.migrations()?;

View file

@ -1,16 +1,18 @@
use crate::config::Config; use crate::config::Config;
use crate::database::DatabaseConnection;
use crate::databases::*; use crate::databases::*;
use crate::error::{ErrorKind, StdResult}; use crate::error::{ErrorKind, StdResult};
use crate::migration::{filter_pending_migrations, Migration, MigrationManager, MigrationNames}; use crate::migration::{
filter_pending_migrations, DatabaseMigrationManager, Migration, MigrationManager,
};
const EM_DASH: char = '—'; const EM_DASH: char = '—';
pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> { pub(crate) fn print_migration_lists(config: Config) -> StdResult<()> {
let applied_migration_names = match config.database_connection_string() { let applied_migration_names = match config.database.connection_string() {
Ok(ref database_connection_string) => { Ok(ref database_connection_string) => {
let connection = PostgresConnection::open(database_connection_string)?; let connection_manager = DatabaseConnectionManager::new(&config.database);
let mut manager = MigrationManager::new(connection); let conn = connection_manager.connect_with_string(database_connection_string)?;
let mut manager = MigrationManager::new(conn);
let applied_migration_names = manager.applied_migration_names()?; let applied_migration_names = manager.applied_migration_names()?;

View file

@ -1,14 +1,12 @@
use crate::databases::*;
use crate::migration::{ use crate::migration::{
filter_pending_migrations, DatabaseMigrationManager, Migration, MigrationManager, filter_pending_migrations, DatabaseMigrationManager, Migration, MigrationManager,
MigrationNames,
}; };
use crate::Config; use crate::Config;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom; use std::convert::TryFrom;
pub(crate) fn upgrade_pending_migrations(config: Config) -> StdResult<()> { pub(crate) fn upgrade_pending_migrations(config: Config) -> StdResult<()> {
let mut manager = MigrationManager::new(PostgresConnection::try_from(&config)?); let mut manager = MigrationManager::try_from(&config)?;
let applied_migration_names = manager.applied_migration_names()?; let applied_migration_names = manager.applied_migration_names()?;
let migrations = config.migrations()?; let migrations = config.migrations()?;

View file

@ -16,21 +16,47 @@ pub(crate) struct Config {
root: PathBuf, root: PathBuf,
#[serde(default)] #[serde(default)]
database: DatabaseConfig, pub database: DatabaseConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum SupportedDatabaseClient {
Postgres,
} }
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(crate) struct DatabaseConfig { pub(crate) struct DatabaseConfig {
pub client: Option<SupportedDatabaseClient>,
pub connection: Option<String>, pub connection: Option<String>,
} }
impl DatabaseConfig {
pub fn client(&self) -> crate::error::Result<SupportedDatabaseClient> {
Ok(SupportedDatabaseClient::Postgres)
}
pub fn connection_string(&self) -> crate::error::Result<String> {
let connection = self
.connection
.clone()
.unwrap_or_else(|| String::from(DEFAULT_DATABASE_CONNECTION_ENV));
if let Some(connection_env) = connection.strip_prefix("$") {
env::var(connection_env)
.map_err(|e| Error::new(ErrorKind::MissedEnvVar(connection_env.to_string()), e))
} else {
Ok(connection)
}
}
}
impl Default for Config { impl Default for Config {
fn default() -> Config { fn default() -> Config {
Config { Config {
manifest_root: PathBuf::new(), manifest_root: PathBuf::default(),
root: PathBuf::from("database"), root: PathBuf::from("database"),
database: DatabaseConfig { database: DatabaseConfig {
connection: Some(String::from(DEFAULT_DATABASE_CONNECTION_ENV)), connection: Some(String::from(DEFAULT_DATABASE_CONNECTION_ENV)),
..Default::default()
}, },
} }
} }
@ -91,20 +117,6 @@ impl Config {
.build() .build()
} }
pub fn database_connection_string(&self) -> crate::error::Result<String> {
let connection = self
.database
.connection
.clone()
.unwrap_or_else(|| String::from(DEFAULT_DATABASE_CONNECTION_ENV));
if let Some(connection_env) = connection.strip_prefix("$") {
env::var(connection_env)
.map_err(|e| Error::new(ErrorKind::MissedEnvVar(connection_env.to_string()), e))
} else {
Ok(connection)
}
}
pub fn migration_dir_path(&self) -> PathBuf { pub fn migration_dir_path(&self) -> PathBuf {
PathBuilder::from(&self.directory_path()) PathBuilder::from(&self.directory_path())
.append("migrations") .append("migrations")

View file

@ -14,21 +14,18 @@ pub trait TryFromSql<QueryResultRow>: Sized {
fn try_from_sql(row: QueryResultRow) -> StdResult<Self>; fn try_from_sql(row: QueryResultRow) -> StdResult<Self>;
} }
pub trait DatabaseConnection: Sized { pub trait OpenDatabaseConnection: Sized {
type QueryResultRow;
type QueryResult;
fn open(connection_string: &str) -> StdResult<Self>; fn open(connection_string: &str) -> StdResult<Self>;
}
pub trait DatabaseConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()>; fn batch_execute(&mut self, query: &str) -> StdResult<()>;
fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult<u64>; fn execute<'b>(&mut self, query: &str, params: &'b [&'b dyn ToSql]) -> StdResult<u64>;
fn query<'b, OutputItem>( fn query<'b>(
&mut self, &mut self,
query: &str, query: &str,
params: &'b [&'b dyn ToSql], params: &'b [&'b dyn ToSql],
) -> StdResult<Vec<OutputItem>> ) -> StdResult<Vec<Vec<String>>>;
where
OutputItem: ?Sized + TryFromSql<Self::QueryResultRow>;
} }

View file

@ -1,3 +1,32 @@
mod postgres; mod postgres;
pub use self::postgres::*; pub use self::postgres::*;
use crate::config::{DatabaseConfig, SupportedDatabaseClient};
use crate::database::{DatabaseConnection, OpenDatabaseConnection};
use crate::error::StdResult;
pub(crate) struct DatabaseConnectionManager {
config: DatabaseConfig,
}
impl DatabaseConnectionManager {
pub fn new(config: &DatabaseConfig) -> Self {
Self {
config: config.clone(),
}
}
pub fn connect_with_string(&self, connection_string: &str) -> StdResult<Box<dyn DatabaseConnection>> {
let conn = match self.config.client()? {
SupportedDatabaseClient::Postgres => PostgresConnection::open(&connection_string)?,
};
Ok(Box::new(conn))
}
pub fn connect(&self) -> StdResult<Box<dyn DatabaseConnection>> {
let connection_string = self.config.connection_string()?;
self.connect_with_string(&connection_string)
}
}

View file

@ -1,32 +1,19 @@
use crate::database::{DatabaseConnection, OpenDatabaseConnection, ToSql};
use crate::migration::{MigrationNames, MigrationManager, is_migrations_table_not_found};
use postgres::{Client, NoTls};
use crate::config::Config;
use std::convert::TryFrom;
use crate::error::StdResult; use crate::error::StdResult;
use crate::database::{TryFromSql, ToSql, DatabaseConnection}; use postgres::{Client, NoTls};
pub struct PostgresConnection { pub struct PostgresConnection {
client: Client, client: Client,
} }
impl TryFrom<&Config> for PostgresConnection { impl OpenDatabaseConnection for PostgresConnection {
type Error = Box<dyn std::error::Error>;
fn try_from(config: &Config) -> Result<Self, Self::Error> {
PostgresConnection::open(&config.database_connection_string()?)
}
}
impl DatabaseConnection for PostgresConnection {
type QueryResultRow = postgres::Row;
type QueryResult = Vec<Self::QueryResultRow>;
fn open(connection_string: &str) -> StdResult<Self> { fn open(connection_string: &str) -> StdResult<Self> {
let client = Client::connect(connection_string, NoTls)?; let client = Client::connect(connection_string, NoTls)?;
Ok(PostgresConnection { client }) Ok(PostgresConnection { client })
} }
}
impl DatabaseConnection for PostgresConnection {
fn batch_execute(&mut self, query: &str) -> StdResult<()> { fn batch_execute(&mut self, query: &str) -> StdResult<()> {
self.client.batch_execute(query)?; self.client.batch_execute(query)?;
Ok(()) Ok(())
@ -44,14 +31,11 @@ impl DatabaseConnection for PostgresConnection {
Ok(res) Ok(res)
} }
fn query<'b, OutputItem>( fn query<'b>(
&mut self, &mut self,
query: &str, query: &str,
params: &'b [&'b dyn ToSql], params: &'b [&'b dyn ToSql],
) -> StdResult<Vec<OutputItem>> ) -> StdResult<Vec<Vec<String>>> {
where
OutputItem: ?Sized + TryFromSql<Self::QueryResultRow>,
{
let stmt = params let stmt = params
.iter() .iter()
.enumerate() .enumerate()
@ -59,37 +43,16 @@ impl DatabaseConnection for PostgresConnection {
str::replace(&acc, &format!("${}", i), &p.to_sql()) str::replace(&acc, &format!("${}", i), &p.to_sql())
}); });
let res: Self::QueryResult = self.client.query(stmt.as_str(), &[])?; let res = self.client.query(stmt.as_str(), &[])?;
let res = res let res = res
.into_iter() .into_iter()
.map(OutputItem::try_from_sql) .map(|row| {
.collect::<Result<Vec<OutputItem>, _>>()?; let column: String = row.get(0);
vec![column]
})
.collect::<Vec<_>>();
Ok(res) Ok(res)
} }
} }
impl MigrationNames for MigrationManager<PostgresConnection> {
fn applied_migration_names(&mut self) -> StdResult<Vec<String>> {
let res = self
.conn
.query(Self::APPLIED_MIGRATIONS_STMT, &[])
.or_else(|e| {
if is_migrations_table_not_found(&e) {
Ok(Vec::new())
} else {
Err(e)
}
})?;
Ok(res.into_iter().collect())
}
}
impl TryFromSql<postgres::Row> for String {
fn try_from_sql(row: postgres::Row) -> StdResult<Self> {
let res: String = row.get(0);
Ok(res)
}
}

View file

@ -1,6 +1,9 @@
use crate::config::Config;
use crate::database::DatabaseConnection; use crate::database::DatabaseConnection;
use crate::databases::DatabaseConnectionManager;
use crate::path::PathBuilder; use crate::path::PathBuilder;
use crate::StdResult; use crate::StdResult;
use std::convert::TryFrom;
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
@ -50,16 +53,26 @@ impl Migration {
} }
} }
pub struct MigrationManager<Conn: DatabaseConnection> { pub struct MigrationManager {
pub(crate) conn: Conn, pub(crate) conn: Box<dyn DatabaseConnection>,
} }
impl<Conn: DatabaseConnection> MigrationManager<Conn> { impl MigrationManager {
pub fn new(conn: Conn) -> Self { pub fn new(conn: Box<dyn DatabaseConnection>) -> Self {
MigrationManager { conn } MigrationManager { conn }
} }
} }
impl TryFrom<&Config> for MigrationManager {
type Error = Box<dyn std::error::Error>;
fn try_from(config: &Config) -> Result<Self, Self::Error> {
let connection_manager = DatabaseConnectionManager::new(&config.database);
let conn = connection_manager.connect()?;
Ok(Self { conn })
}
}
pub fn is_migrations_table_not_found<D: std::fmt::Display>(error: D) -> bool { pub fn is_migrations_table_not_found<D: std::fmt::Display>(error: D) -> bool {
error error
.to_string() .to_string()
@ -75,6 +88,8 @@ pub trait DatabaseMigrationManager {
fn delete_migration_info(&mut self, name: &str) -> StdResult<u64>; fn delete_migration_info(&mut self, name: &str) -> StdResult<u64>;
fn applied_migration_names(&mut self) -> StdResult<Vec<String>>;
fn upgrade(&mut self, migration: &Migration) -> StdResult<()> { fn upgrade(&mut self, migration: &Migration) -> StdResult<()> {
let content = migration.upgrade_sql_content()?; let content = migration.upgrade_sql_content()?;
@ -95,10 +110,7 @@ pub trait DatabaseMigrationManager {
} }
} }
impl<Conn> DatabaseMigrationManager for MigrationManager<Conn> impl DatabaseMigrationManager for MigrationManager {
where
Conn: DatabaseConnection,
{
fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> { fn apply_sql(&mut self, sql_content: &str) -> StdResult<()> {
self.conn.batch_execute(sql_content) self.conn.batch_execute(sql_content)
} }
@ -121,12 +133,22 @@ where
self.conn self.conn
.execute("DELETE FROM migrations WHERE name = $1", &[&name]) .execute("DELETE FROM migrations WHERE name = $1", &[&name])
} }
fn applied_migration_names(&mut self) -> StdResult<Vec<String>> {
let res = self
.conn
.query("SELECT name FROM migrations ORDER BY id DESC", &[])
.map(|row| row.first().unwrap().clone())
.or_else(|e| {
if is_migrations_table_not_found(&e) {
Ok(Vec::new())
} else {
Err(e)
} }
})?;
pub trait MigrationNames { Ok(res.into_iter().collect())
const APPLIED_MIGRATIONS_STMT: &'static str = "SELECT name FROM migrations ORDER BY id DESC"; }
fn applied_migration_names(&mut self) -> StdResult<Vec<String>>;
} }
pub fn filter_pending_migrations( pub fn filter_pending_migrations(