Merge pull request #5 from pleshevskiy/task-3

refac: separate structs for each channel
This commit is contained in:
Dmitriy Pleshevskiy 2020-11-25 21:05:31 +02:00 committed by GitHub
commit 1bd455cc4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 931 additions and 793 deletions

View file

@ -32,12 +32,7 @@ Note: This example requires enabling the `search` feature, enabled by default.
use sonic_channel::*;
fn main() -> result::Result<()> {
let channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let channel = SearchChannel::start("localhost:1491", "SecretPassword")?;
let objects = channel.query("collection", "bucket", "recipe")?;
dbg!(objects);
@ -53,12 +48,7 @@ Note: This example requires enabling the `ingest` feature.
use sonic_channel::*;
fn main() -> result::Result<()> {
let mut channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let channel = IngestChannel::start("localhost:1491", "SecretPassword")?;
let pushed = channel.push("collection", "bucket", "object:1", "my best recipe")?;
// or
// let pushed = channel.push_with_locale("collection", "bucket", "object:1", "Мой лучший рецепт", "rus")?;
@ -76,12 +66,7 @@ Note: This example requires enabling the `control` feature.
use sonic_channel::*;
fn main() -> result::Result<()> {
let mut channel = SonicChannel::connect_with_start(
ChannelMode::Control,
"localhost:1491",
"SecretPassword",
)?;
let channel = ControlChannel::start("localhost:1491", "SecretPassword")?;
let result = channel.consolidate()?;
assert_eq!(result, true);

View file

@ -1,764 +0,0 @@
use crate::commands::*;
use crate::result::*;
use std::fmt;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpStream, ToSocketAddrs};
const DEFAULT_SONIC_PROTOCOL_VERSION: usize = 1;
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
macro_rules! init_commands {
(
$(
$(#[$outer:meta])*
use $cmd_name:ident
for fn $fn_name:ident
$(<$($lt:lifetime)+>)? (
$($args:tt)*
)
$(where mode is $condition:expr)?;
)*
) => {
$(
init_commands!(
$(#[$outer])*
use $cmd_name
for fn $fn_name $(<$($lt)+>)? (
$($args)*
)
$(where $condition)?
);
)*
};
(
$(#[$outer:meta])*
use $cmd_name:ident
for fn $fn_name:ident $(<$($lt:lifetime)+>)? (
$($arg_name:ident : $arg_type:ty $( => $arg_value:expr)?,)*
)
$(where $condition:expr)?
) => {
$(#[$outer])*
pub fn $fn_name $(<$($lt)+>)? (
&self,
$($arg_name: $arg_type),*
) -> $crate::result::Result<
<$cmd_name as $crate::commands::StreamCommand>::Response,
> {
$(
let mode = self.mode.clone();
if mode != Some($condition) {
return Err(Error::new(
ErrorKind::UnsupportedCommand((
stringify!($fn_name),
mode,
))
));
}
)?
#[allow(clippy::needless_update)]
let command = $cmd_name { $($arg_name $(: $arg_value)?,)* ..Default::default() };
self.run_command(command)
}
};
}
/// Channel modes supported by sonic search backend.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChannelMode {
/// Sonic server search channel mode.
///
/// In this mode you can use `query`, `suggest`, `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `search` feature.
#[cfg(feature = "search")]
Search,
/// Sonic server ingest channel mode.
///
/// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`,
/// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `ingest` feature.
#[cfg(feature = "ingest")]
Ingest,
/// Sonic server control channel mode.
///
/// In this mode you can use `consolidate`, `backup`, `restore`,
/// `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `control` feature.
#[cfg(feature = "control")]
Control,
}
impl ChannelMode {
/// Converts enum to &str
pub fn to_str(&self) -> &str {
match self {
#[cfg(feature = "search")]
ChannelMode::Search => "search",
#[cfg(feature = "ingest")]
ChannelMode::Ingest => "ingest",
#[cfg(feature = "control")]
ChannelMode::Control => "control",
}
}
}
impl fmt::Display for ChannelMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
write!(f, "{}", self.to_str())
}
}
/// Root and Heart of this library.
///
/// You can connect to the sonic search backend and run all supported protocol methods.
///
#[derive(Debug)]
pub struct SonicChannel {
stream: TcpStream,
mode: Option<ChannelMode>, // None Uninitialized mode
max_buffer_size: usize,
protocol_version: usize,
}
impl SonicChannel {
fn write<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
let mut writer = BufWriter::with_capacity(self.max_buffer_size, &self.stream);
let message = command.message();
dbg!(&message);
writer
.write_all(message.as_bytes())
.map_err(|_| Error::new(ErrorKind::WriteToStream))?;
Ok(())
}
fn read(&self, max_read_lines: usize) -> Result<String> {
let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream);
let mut message = String::new();
let mut lines_read = 0;
while lines_read < max_read_lines {
reader
.read_line(&mut message)
.map_err(|_| Error::new(ErrorKind::ReadStream))?;
lines_read += 1;
}
Ok(message)
}
fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
self.write(&command)?;
let message = self.read(SC::READ_LINES_COUNT)?;
command.receive(message)
}
fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let stream =
TcpStream::connect(addr).map_err(|_| Error::new(ErrorKind::ConnectToServer))?;
let channel = SonicChannel {
stream,
mode: None,
max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
protocol_version: DEFAULT_SONIC_PROTOCOL_VERSION,
};
let message = channel.read(1)?;
dbg!(&message);
// TODO: need to add support for versions
if message.starts_with("CONNECTED") {
Ok(channel)
} else {
Err(Error::new(ErrorKind::ConnectToServer))
}
}
fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
if self.mode.is_some() {
return Err(Error::new(ErrorKind::RunCommand));
}
let command = StartCommand {
mode,
password: password.to_string(),
};
let response = self.run_command(command)?;
self.max_buffer_size = response.max_buffer_size;
self.protocol_version = response.protocol_version;
self.mode = Some(response.mode);
Ok(())
}
/// Connect to the search backend in chosen mode.
///
/// I think we shouldn't separate commands connect and start because we haven't
/// possibility to change channel in sonic server, if we already chosen one of them. 🤔
///
/// ```rust,no_run
/// use sonic_channel::*;
///
/// fn main() -> result::Result<()> {
/// let channel = SonicChannel::connect_with_start(
/// ChannelMode::Search,
/// "localhost:1491",
/// "SecretPassword"
/// )?;
///
/// // Now you can use all method of Search channel.
/// let objects = channel.query("search", "default", "beef");
///
/// Ok(())
/// }
/// ```
pub fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
where
A: ToSocketAddrs,
S: ToString,
{
let mut channel = Self::connect(addr)?;
channel.start(mode, password)?;
Ok(channel)
}
init_commands! {
#[doc=r#"
Stop connection.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
channel.quit()?;
# Ok(())
# }
"#]
use QuitCommand for fn quit();
#[doc=r#"
Ping server.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
channel.ping()?;
# Ok(())
# }
"#]
use PingCommand for fn ping();
}
#[cfg(feature = "ingest")]
init_commands! {
#[doc=r#"
Push search data in the index.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let result = ingest_channel.push(
"search",
"default",
"recipe:295",
"Sweet Teriyaki Beef Skewers",
)?;
assert_eq!(result, true);
# Ok(())
# }
```
"#]
use PushCommand for fn push<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
) where mode is ChannelMode::Ingest;
#[doc=r#"
Push search data in the index with locale parameter in ISO 639-3 code.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let result = ingest_channel.push_with_locale(
"search",
"default",
"recipe:296",
"Гренки с жареным картофелем и сыром",
"rus",
)?;
assert_eq!(result, true);
# Ok(())
# }
```
"#]
use PushCommand for fn push_with_locale<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
locale: &'a str => Some(locale),
) where mode is ChannelMode::Ingest;
#[doc=r#"
Pop search data from the index. Returns removed words count as usize type.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let result = ingest_channel.pop("search", "default", "recipe:295", "beef")?;
assert_eq!(result, 1);
# Ok(())
# }
```
"#]
use PopCommand for fn pop<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
) where mode is ChannelMode::Ingest;
#[doc=r#"
Flush all indexed data from collections.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let flushc_count = ingest_channel.flushc("search")?;
dbg!(flushc_count);
# Ok(())
# }
```
"#]
use FlushCommand for fn flushc<'a>(
collection: &'a str,
) where mode is ChannelMode::Ingest;
#[doc=r#"
Flush all indexed data from bucket in a collection.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let flushb_count = ingest_channel.flushb("search", "default")?;
dbg!(flushb_count);
# Ok(())
# }
```
"#]
use FlushCommand for fn flushb<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
) where mode is ChannelMode::Ingest;
#[doc=r#"
Flush all indexed data from an object in a bucket in collection.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let flusho_count = ingest_channel.flusho("search", "default", "recipe:296")?;
dbg!(flusho_count);
# Ok(())
# }
```
"#]
use FlushCommand for fn flusho<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
) where mode is ChannelMode::Ingest;
#[doc=r#"
Bucket count in indexed search data of your collection.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let bucket_count = ingest_channel.bucket_count("search")?;
dbg!(bucket_count);
# Ok(())
# }
```
"#]
use CountCommand for fn bucket_count<'a>(
collection: &'a str,
) where mode is ChannelMode::Ingest;
#[doc=r#"
Object count of bucket in indexed search data.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let object_count = ingest_channel.object_count("search", "default")?;
dbg!(object_count);
# Ok(())
# }
```
"#]
use CountCommand for fn object_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
) where mode is ChannelMode::Ingest;
#[doc=r#"
Object word count in indexed bucket search data.
Note: This method requires enabling the `ingest` feature and start
connection in Ingest mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let ingest_channel = SonicChannel::connect_with_start(
ChannelMode::Ingest,
"localhost:1491",
"SecretPassword",
)?;
let word_count = ingest_channel.word_count("search", "default", "recipe:296")?;
dbg!(word_count);
# Ok(())
# }
```
"#]
use CountCommand for fn word_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
) where mode is ChannelMode::Ingest;
}
#[cfg(feature = "search")]
init_commands! {
#[doc=r#"
Query objects in database.
Note: This method requires enabling the `search` feature and start
connection in Search mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let search_channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let result = search_channel.query("search", "default", "Beef")?;
dbg!(result);
# Ok(())
# }
```
"#]
use QueryCommand for fn query<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
) where mode is ChannelMode::Search;
#[doc=r#"
Query limited objects in database. This method similar query but
you can configure limit of result.
Note: This method requires enabling the `search` feature and start
connection in Search mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let search_channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let result = search_channel.query_with_limit(
"search",
"default",
"Beef",
10,
)?;
dbg!(result);
# Ok(())
# }
```
"#]
use QueryCommand for fn query_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
) where mode is ChannelMode::Search;
#[doc=r#"
Query limited objects in database. This method similar
query_with_limit but you can put offset in your query.
Note: This method requires enabling the `search` feature and start
connection in Search mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let search_channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let result = search_channel.query_with_limit_and_offset(
"search",
"default",
"Beef",
10,
10,
)?;
dbg!(result);
# Ok(())
# }
```
"#]
use QueryCommand for fn query_with_limit_and_offset<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
offset: usize => Some(offset),
) where mode is ChannelMode::Search;
#[doc=r#"
Suggest auto-completes words.
Note: This method requires enabling the `search` feature and start
connection in Search mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let search_channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let result = search_channel.suggest("search", "default", "Beef")?;
dbg!(result);
# Ok(())
# }
```
"#]
use SuggestCommand for fn suggest<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
) where mode is ChannelMode::Search;
#[doc=r#"
Suggest auto-completes words with limit.
Note: This method requires enabling the `search` feature and start
connection in Search mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let search_channel = SonicChannel::connect_with_start(
ChannelMode::Search,
"localhost:1491",
"SecretPassword",
)?;
let result = search_channel.suggest_with_limit("search", "default", "Beef", 5)?;
dbg!(result);
# Ok(())
# }
```
"#]
use SuggestCommand for fn suggest_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
limit: usize => Some(limit),
) where mode is ChannelMode::Search;
}
#[cfg(feature = "control")]
init_commands! {
#[doc=r#"
Consolidate indexed search data instead of waiting for the next automated
consolidation tick.
Note: This method requires enabling the `control` feature and start
connection in Control mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let control_channel = SonicChannel::connect_with_start(
ChannelMode::Control,
"localhost:1491",
"SecretPassword",
)?;
let result = control_channel.consolidate()?;
assert_eq!(result, true);
# Ok(())
# }
```
"#]
use TriggerCommand for fn consolidate()
where mode is ChannelMode::Control;
#[doc=r#"
Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
for more information.
Note: This method requires enabling the `control` feature and start
connection in Control mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let control_channel = SonicChannel::connect_with_start(
ChannelMode::Control,
"localhost:1491",
"SecretPassword",
)?;
let result = control_channel.backup("2020-08-07T23-48")?;
assert_eq!(result, true);
# Ok(())
# }
```
"#]
use TriggerCommand for fn backup<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Backup(action),
) where mode is ChannelMode::Control;
#[doc=r#"
Restore KV + FST from <path> if you already have backup with the same name.
Note: This method requires enabling the `control` feature and start
connection in Control mode.
```rust,no_run
# use sonic_channel::*;
# fn main() -> result::Result<()> {
let control_channel = SonicChannel::connect_with_start(
ChannelMode::Control,
"localhost:1491",
"SecretPassword",
)?;
let result = control_channel.restore("2020-08-07T23-48")?;
assert_eq!(result, true);
# Ok(())
# }
```
"#]
use TriggerCommand for fn restore<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Restore(action),
) where mode is ChannelMode::Control;
}
}

