feat: add transactions

Closes #1
This commit is contained in:
Dmitriy Pleshevskiy 2021-10-17 15:08:46 +03:00
parent b4cbeaf444
commit 18eaee9b16
23 changed files with 293 additions and 46 deletions

3
.gitignore vendored
View file

@ -1 +1,2 @@
target/ target/
.env

3
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,3 @@
{
"rust.unstable_features": true
}

2
Cargo.lock generated
View file

@ -493,7 +493,7 @@ checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]] [[package]]
name = "ood_persistence" name = "ood_persistence"
version = "0.1.1" version = "0.2.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bb8", "bb8",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ood_persistence" name = "ood_persistence"
version = "0.1.1" version = "0.2.0"
edition = "2018" edition = "2018"
authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"] authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
repository = "https://github.com/pleshevskiy/ood_persistence" repository = "https://github.com/pleshevskiy/ood_persistence"
@ -11,6 +11,8 @@ license = "MIT OR Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
nightly = []
async = ["async-trait"] async = ["async-trait"]
sync = [] sync = []

View file

@ -20,6 +20,9 @@ authors = ["Me <user@rust-lang.org>"]
ood_persistence = { version = "0", features = ["bb8_postgres"] } ood_persistence = { version = "0", features = ["bb8_postgres"] }
``` ```
In stable rust channel you can use only connection interface, but if you use nightly channel, add an additional
"nightly" feature to your `Cargo.toml` and you can use transactions as well.
## Usage ## Usage
See examples directory. See examples directory.

View file

@ -15,7 +15,7 @@ dotenv = { version = "0.15", optional = true }
async-trait = "0.1" async-trait = "0.1"
# database # database
ood_persistence = { path = "../../", features = ["bb8_postgres"] } ood_persistence = { path = "../../", features = ["nightly", "bb8_postgres"] }
postgres-types = { version = "0.2", features = ["derive"] } postgres-types = { version = "0.2", features = ["derive"] }
# runtime # runtime

View file

@ -0,0 +1,5 @@
[tasks.dev]
command = "cargo"
workspace = false
args = ["run", "--features", "dev"]
watch = { watch = ["src", "Cargo.toml", '.env'] }

47
examples/web/README.md Normal file
View file

@ -0,0 +1,47 @@
# Web example
Simple rest api example with hyper, bb8, postgres
## Deps
For this example you need to install [docker] with [docker-compose], [nightly rust]. Follow the instructions on the official sites.
[docker]: https://docs.docker.com/get-docker/
[docker-compose]: https://docs.docker.com/compose/install/
[nightly rust]: https://www.rust-lang.org/tools/install
## Running
Move to the example directory
```sh
cd examples/web
```
Run configuration for docker-compose
```sh
docker-compose -f docker-compose.dev.yml up
```
Or run postgres server manually.
Then copy `.env.example` to `.env` and edit if you needed.
```sh
cp .env.example .env
```
Now you can run server
```sh
cargo run --features dev
```
Or if you have a [cargo make]
```sh
cargo make dev
```
[cargo make]: https://github.com/sagiegurari/cargo-make

View file

@ -12,6 +12,11 @@ pub fn create_postgres_list_controller(
} }
} }
#[derive(Debug, Deserialize)]
pub struct AddListInput {
name: String,
}
pub struct ListController<P> pub struct ListController<P>
where where
P: PersistencePool, P: PersistencePool,
@ -29,4 +34,8 @@ where
_ => Ok(None), _ => Ok(None),
} }
} }
pub async fn add_list(&self, input: AddListInput) -> ApiResult<List> {
self.list_service.add_list(&input.name).await
}
} }

View file

@ -1,7 +1,9 @@
use super::storage_type::ListStorage; use super::storage_type::ListStorage;
use super::{List, ListId}; use super::{List, ListId};
use crate::db::list::storage::PostgresListStorage; use crate::db::list::storage::PostgresListStorage;
use crate::db::persistence::{PersistencePool, PostgresPersistence}; use crate::db::persistence::{
ConnectionClient, PersistencePool, PostgresPersistence, TransactionClient,
};
use crate::error::ApiResult; use crate::error::ApiResult;
pub fn create_postgres_list_service( pub fn create_postgres_list_service(
@ -30,4 +32,13 @@ where
let list = self.list_storage.get_list_opt(&mut conn, list_id).await?; let list = self.list_storage.get_list_opt(&mut conn, list_id).await?;
Ok(list) Ok(list)
} }
pub async fn add_list(&self, name: &str) -> ApiResult<List> {
let mut conn = self.persistence.get_connection().await?;
let mut trx = conn.start_transaction().await?;
let list = self.list_storage.add_list(&mut trx, name).await?;
trx.commit().await?;
Ok(list)
}
} }

