initial commit

This commit is contained in:
Dmitriy Pleshevskiy 2020-07-18 11:01:51 +03:00
parent 067934fd56
commit c1b73d15fb
13 changed files with 728 additions and 0 deletions

7
.gitignore vendored Normal file
View file

@ -0,0 +1,7 @@
.idea/
.vscode/
.DS_Store
/target
Cargo.lock

18
Cargo.toml Normal file
View file

@ -0,0 +1,18 @@
[package]
name = "sonic-channel"
version = "0.1.0-rc1"
authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lazy_static = "1.4.0"
regex = "1.3.4"
[features]
default = ["search"]
ingest = []
search = []
control = []

114
README.md Normal file
View file

@ -0,0 +1,114 @@
# Sonic Channel
Rust client for [sonic] search backend.
We recommend you start with the [documentation].
## Installation
Add `sonic-channel = { version = "0.1" }` as a dependency in `Cargo.toml`.
`Cargo.toml` example:
```toml
[package]
name = "my-crate"
version = "0.1.0"
authors = ["Me <user@rust-lang.org>"]
[dependencies]
sonic-channel = { version = "0.1" }
```
## Example usage
```rust
use std::itconfig;
use std::env;
//use dotenv::dotenv;
config! {
DEBUG: bool => false,
#[env_name = "APP_HOST"]
HOST: String => "127.0.0.1",
DATABASE_URL < (
"postgres://",
POSTGRES_USERNAME => "user",
":",
POSTGRES_PASSWORD => "pass",
"@",
POSTGRES_HOST => "localhost:5432",
"/",
POSTGRES_DB => "test",
),
APP {
static BASE_URL => "/api", // &'static str by default
ARTICLE {
static PER_PAGE: u32 => 15,
}
#[cfg(feature = "companies")]
COMPANY {
#[env_name = "INSTITUTIONS_PER_PAGE"]
static PER_PAGE: u32 => 15,
}
}
FEATURE {
NEW_MENU: bool => false,
COMPANY {
PROFILE: bool => false,
}
}
}
fn main () {
// dotenv().expect("dotenv setup to be successful");
// or
env::set_var("FEATURE_NEW_MENU", "t");
config::init();
assert_eq!(config::HOST(), String::from("127.0.0.1"));
assert_eq!(config::DATABASE_URL(), String::from("postgres://user:pass@localhost:5432/test"));
assert_eq!(config::APP:ARTICLE:PER_PAGE(), 15);
assert_eq!(config::FEATURE::NEW_MENU(), true);
}
```
Macro is an optional feature, disabled by default. You can use this library without macro
```rust
use itconfig::*;
use std::env;
// use dotenv::dotenv;
fn main() {
// dotenv().expect("dotenv setup to be successful");
// or
env::set_var("DATABASE_URL", "postgres://127.0.0.1:5432/test");
let database_url = get_env::<String>("DATABASE_URL").unwrap();
let new_profile: bool = get_env_or_default("FEATURE_NEW_PROFILE", false);
let articles_per_page: u32 = get_env_or_set_default("ARTICLES_PER_PAGE", 10);
}
```
## Available features
* **default** - ["search"]
* **search** - Add sonic search mode with methods
* **ignite** - Add sonic ignite mode with methods
* **control** - Add sonic control mode with methods
[sonic]: https://github.com/valeriansaliou/sonic
[documentation]: https://docs.rs/sonic-channel

214
src/channel.rs Normal file
View file