148
src/channels/control.rs Normal file
View file

@ -0,0 +1,148 @@
use super::{ChannelMode, SonicChannel, SonicStream};
use crate::commands::*;
use crate::result::Result;
use std::net::ToSocketAddrs;
/// The Sonic Channel Control mode is used for administration purposes.
/// Once in this mode, you cannot switch to other modes or gain access
/// to commands from other modes.
///
/// ### Available commands
///
/// In this mode you can use `consolidate`, `backup`, `restore`,
/// `ping` and `quit` commands.
///
/// **Note:** This mode requires enabling the `control` feature.
#[derive(Debug)]
pub struct ControlChannel(SonicStream);
impl SonicChannel for ControlChannel {
type Channel = ControlChannel;
fn stream(&self) -> &SonicStream {
&self.0
}
fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
where
A: ToSocketAddrs,
S: ToString,
{
SonicStream::connect_with_start(ChannelMode::Control, addr, password).map(Self)
}
}
impl ControlChannel {
init_command!(
/// Stop connection.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.quit()?;
/// # Ok(())
/// # }
use QuitCommand for fn quit();
);
init_command!(
/// Ping server.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.ping()?;
/// # Ok(())
/// # }
use PingCommand for fn ping();
);
}
impl ControlChannel {
init_command!(
/// Consolidate indexed search data instead of waiting for the next automated
/// consolidation tick.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.consolidate()?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn consolidate()
);
init_command!(
/// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
/// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
/// for more information.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.backup("2020-08-07T23-48")?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn backup<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Backup(action),
);
);
init_command!(
/// Restore KV + FST from <path> if you already have backup with the same name.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.restore("2020-08-07T23-48")?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn restore<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Restore(action),
);
);
}