View file

@ -7,4 +7,6 @@ where
Conn: ConnectionClient, Conn: ConnectionClient,
{ {
async fn get_list_opt(&self, conn: &mut Conn, id: ListId) -> QueryResult<Option<List>>; async fn get_list_opt(&self, conn: &mut Conn, id: ListId) -> QueryResult<Option<List>>;
async fn add_list(&self, conn: &mut Conn::Trx<'_>, name: &str) -> QueryResult<List>;
} }

View file

@ -1,16 +1,18 @@
use super::DbList; use super::DbList;
use crate::app::list::storage_type::ListStorage; use crate::app::list::storage_type::ListStorage;
use crate::app::list::{List, ListId}; use crate::app::list::{List, ListId};
use crate::db::persistence::{try_get_one, ConnectionClient, PostgresConnection, QueryResult}; use crate::db::persistence::{
try_get_one, ConnectionClient, PostgresConnection, PostgresTransaction, QueryResult,
};
use postgres_types::Type; use postgres_types::Type;
pub struct PostgresListStorage {} pub struct PostgresListStorage {}
#[async_trait] #[async_trait]
impl<'c> ListStorage<PostgresConnection<'c>> for PostgresListStorage { impl<'p> ListStorage<PostgresConnection<'p>> for PostgresListStorage {
async fn get_list_opt( async fn get_list_opt(
&self, &self,
conn: &mut PostgresConnection<'c>, conn: &mut PostgresConnection<'p>,
list_id: ListId, list_id: ListId,
) -> QueryResult<Option<List>> { ) -> QueryResult<Option<List>> {
let inner_conn = conn.inner(); let inner_conn = conn.inner();
@ -26,4 +28,21 @@ impl<'c> ListStorage<PostgresConnection<'c>> for PostgresListStorage {
.transpose() .transpose()
.map_err(From::from) .map_err(From::from)
} }
async fn add_list(&self, conn: &mut PostgresTransaction<'_>, name: &str) -> QueryResult<List> {
let inner_conn = conn.inner();
let stmt = inner_conn
.prepare_typed(
"insert into lists as l (name) values ($1) returning l",
&[Type::TEXT],
)
.await?;
inner_conn
.query_one(&stmt, &[&name])
.await
.and_then(try_get_one::<DbList, _>)
.map_err(From::from)
}
} }

View file

@ -2,10 +2,10 @@ use crate::config;
use ood_persistence::bb8_postgres::{tokio_postgres, NoTlsManager}; use ood_persistence::bb8_postgres::{tokio_postgres, NoTlsManager};
pub use ood_persistence::bb8_postgres::{ pub use ood_persistence::bb8_postgres::{
NoTlsConnection as PostgresConnection, NoTlsPersistence as PostgresPersistence, NoTlsConnection as PostgresConnection, NoTlsPersistence as PostgresPersistence,
NoTlsPool as PostgresPool, NoTlsPool as PostgresPool, Transaction as PostgresTransaction,
}; };
pub use ood_persistence::{ pub use ood_persistence::{
asyn::{ConnectionClient, PersistencePool}, asyn::{ConnectionClient, PersistencePool, TransactionClient},
error::Result as QueryResult, error::Result as QueryResult,
}; };

View file

@ -1,4 +1,4 @@
#![allow(dead_code)] #![deny(clippy::all)]
#[macro_use] #[macro_use]
extern crate postgres_types; extern crate postgres_types;

View file

@ -1,15 +1,19 @@
use crate::app::list::controller::create_postgres_list_controller; use crate::app::list::controller::create_postgres_list_controller;
use crate::rest::routes::*; use crate::rest::routes::*;
use crate::rest::server_utils::{create_not_found_err_json_response, create_ok_json_response}; use crate::rest::server_utils::{
create_not_found_err_json_response, create_ok_json_response, deserialize_request_body,
};
pub enum Router { pub enum Router {
GetListById(String), GetListById(String),
AddList,
} }
impl MaybeFrom<RouteParts<'_>> for Router { impl MaybeFrom<RouteParts<'_>> for Router {
fn maybe_from((method, uri_path_parts): RouteParts<'_>) -> Option<Self> { fn maybe_from((method, uri_path_parts): RouteParts<'_>) -> Option<Self> {
match (method, uri_path_parts) { match (method, uri_path_parts) {
(&Method::GET, [list_id]) => Some(Self::GetListById(list_id.to_string())), (&Method::GET, [list_id]) => Some(Self::GetListById(list_id.to_string())),
(&Method::POST, []) => Some(Self::AddList),
_ => None, _ => None,
} }
} }
@ -27,6 +31,11 @@ impl Resolver for Router {
None => create_not_found_err_json_response("List not found"), None => create_not_found_err_json_response("List not found"),
} }
} }
Self::AddList => {
let input = deserialize_request_body(vars.body).await?;
let res = controller.add_list(input).await?;
create_ok_json_response(res)
}
} }
} }
} }