@ -0,0 +1,214 @@
use crate::commands::*;
use crate::errors::SonicError;
use std::fmt;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpStream, ToSocketAddrs};
const DEFAULT_SONIC_PROTOCOL_VERSION: usize = 1;
const MAX_LINE_BUFFER_SIZE: usize = 20000;
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
const BUFFER_LINE_SEPARATOR: u8 = '\n' as u8;
macro_rules! init_commands {
(
$(
use $cmd_name:ident
for fn $fn_name:ident
$(<$($lt:lifetime)+>)?
($($args:tt)*)
;
)*
) => {
$(init_commands!(use $cmd_name for fn $fn_name $(<$($lt)+>)? ($($args)*));)*
};
(use $cmd_name:ident for fn $fn_name:ident $(<$($lt:lifetime)+>)? ($($arg_name:ident : $arg_type:ty,)*)) => {
pub fn $fn_name $(<$($lt)+>)? (
&self,
$($arg_name: $arg_type),*
) -> Result<
<$cmd_name as crate::commands::StreamCommand>::Response,
crate::errors::SonicError
> {
let command = $cmd_name { $($arg_name,)* ..Default::default() };
self.run_command(command)
}
};
}
#[derive(Debug, Clone, Copy)]
pub enum ChannelMode {
#[cfg(feature = "search")]
Search,
#[cfg(feature = "ingest")]
Ingest,
#[cfg(feature = "control")]
Control,
}
impl ChannelMode {
pub fn to_str(&self) -> &str {
#[cfg(any(feature = "ingest", feature = "search", feature = "control"))]
match self {
#[cfg(feature = "search")]
ChannelMode::Search => "search",
#[cfg(feature = "ingest")]
ChannelMode::Ingest => "ingest",
#[cfg(feature = "control")]
ChannelMode::Control => "control",
}
// Actually we'll not see this text because we cannot call this function for enum
// without enum value, but Rust compiler want this case.
#[cfg(all(
not(feature = "ingest"),
not(feature = "search"),
not(feature = "control")
))]
"unitialized"
}
}
impl fmt::Display for ChannelMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "{}", self.to_str())
}
}
#[derive(Debug)]
pub struct SonicChannel {
stream: TcpStream,
mode: Option<ChannelMode>, // None Uninitialized mode
max_buffer_size: usize,
protocol_version: usize,
}
impl SonicChannel {
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self, SonicError> {
let stream = TcpStream::connect(addr).map_err(|_| SonicError::ConnectToServer)?;
let channel = SonicChannel {
stream,
mode: None,
max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
protocol_version: DEFAULT_SONIC_PROTOCOL_VERSION,
};
let message = channel.read(1)?;
dbg!(&message);
// TODO: need to add support for versions
if message.starts_with("CONNECTED") {
Ok(channel)
} else {
Err(SonicError::ConnectToServer)
}
}
fn write<SC: StreamCommand>(&self, command: &SC) -> Result<(), SonicError> {
let mut writer = BufWriter::with_capacity(self.max_buffer_size, &self.stream);
let message = command.message();
dbg!(&message);
writer
.write_all(message.as_bytes())
.map_err(|_| SonicError::WriteToStream)?;
Ok(())
}
fn read(&self, max_read_lines: usize) -> Result<String, SonicError> {
let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream);
let mut message = String::new();
let mut lines_read = 0;
while lines_read < max_read_lines {
reader
.read_line(&mut message)
.map_err(|_| SonicError::ReadStream)?;
lines_read += 1;
}
Ok(message)
}
pub fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response, SonicError> {
self.write(&command)?;
let message = self.read(SC::READ_LINES_COUNT)?;
command.receive(message)
}
#[cfg(any(feature = "ingest", feature = "search", feature = "control"))]
pub fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<(), SonicError> {
if self.mode.is_some() {
return Err(SonicError::RunCommand);
}
let command = StartCommand {
mode,
password: password.to_string(),
};
let response = self.run_command(command)?;
self.max_buffer_size = response.max_buffer_size;
self.protocol_version = response.protocol_version;
self.mode = Some(response.mode);
Ok(())
}
init_commands! {
use QuitCommand for fn quit();
}
#[cfg(any(feature = "ingest", feature = "search", feature = "control"))]
init_commands! {
use PingCommand for fn ping();
}
#[cfg(feature = "ingest")]
init_commands! {
use PushCommand for fn push<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
);
use PushCommand for fn push_with_locale<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
locale: Option<&'a str>,
);
}
#[cfg(feature = "search")]
init_commands! {
use QueryCommand for fn query<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
);
use QueryCommand for fn query_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: Option<usize>,
);
use QueryCommand for fn query_with_limit_and_offset<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: Option<usize>,
offset: Option<usize>,
);
}
#[cfg(feature = "control")]
init_commands! {}
}

28
src/commands/mod.rs Normal file
View file

@ -0,0 +1,28 @@
mod quit;
mod start;
mod ping;
#[cfg(feature = "ingest")]
mod push;
#[cfg(feature = "search")]
mod query;
pub use quit::QuitCommand;
pub use start::StartCommand;
pub use ping::PingCommand;
#[cfg(feature = "ingest")]
pub use push::PushCommand;
#[cfg(feature = "search")]
pub use query::QueryCommand;
use crate::errors::SonicError;
pub trait StreamCommand {
type Response;
const READ_LINES_COUNT: usize = 1;
fn message(&self) -> String;
fn receive(&self, message: String) -> Result<Self::Response, SonicError>;
}

18
src/commands/ping.rs Normal file
View file

@ -0,0 +1,18 @@
use super::StreamCommand;
use crate::errors::SonicError;
#[derive(Debug, Default)]
pub struct PingCommand;
impl StreamCommand for PingCommand {
type Response = bool;
fn message(&self) -> String {
String::from("PING\r\n")
}
fn receive(&self, message: String) -> Result<<Self as StreamCommand>::Response, SonicError> {
dbg!(&message);
Ok(message == "PONG\r\n")
}
}