314
src/channels/ingest.rs Normal file
View file

@ -0,0 +1,314 @@
use super::{ChannelMode, SonicChannel, SonicStream};
use crate::commands::*;
use crate::result::Result;
use std::net::ToSocketAddrs;
/// The Sonic Channel Ingest mode is used for altering the search index
/// (push, pop and flush). Once in this mode, you cannot switch to other
/// modes or gain access to commands from other modes.
///
/// ### Available commands
///
/// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`,
/// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands.
///
/// **Note:** This mode requires enabling the `ingest` feature.
#[derive(Debug)]
pub struct IngestChannel(SonicStream);
impl SonicChannel for IngestChannel {
type Channel = IngestChannel;
fn stream(&self) -> &SonicStream {
&self.0
}
fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
where
A: ToSocketAddrs,
S: ToString,
{
SonicStream::connect_with_start(ChannelMode::Ingest, addr, password).map(Self)
}
}
impl IngestChannel {
init_command!(
/// Stop connection.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.quit()?;
/// # Ok(())
/// # }
use QuitCommand for fn quit();
);
init_command!(
/// Ping server.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.ping()?;
/// # Ok(())
/// # }
use PingCommand for fn ping();
);
}
impl IngestChannel {
init_command!(
/// Push search data in the index.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.push(
/// "search",
/// "default",
/// "recipe:295",
/// "Sweet Teriyaki Beef Skewers",
/// )?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use PushCommand for fn push<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
);
);
init_command!(
/// Push search data in the index with locale parameter in ISO 639-3 code.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.push_with_locale(
/// "search",
/// "default",
/// "recipe:296",
/// "Гренки с жареным картофелем и сыром",
/// "rus",
/// )?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use PushCommand for fn push_with_locale<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
locale: &'a str => Some(locale),
);
);
init_command!(
/// Pop search data from the index. Returns removed words count as usize type.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.pop("search", "default", "recipe:295", "beef")?;
/// assert_eq!(result, 1);
/// # Ok(())
/// # }
/// ```
use PopCommand for fn pop<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
);
);
init_command!(
/// Flush all indexed data from collections.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let flushc_count = ingest_channel.flushc("search")?;
/// dbg!(flushc_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flushc<'a>(
collection: &'a str,
);
);
init_command!(
/// Flush all indexed data from bucket in a collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let flushb_count = ingest_channel.flushb("search", "default")?;
/// dbg!(flushb_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flushb<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
);
);
init_command!(
/// Flush all indexed data from an object in a bucket in collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let flusho_count = ingest_channel.flusho("search", "default", "recipe:296")?;
/// dbg!(flusho_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flusho<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
);
);
init_command!(
/// Bucket count in indexed search data of your collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let bucket_count = ingest_channel.bucket_count("search")?;
/// dbg!(bucket_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn bucket_count<'a>(
collection: &'a str,
);
);
init_command!(
/// Object count of bucket in indexed search data.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let object_count = ingest_channel.object_count("search", "default")?;
/// dbg!(object_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn object_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
);
);
init_command!(
/// Object word count in indexed bucket search data.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let word_count = ingest_channel.word_count("search", "default", "recipe:296")?;
/// dbg!(word_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn word_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
);
);
}