View file

@ -1,8 +1,7 @@
use crate::error::StdResult;
use crate::rest::prelude::*; use crate::rest::prelude::*;
use serde::{de, ser}; use serde::{de, ser};
pub async fn deserialize_request_body<T>(req_body: Body) -> StdResult<T> pub async fn deserialize_request_body<T>(req_body: Body) -> ApiResult<T>
where where
T: de::DeserializeOwned, T: de::DeserializeOwned,
{ {

View file

@ -9,8 +9,8 @@ pub type QueryParams<'a> = HashMap<&'a str, &'a str>;
#[derive(Debug)] #[derive(Debug)]
pub struct ReqVariables<'params> { pub struct ReqVariables<'params> {
body: Body, pub body: Body,
query_params: QueryParams<'params>, pub query_params: QueryParams<'params>,
} }
impl<'params> ReqVariables<'params> { impl<'params> ReqVariables<'params> {

2
rust-toolchain.toml Normal file
View file

@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"

View file

@ -7,8 +7,23 @@ pub trait PersistencePool: Send + Sync {
async fn get_connection(&self) -> error::Result<Self::Conn>; async fn get_connection(&self) -> error::Result<Self::Conn>;
} }
#[cfg_attr(feature = "nightly", async_trait)]
pub trait ConnectionClient { pub trait ConnectionClient {
type InnerConn; type InnerConn;
#[cfg(feature = "nightly")]
type Trx<'t>: TransactionClient;
fn inner(&mut self) -> &mut Self::InnerConn; fn inner(&mut self) -> &mut Self::InnerConn;
#[cfg(feature = "nightly")]
async fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>>;
}
#[cfg(feature = "nightly")]
#[async_trait]
pub trait TransactionClient: ConnectionClient {
async fn commit(self) -> error::Result<()>;
async fn rollback(self) -> error::Result<()>;
} }

View file

@ -1,3 +1,5 @@
#[cfg(feature = "nightly")]
use crate::asyn::TransactionClient;
use crate::asyn::{ConnectionClient, PersistencePool}; use crate::asyn::{ConnectionClient, PersistencePool};
use crate::error; use crate::error;
@ -5,13 +7,15 @@ pub use bb8::{Pool, PooledConnection};
pub use bb8_postgres::tokio_postgres; pub use bb8_postgres::tokio_postgres;
pub use bb8_postgres::PostgresConnectionManager as Manager; pub use bb8_postgres::PostgresConnectionManager as Manager;
pub type InnerConn<'p, M> = PooledConnection<'p, M>;
pub type InnerTrx<'p> = tokio_postgres::Transaction<'p>;
pub type NoTlsManager = Manager<tokio_postgres::NoTls>; pub type NoTlsManager = Manager<tokio_postgres::NoTls>;
pub type NoTlsPersistence<'p> = Persistence<'p, NoTlsManager>; pub type NoTlsPersistence<'p> = Persistence<'p, NoTlsManager>;
pub type NoTlsConnection<'p> = Connection<'p, NoTlsManager>; pub type NoTlsConnection<'p> = Connection<'p, NoTlsManager>;
pub type NoTlsInnerConn<'p> = InnerConn<'p, NoTlsManager>;
pub type NoTlsPool = Pool<NoTlsManager>; pub type NoTlsPool = Pool<NoTlsManager>;
pub type InnerConn<'p, M> = PooledConnection<'p, M>;
pub fn new<M>(pool: &Pool<M>) -> Persistence<M> pub fn new<M>(pool: &Pool<M>) -> Persistence<M>
where where
M: bb8::ManageConnection, M: bb8::ManageConnection,
@ -20,14 +24,13 @@ where
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Persistence<'p, M: bb8::ManageConnection>(&'p Pool<M>); pub struct Persistence<'p, M>(&'p Pool<M>)
where
M: bb8::ManageConnection;
#[async_trait] #[async_trait]
impl<'p, M> PersistencePool for Persistence<'p, M> impl<'p> PersistencePool for NoTlsPersistence<'p> {
where type Conn = NoTlsConnection<'p>;
M: bb8::ManageConnection + Send + Sync,
{
type Conn = Connection<'p, M>;
async fn get_connection(&self) -> error::Result<Self::Conn> { async fn get_connection(&self) -> error::Result<Self::Conn> {
self.0 self.0
@ -38,15 +41,68 @@ where
} }
} }
pub struct Connection<'p, M: bb8::ManageConnection>(InnerConn<'p, M>); pub struct Connection<'p, M>(InnerConn<'p, M>)
impl<'c, M> ConnectionClient for Connection<'c, M>
where where
M: bb8::ManageConnection, M: bb8::ManageConnection;
{
type InnerConn = InnerConn<'c, M>; #[cfg_attr(feature = "nightly", async_trait)]
impl<'me> ConnectionClient for NoTlsConnection<'me> {
type InnerConn = NoTlsInnerConn<'me>;
#[cfg(feature = "nightly")]
type Trx<'t> = Transaction<'t>;
fn inner(&mut self) -> &mut Self::InnerConn { fn inner(&mut self) -> &mut Self::InnerConn {
&mut self.0 &mut self.0
} }
#[cfg(feature = "nightly")]
async fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>> {
self.0
.transaction()
.await
.map_err(|_| error::PersistenceError::UpgradeToTransaction)
.map(Transaction)
}
}
#[cfg(feature = "nightly")]
pub struct Transaction<'p>(InnerTrx<'p>);
#[cfg(feature = "nightly")]
#[async_trait]
impl<'me> ConnectionClient for Transaction<'me> {
type InnerConn = InnerTrx<'me>;
type Trx<'t> = Transaction<'t>;
fn inner(&mut self) -> &mut Self::InnerConn {
&mut self.0
}
async fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>> {
self.0
.transaction()
.await
.map_err(|_| error::PersistenceError::UpgradeToTransaction)
.map(Transaction)
}
}
#[cfg(feature = "nightly")]
#[async_trait]
impl<'me> TransactionClient for Transaction<'me> {
async fn commit(self) -> error::Result<()> {
self.0
.commit()
.await
.map_err(|_| error::PersistenceError::CommitTransaction)
}
async fn rollback(self) -> error::Result<()> {
self.0
.rollback()
.await
.map_err(|_| error::PersistenceError::RollbackTransaction)
}
} }