32
src/commands/push.rs Normal file
View file

@ -0,0 +1,32 @@
use super::StreamCommand;
use crate::errors::SonicError;
#[derive(Debug, Default)]
pub struct PushCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub object: &'a str,
pub text: &'a str,
pub locale: Option<&'a str>,
}
impl StreamCommand for PushCommand<'_> {
type Response = bool;
fn message(&self) -> String {
let mut message = format!(
r#"PUSH {} {} {} "{}""#,
self.collection, self.bucket, self.object, self.text
);
if let Some(locale) = self.locale.as_ref() {
message.push_str(&format!(" LANG({})", locale));
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<<Self as StreamCommand>::Response, SonicError> {
dbg!(&message);
Ok(message == "OK\r\n")
}
}

68
src/commands/query.rs Normal file
View file

@ -0,0 +1,68 @@
use super::StreamCommand;
use crate::errors::SonicError;
use regex::Regex;
const RE_QUERY_RECEIVED_MESSAGE: &str = r"(?x)
^PENDING\s(?P<pending_query_id>\w+)\r\n
EVENT\sQUERY\s(?P<event_query_id>\w+)\s(?P<objects>.*?)\r\n$
";
#[derive(Debug, Default)]
pub struct QueryCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub terms: &'a str,
pub limit: Option<usize>,
pub offset: Option<usize>,
}
impl StreamCommand for QueryCommand<'_> {
type Response = Vec<String>;
const READ_LINES_COUNT: usize = 2;
fn message(&self) -> String {
let mut message = format!(
r#"QUERY {} {} "{}""#,
self.collection, self.bucket, self.terms
);
if let Some(limit) = self.limit.as_ref() {
message.push_str(&format!(" LIMIT({})", limit));
}
if let Some(offset) = self.offset.as_ref() {
message.push_str(&format!(" OFFSET({})", offset));
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response, SonicError> {
lazy_static! {
static ref RE: Regex = Regex::new(RE_QUERY_RECEIVED_MESSAGE).unwrap();
}
dbg!(&message);
match RE.captures(&message) {
None => Err(SonicError::QueryResponseError(
"Sonic response are wrong. Please write issue to github.",
)),
Some(caps) => {
if &caps["pending_query_id"] != &caps["event_query_id"] {
Err(SonicError::QueryResponseError(
"Pending id and event id don't match",
))
} else if caps["objects"].is_empty() {
Ok(vec![])
} else {
let objects = caps["objects"]
.split(" ")
.map(String::from)
.collect::<Vec<String>>();
Ok(objects)
}
}
}
}
}

18
src/commands/quit.rs Normal file
View file

@ -0,0 +1,18 @@
use super::StreamCommand;
use crate::errors::SonicError;
#[derive(Debug, Default)]
pub struct QuitCommand;
impl StreamCommand for QuitCommand {
type Response = bool;
fn message(&self) -> String {
String::from("QUIT\r\n")
}
fn receive(&self, message: String) -> Result<<Self as StreamCommand>::Response, SonicError> {
dbg!(&message);
Ok(message.starts_with("ENDED "))
}
}

62
src/commands/start.rs Normal file
View file

@ -0,0 +1,62 @@
use super::StreamCommand;
use crate::channel::ChannelMode;
use crate::errors::SonicError;
use regex::Regex;
const RE_START_RECEIVED_MESSAGE: &str = r"(?x)
STARTED
\s # started with mode
(?P<mode>search|ingest|control)
\s # wich protocol used
protocol\((?P<protocol>\d+)\)
\s # maximum buffer size
buffer\((?P<buffer_size>\d+)\)
";
#[derive(Debug)]
pub struct StartCommand {
pub mode: ChannelMode,
pub password: String,
}
pub struct StartCommandResponse {
pub protocol_version: usize,
pub max_buffer_size: usize,
pub mode: ChannelMode,
}
impl StreamCommand for StartCommand {
type Response = StartCommandResponse;
fn message(&self) -> String {
format!("START {} {}\r\n", self.mode, self.password)
}
fn receive(&self, message: String) -> Result<Self::Response, SonicError> {
lazy_static! {
static ref RE: Regex = Regex::new(RE_START_RECEIVED_MESSAGE).unwrap();
}
dbg!(&message);
match RE.captures(&message) {
None => Err(SonicError::SwitchMode),
Some(caps) => {
if self.mode.to_str() != &caps["mode"] {
return Err(SonicError::SwitchMode);
}
let protocol_version: usize =
caps["protocol"].parse().expect("Must be digit by regex");
let max_buffer_size: usize =
caps["buffer_size"].parse().expect("Must be digit by regex");
Ok(StartCommandResponse {
protocol_version,
max_buffer_size,
mode: self.mode,
})
}
}
}
}

27
src/errors.rs Normal file
View file

@ -0,0 +1,27 @@
use std::fmt;
#[derive(Debug)]
pub enum SonicError {
ConnectToServer,
WriteToStream,
ReadStream,
SwitchMode,
RunCommand,
QueryResponseError(&'static str),
}
impl fmt::Display for SonicError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let message = match self {
SonicError::ConnectToServer => String::from("Cannot connect to server"),
SonicError::WriteToStream => String::from("Cannot write data to stream"),
SonicError::ReadStream => String::from("Cannot read sonic response from stream"),
SonicError::SwitchMode => String::from("Cannot switch channel mode"),
SonicError::RunCommand => String::from("Cannot run command in current mode"),
SonicError::QueryResponseError(message) => format!("Error in query response: {}", message)
};
write!(f, "{}", message)
}
}