216
src/channels/mod.rs Normal file
View file

@ -0,0 +1,216 @@
#[cfg(feature = "search")]
mod search;
#[cfg(feature = "search")]
use crate::commands::StartCommand;
pub use search::*;
#[cfg(feature = "ingest")]
mod ingest;
#[cfg(feature = "ingest")]
pub use ingest::*;
#[cfg(feature = "control")]
mod control;
#[cfg(feature = "control")]
pub use control::*;
use crate::commands::StreamCommand;
use crate::result::*;
use std::fmt;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpStream, ToSocketAddrs};
const DEFAULT_SONIC_PROTOCOL_VERSION: usize = 1;
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
/// Channel modes supported by sonic search backend.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChannelMode {
/// Sonic server search channel mode.
///
/// In this mode you can use `query`, `suggest`, `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `search` feature.
#[cfg(feature = "search")]
Search,
/// Sonic server ingest channel mode.
///
/// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`,
/// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `ingest` feature.
#[cfg(feature = "ingest")]
Ingest,
/// Sonic server control channel mode.
///
/// In this mode you can use `consolidate`, `backup`, `restore`,
/// `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `control` feature.
#[cfg(feature = "control")]
Control,
}
impl ChannelMode {
/// Converts enum to &str
pub fn to_str(&self) -> &str {
match self {
#[cfg(feature = "search")]
ChannelMode::Search => "search",
#[cfg(feature = "ingest")]
ChannelMode::Ingest => "ingest",
#[cfg(feature = "control")]
ChannelMode::Control => "control",
}
}
}
impl fmt::Display for ChannelMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
write!(f, "{}", self.to_str())
}
}
/// Root and Heart of this library.
///
/// You can connect to the sonic search backend and run all supported protocol methods.
///
#[derive(Debug)]
pub struct SonicStream {
stream: TcpStream,
mode: Option<ChannelMode>, // None Uninitialized mode
max_buffer_size: usize,
protocol_version: usize,
}
impl SonicStream {
fn write<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
let mut writer = BufWriter::with_capacity(self.max_buffer_size, &self.stream);
let message = command.message();
dbg!(&message);
writer
.write_all(message.as_bytes())
.map_err(|_| Error::new(ErrorKind::WriteToStream))?;
Ok(())
}
fn read(&self, max_read_lines: usize) -> Result<String> {
let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream);
let mut message = String::new();
let mut lines_read = 0;
while lines_read < max_read_lines {
reader
.read_line(&mut message)
.map_err(|_| Error::new(ErrorKind::ReadStream))?;
lines_read += 1;
}
Ok(message)
}
pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
self.write(&command)?;
let message = self.read(SC::READ_LINES_COUNT)?;
command.receive(message)
}
fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let stream =
TcpStream::connect(addr).map_err(|_| Error::new(ErrorKind::ConnectToServer))?;
let channel = SonicStream {
stream,
mode: None,
max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
protocol_version: DEFAULT_SONIC_PROTOCOL_VERSION,
};
let message = channel.read(1)?;
dbg!(&message);
// TODO: need to add support for versions
if message.starts_with("CONNECTED") {
Ok(channel)
} else {
Err(Error::new(ErrorKind::ConnectToServer))
}
}
fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
if self.mode.is_some() {
return Err(Error::new(ErrorKind::RunCommand));
}
let command = StartCommand {
mode,
password: password.to_string(),
};
let response = self.run_command(command)?;
self.max_buffer_size = response.max_buffer_size;
self.protocol_version = response.protocol_version;
self.mode = Some(response.mode);
Ok(())
}
/// Connect to the search backend in chosen mode.
///
/// I think we shouldn't separate commands connect and start because we haven't
/// possibility to change channel in sonic server, if we already chosen one of them. 🤔
///
/// ```rust,no_run
/// use sonic_channel::*;
///
/// fn main() -> result::Result<()> {
/// let channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword"
/// )?;
///
/// // Now you can use all method of Search channel.
/// let objects = channel.query("search", "default", "beef");
///
/// Ok(())
/// }
/// ```
pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
where
A: ToSocketAddrs,
S: ToString,
{
let mut channel = Self::connect(addr)?;
channel.start(mode, password)?;
Ok(channel)
}
}
/// This trait should be implemented for all supported sonic channels
pub trait SonicChannel {
/// Sonic channel struct
type Channel;
/// Returns reference for sonic stream of connection
fn stream(&self) -> &SonicStream;
/// Connects to sonic backend and run start command.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
/// # Ok(())
/// # }
/// ```
fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
where
A: ToSocketAddrs,
S: ToString;
}