View file

@ -1,3 +1,6 @@
#![deny(clippy::all)]
#![cfg_attr(feature = "nightly", feature(generic_associated_types))]
#[cfg(feature = "async")] #[cfg(feature = "async")]
#[macro_use] #[macro_use]
extern crate async_trait; extern crate async_trait;

View file

@ -1,17 +1,21 @@
use crate::error; use crate::error;
#[cfg(feature = "nightly")]
use crate::syn::TransactionClient;
use crate::syn::{ConnectionClient, PersistencePool}; use crate::syn::{ConnectionClient, PersistencePool};
pub use r2d2::{Pool, PooledConnection}; pub use r2d2::{Pool, PooledConnection};
pub use r2d2_postgres::postgres; pub use r2d2_postgres::postgres;
pub use r2d2_postgres::PostgresConnectionManager as Manager; pub use r2d2_postgres::PostgresConnectionManager as Manager;
pub type InnerConn<M> = PooledConnection<M>;
pub type InnerTrx<'t> = postgres::Transaction<'t>;
pub type NoTlsManager = Manager<postgres::NoTls>; pub type NoTlsManager = Manager<postgres::NoTls>;
pub type NoTlsPersistence<'p> = Persistence<'p, NoTlsManager>; pub type NoTlsPersistence<'p> = Persistence<'p, NoTlsManager>;
pub type NoTlsConnection<'p> = Connection<NoTlsManager>; pub type NoTlsConnection = Connection<NoTlsManager>;
pub type NoTlsInnerConn = InnerConn<NoTlsManager>;
pub type NoTlsPool = Pool<NoTlsManager>; pub type NoTlsPool = Pool<NoTlsManager>;
pub type InnerConn<M> = PooledConnection<M>;
pub fn new<M>(pool: &Pool<M>) -> Persistence<M> pub fn new<M>(pool: &Pool<M>) -> Persistence<M>
where where
M: r2d2::ManageConnection, M: r2d2::ManageConnection,
@ -20,14 +24,12 @@ where
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Persistence<'p, M: r2d2::ManageConnection>(&'p Pool<M>); pub struct Persistence<'p, M>(&'p Pool<M>)
#[async_trait]
impl<'p, M> PersistencePool for Persistence<'p, M>
where where
M: r2d2::ManageConnection + Send + Sync, M: r2d2::ManageConnection;
{
type Conn = Connection<M>; impl<'p> PersistencePool for NoTlsPersistence<'p> {
type Conn = NoTlsConnection;
fn get_connection(&self) -> error::Result<Self::Conn> { fn get_connection(&self) -> error::Result<Self::Conn> {
self.0 self.0
@ -37,15 +39,61 @@ where
} }
} }
pub struct Connection<M: r2d2::ManageConnection>(InnerConn<M>); pub struct Connection<M>(InnerConn<M>)
impl<M> ConnectionClient for Connection<M>
where where
M: r2d2::ManageConnection, M: r2d2::ManageConnection;
{
type InnerConn = InnerConn<M>; impl ConnectionClient for NoTlsConnection {
type InnerConn = NoTlsInnerConn;
#[cfg(feature = "nightly")]
type Trx<'t> = Transaction<'t>;
fn inner(&mut self) -> &mut Self::InnerConn { fn inner(&mut self) -> &mut Self::InnerConn {
&mut self.0 &mut self.0
} }
#[cfg(feature = "nightly")]
fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>> {
self.0
.transaction()
.map_err(|_| error::PersistenceError::UpgradeToTransaction)
.map(Transaction)
}
}
#[cfg(feature = "nightly")]
pub struct Transaction<'me>(InnerTrx<'me>);
#[cfg(feature = "nightly")]
impl<'me> ConnectionClient for Transaction<'me> {
type InnerConn = InnerTrx<'me>;
type Trx<'t> = Transaction<'t>;
fn inner(&mut self) -> &mut Self::InnerConn {
&mut self.0
}
fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>> {
self.0
.transaction()
.map_err(|_| error::PersistenceError::UpgradeToTransaction)
.map(Transaction)
}
}
#[cfg(feature = "nightly")]
impl TransactionClient for Transaction<'_> {
fn commit(self) -> error::Result<()> {
self.0
.commit()
.map_err(|_| error::PersistenceError::CommitTransaction)
}
fn rollback(self) -> error::Result<()> {
self.0
.rollback()
.map_err(|_| error::PersistenceError::RollbackTransaction)
}
} }

View file

@ -9,5 +9,18 @@ pub trait PersistencePool {
pub trait ConnectionClient { pub trait ConnectionClient {
type InnerConn; type InnerConn;
#[cfg(feature = "nightly")]
type Trx<'t>: TransactionClient;
fn inner(&mut self) -> &mut Self::InnerConn; fn inner(&mut self) -> &mut Self::InnerConn;
#[cfg(feature = "nightly")]
fn start_transaction(&mut self) -> error::Result<Self::Trx<'_>>;
}
#[cfg(feature = "nightly")]
pub trait TransactionClient: ConnectionClient {
fn commit(self) -> error::Result<()>;
fn rollback(self) -> error::Result<()>;
} }