27
src/lib.rs Normal file
View file

@ -0,0 +1,27 @@
#![allow(dead_code)]
mod channel;
mod commands;
mod errors;
pub use channel::*;
pub use commands::*;
pub use errors::*;
#[macro_use]
extern crate lazy_static;
extern crate regex;
#[cfg(test)]
mod tests {
use crate::channel::ChannelMode;
#[test]
fn format_channel_enums() {
assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
}
//TODO: write tests with sonic server
}

95
src/main.rs Normal file
View file

@ -0,0 +1,95 @@
use sonic_channel::*;
fn main() -> Result<(), SonicError> {
// let mut stream = TcpStream::connect("localhost:1491")?;
// let mut buffer = [0; 128];
// stream.read(&mut buffer)?;
// let res = String::from_utf8(buffer.to_vec()).expect("Cannot convert response buffer to utf8");
// dbg!(res);
// let command = "START search SecretPassword";
// stream.write(command.as_bytes())?;
// let mut buffer = [0; 1000];
// loop {
// let mut line_buffer = [0; 128];
// let res_length = stream.read(&mut line_buffer)?;
// }
// stream.read(&mut buffer)?;
// let res = String::from_utf8(buffer.to_vec()).expect("Cannot convert response buffer to utf8");
// dbg!(res);
// let mut buffer = [0; 128];
// stream.read(&mut buffer)?;
// let res = String::from_utf8(buffer.to_vec()).expect("Cannot convert response buffer to utf8");
// dbg!(res);
// let (tx, rx) = mpsc::channel();
// thread::spawn(|| {
// let listener = TcpListener::bind("localhost:7777").expect("Should open connection for tests");
// for stream in listener.incoming() {
// let mut stream = stream.expect("Should connect socket successfully");
// stream.write("CONNECTED\r\n".as_bytes()).unwrap();
// loop {
// let mut buffer = [0; 60];
// println!("wait from client");
// let n = stream.read(&mut buffer).expect("Cannot read stream from client");
// let message = String::from_utf8(buffer[0..n].to_vec()).expect("Should convert response buffer to utf8");
// dbg!(&message);
// if message.starts_with("START") {
// stream.write("STARTED search protocol(1) buffer(20000)\r\n".as_bytes()).unwrap();
// } else if message.starts_with("PING") {
// stream.write("PONG\r\n".as_bytes()).unwrap();
// }
// }
// }
// });
// let client = thread::spawn(|| {
// let mut stream = TcpStream::connect("localhost:7777").expect("Should open connection for tests");
// let message = "hello world".as_bytes();
// stream.write_all(message).expect("Should write to stream successfully");
// });
// client.join().unwrap();
// server.join().unwrap();
// let received = rx.recv().expect("Should recieve message from thread");
// dbg!(received);
let mut channel = SonicChannel::connect("localhost:1491")?;
// std::thread::sleep(std::time::Duration::from_secs(5));
// let mut channel = SonicChannel::connect("localhost:7777")?;
// channel.start(ChannelMode::Ingest, "SecretPassword")?;
// std::thread::sleep(std::time::Duration::from_secs(1));
// let pong = channel.ping()?;
// dbg!(pong);
// let pushed = channel.push("collection", "bucket", "user:1", "my best recipe")?;
// dbg!(pushed);
channel.start(ChannelMode::Search, "SecretPassword")?;
std::thread::sleep(std::time::Duration::from_secs(1));
let pong = channel.ping()?;
dbg!(pong);
let objects = channel.query("collection", "bucket", "recipe")?;
dbg!(objects);
Ok(())
}