217
src/channels/search.rs Normal file
View file

@ -0,0 +1,217 @@
use super::{ChannelMode, SonicChannel, SonicStream};
use crate::commands::*;
use crate::result::Result;
use std::net::ToSocketAddrs;
/// The Sonic Channel Search mode is used for querying the search index.
/// Once in this mode, you cannot switch to other modes or gain access
/// to commands from other modes.
///
/// ### Available commands
///
/// In this mode you can use `query`, `suggest`, `ping` and `quit` commands.
///
/// **Note:** This mode requires enabling the `search` feature.
#[derive(Debug)]
pub struct SearchChannel(SonicStream);
impl SonicChannel for SearchChannel {
type Channel = SearchChannel;
fn stream(&self) -> &SonicStream {
&self.0
}
fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
where
A: ToSocketAddrs,
S: ToString,
{
SonicStream::connect_with_start(ChannelMode::Search, addr, password).map(Self)
}
}
impl SearchChannel {
init_command!(
/// Stop connection.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.quit()?;
/// # Ok(())
/// # }
use QuitCommand for fn quit();
);
init_command!(
/// Ping server.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// channel.ping()?;
/// # Ok(())
/// # }
use PingCommand for fn ping();
);
}
impl SearchChannel {
init_command!(
/// Query objects in database.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query("search", "default", "Beef")?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
);
);
init_command!(
/// Query limited objects in database. This method similar query but
/// you can configure limit of result.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query_with_limit(
/// "search",
/// "default",
/// "Beef",
/// 10,
/// )?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
);
);
init_command!(
/// Query limited objects in database. This method similar
/// query_with_limit but you can put offset in your query.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query_with_limit_and_offset(
/// "search",
/// "default",
/// "Beef",
/// 10,
/// 10,
/// )?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query_with_limit_and_offset<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
offset: usize => Some(offset),
)
);
init_command!(
/// Suggest auto-completes words.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.suggest("search", "default", "Beef")?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use SuggestCommand for fn suggest<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
);
);
init_command!(
/// Suggest auto-completes words with limit.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.suggest_with_limit("search", "default", "Beef", 5)?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use SuggestCommand for fn suggest_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
limit: usize => Some(limit),
);
);
}

View file

@ -1,5 +1,5 @@
use super::StreamCommand;
use crate::channel::ChannelMode;
use crate::channels::ChannelMode;
use crate::result::*;
use regex::Regex;

View file

@ -12,8 +12,7 @@
//! use sonic_channel::*;
//!
//! fn main() -> result::Result<()> {
//! let channel = SonicChannel::connect_with_start(
//! ChannelMode::Search,
//! let channel = SearchChannel::start(
//! "localhost:1491",
//! "SecretPassword",
//! )?;
@ -33,8 +32,7 @@
//! use sonic_channel::*;
//!
//! fn main() -> result::Result<()> {
//! let mut channel = SonicChannel::connect_with_start(
//! ChannelMode::Ingest,
//! let mut channel = IngestChannel::start(
//! "localhost:1491",
//! "SecretPassword",
//! )?;
@ -56,8 +54,7 @@
//! use sonic_channel::*;
//!
//! fn main() -> result::Result<()> {
//! let mut channel = SonicChannel::connect_with_start(
//! ChannelMode::Control,
//! let mut channel = ControlChannel::start(
//! "localhost:1491",
//! "SecretPassword",
//! )?;
@ -88,13 +85,16 @@ compile_error!(
r#"Either features "ingest" or "search" or "control" must be enabled for "sonic-channel" crate"#
);
mod channel;
#[macro_use]
mod macroses;
mod channels;
mod commands;
/// Contains sonic channel error type and custom Result type for easy configure your functions.
pub mod result;
pub use channel::*;
pub use channels::*;
#[macro_use]
extern crate lazy_static;
@ -102,7 +102,7 @@ extern crate regex;
#[cfg(test)]
mod tests {
use crate::channel::ChannelMode;
use crate::channels::ChannelMode;
#[test]
fn format_channel_enums() {

22
src/macroses.rs Normal file
View file

@ -0,0 +1,22 @@
macro_rules! init_command {
(
$(#[$outer:meta])*
use $cmd_name:ident
for fn $fn_name:ident $(<$($lt:lifetime)+>)? (
$($arg_name:ident : $arg_type:ty $( => $arg_value:expr)?,)*
)
$(;)?
) => {
$(#[$outer])*
pub fn $fn_name $(<$($lt)+>)? (
&self,
$($arg_name: $arg_type),*
) -> $crate::result::Result<
<$cmd_name as $crate::commands::StreamCommand>::Response,
> {
#[allow(clippy::needless_update)]
let command = $cmd_name { $($arg_name $(: $arg_value)?,)* ..Default::default() };
self.stream().run_command(command)
}
};
}

View file

@ -1,4 +1,4 @@
use crate::channel::ChannelMode;
use crate::channels::ChannelMode;
use std::error::Error as StdError;
use std::fmt;