Design improvements (#13)

* protocol: extract response type

* deps: drop lazy_static and regex

* protocol: extract request commands

* protocol: create a struct for...

   ...formatting and parsing sonic protocol

* protocol: refac flush command

* commands: introduce dest, refac push and count

* commands: refac all commands

* commands: add convinient methods

* doc: add documentation for each new structs

* doc: change examples in the readme

* commands: implement from trait for count and flush

* commands: change pag logic
This commit is contained in:
Dmitriy Pleshevskiy 2022-07-18 11:07:12 +00:00 committed by GitHub
parent f5ecc123bc
commit bd08317388
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1239 additions and 847 deletions

3
.vim/coc-settings.json Normal file
View File

@ -0,0 +1,3 @@
{
"rust-analyzer.cargo.features": "all"
}

View File

@ -1,11 +1,11 @@
[package]
name = "sonic-channel"
version = "0.6.0"
version = "1.0.0"
authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
description = "Rust client for sonic search backend"
categories = ["api-bindings"]
keywords = ["sonic", "search", "client", "elasticsearch", "api"]
edition = "2018"
edition = "2021"
license = "MPL-2.0"
repository = "https://github.com/pleshevskiy/sonic-channel"
homepage = "https://github.com/pleshevskiy/sonic-channel"
@ -15,8 +15,9 @@ readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lazy_static = "1.4.0"
regex = "1.3.4"
# TODO(pleshevskiy): don't forget to remove before publishing
env_logger = "0.9.0"
log = "0.4.17"
whatlang = "0.12.0"
[features]

View File

@ -17,7 +17,7 @@ version = "0.1.0"
authors = ["Me <user@rust-lang.org>"]
[dependencies]
sonic-channel = { version = "0.6", features = ["ingest"] }
sonic-channel = { version = "1.0", features = ["ingest"] }
```
Add `default-features = false` to dependency, if you want to exclude default
@ -33,8 +33,15 @@ Note: This example requires enabling the `search` feature, enabled by default.
use sonic_channel::*;
fn main() -> result::Result<()> {
let channel = SearchChannel::start("localhost:1491", "SecretPassword")?;
let objects = channel.query("collection", "bucket", "recipe")?;
let channel = SearchChannel::start(
"localhost:1491",
"SecretPassword",
)?;
let objects = channel.query(QueryRequest::new(
Dest::col_buc("collection", "bucket"),
"recipe",
))?;
dbg!(objects);
Ok(())
@ -49,10 +56,17 @@ Note: This example requires enabling the `ingest` feature.
use sonic_channel::*;
fn main() -> result::Result<()> {
let channel = IngestChannel::start("localhost:1491", "SecretPassword")?;
let pushed = channel.push("collection", "bucket", "object:1", "my best recipe")?;
let channel = IngestChannel::start(
"localhost:1491",
"SecretPassword",
)?;
let dest = Dest::col_buc("collection", "bucket").obj("object:1");
let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?;
// or
// let pushed = channel.push_with_locale("collection", "bucket", "object:1", "Мой лучший рецепт", "rus")?;
// let pushed = channel.push(
// PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus)
// )?;
dbg!(pushed);
Ok(())
@ -67,9 +81,13 @@ Note: This example requires enabling the `control` feature.
use sonic_channel::*;
fn main() -> result::Result<()> {
let channel = ControlChannel::start("localhost:1491", "SecretPassword")?;
let channel = ControlChannel::start(
"localhost:1491",
"SecretPassword",
)?;
let result = channel.consolidate()?;
assert_eq!(result, true);
assert_eq!(result, ());
Ok(())
}

View File

@ -70,11 +70,10 @@ impl ControlChannel {
impl ControlChannel {
init_command!(
/// Consolidate indexed search data instead of waiting for the next automated
/// consolidation tick.
/// Trigger control action.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
/// Note: This method requires enabling the `control` feature and start connection in
/// Control mode
///
/// ```rust,no_run
/// # use sonic_channel::*;
@ -84,65 +83,78 @@ impl ControlChannel {
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.consolidate()?;
/// assert_eq!(result, true);
/// control_channel.trigger(TriggerRequest::Consolidate)?;
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn consolidate()
use TriggerCommand for fn trigger(
req: TriggerRequest,
)
);
init_command!(
/// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
/// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
/// for more information.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.backup("2020-08-07T23-48")?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn backup<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Backup(action),
);
);
/// Consolidate indexed search data instead of waiting for the next automated
/// consolidation tick.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// control_channel.consolidate()?;
/// # Ok(())
/// # }
/// ```
pub fn consolidate(&self) -> Result<()> {
self.trigger(TriggerRequest::Consolidate)
}
init_command!(
/// Restore KV + FST from <path> if you already have backup with the same name.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.restore("2020-08-07T23-48")?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use TriggerCommand for fn restore<'a>(
// It's not action, but my macro cannot support alias for custom argument.
// TODO: Add alias to macro and rename argument of this function.
action: &'a str => TriggerAction::Restore(action),
);
);
/// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
/// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
/// for more information.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// control_channel.backup("2020-08-07T23-48")?;
/// # Ok(())
/// # }
/// ```
pub fn backup(&self, path: &str) -> Result<()> {
self.trigger(TriggerRequest::Backup(path))
}
/// Restore KV + FST from <path> if you already have backup with the same name.
///
/// Note: This method requires enabling the `control` feature and start
/// connection in Control mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let control_channel = ControlChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = control_channel.restore("2020-08-07T23-48")?;
/// assert_eq!(result, ());
/// # Ok(())
/// # }
/// ```
pub fn restore(&self, path: &str) -> Result<()> {
self.trigger(TriggerRequest::Restore(path))
}
}

View File

@ -83,55 +83,16 @@ impl IngestChannel {
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.push(
/// "search",
/// "default",
/// "recipe:295",
/// "Sweet Teriyaki Beef Skewers",
/// )?;
/// assert_eq!(result, true);
/// let result = ingest_channel.push(PushRequest::new(
/// Dest::col("search").obj("recipe:295"),
/// "Sweet Teriyaki Beef Skewers"
/// ))?;
/// assert_eq!(result, ());
/// # Ok(())
/// # }
/// ```
use PushCommand for fn push<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
);
);
init_command!(
/// Push search data in the index with locale parameter in ISO 639-3 code.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.push_with_locale(
/// "search",
/// "default",
/// "recipe:296",
/// "Гренки с жареным картофелем и сыром",
/// "rus",
/// )?;
/// assert_eq!(result, true);
/// # Ok(())
/// # }
/// ```
use PushCommand for fn push_with_locale<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
locale: &'a str => Some(locale),
req: PushRequest,
);
);
@ -149,16 +110,14 @@ impl IngestChannel {
/// "SecretPassword",
/// )?;
///
/// let result = ingest_channel.pop("search", "default", "recipe:295", "beef")?;
/// let dest = Dest::col("search").obj("recipe:295");
/// let result = ingest_channel.pop(PopRequest::new(dest, "beef"))?;
/// assert_eq!(result, 1);
/// # Ok(())
/// # }
/// ```
use PopCommand for fn pop<'a>(
collection: &'a str,
bucket: &'a str,
object: &'a str,
text: &'a str,
use PopCommand for fn pop(
req: PopRequest,
);
);
@ -176,69 +135,24 @@ impl IngestChannel {
/// "SecretPassword",
/// )?;
///
/// let flushc_count = ingest_channel.flushc("search")?;
/// let flushc_count = ingest_channel.flush(FlushRequest::collection("search"))?;
/// dbg!(flushc_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flushc<'a>(
collection: &'a str,
);
);
init_command!(
/// Flush all indexed data from bucket in a collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let flushb_count = ingest_channel.flushb("search", "default")?;
/// let flushb_count = ingest_channel.flush(FlushRequest::bucket("search", "default"))?;
/// dbg!(flushb_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flushb<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
);
);
init_command!(
/// Flush all indexed data from an object in a bucket in collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// let flusho_count = ingest_channel.flush(
/// FlushRequest::object("search", "default", "recipe:295")
/// )?;
///
/// let flusho_count = ingest_channel.flusho("search", "default", "recipe:296")?;
/// dbg!(flusho_count);
/// # Ok(())
/// # }
/// ```
use FlushCommand for fn flusho<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
use FlushCommand for fn flush(
req: FlushRequest,
);
);
init_command!(
/// Bucket count in indexed search data of your collection.
/// Count indexed search data of your collection.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
@ -251,64 +165,19 @@ impl IngestChannel {
/// "SecretPassword",
/// )?;
///
/// let bucket_count = ingest_channel.bucket_count("search")?;
/// let bucket_count = ingest_channel.count(CountRequest::buckets("search"))?;
/// dbg!(bucket_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn bucket_count<'a>(
collection: &'a str,
);
);
init_command!(
/// Object count of bucket in indexed search data.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// let object_count = ingest_channel.count(CountRequest::objects("search", "default"))?;
/// dbg!(object_count);
/// let word_count = ingest_channel.count(
/// CountRequest::words("search", "default", "recipe:256")
/// )?;
///
/// let object_count = ingest_channel.object_count("search", "default")?;
/// dbg!(object_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn object_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
);
);
init_command!(
/// Object word count in indexed bucket search data.
///
/// Note: This method requires enabling the `ingest` feature and start
/// connection in Ingest mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let ingest_channel = IngestChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let word_count = ingest_channel.word_count("search", "default", "recipe:296")?;
/// dbg!(word_count);
/// # Ok(())
/// # }
/// ```
use CountCommand for fn word_count<'a>(
collection: &'a str,
bucket: &'a str => Some(bucket),
object: &'a str => Some(object),
use CountCommand for fn count(
req: CountRequest,
);
);
}

View File

@ -1,7 +1,6 @@
#[cfg(feature = "search")]
mod search;
#[cfg(feature = "search")]
use crate::commands::StartCommand;
pub use search::*;
#[cfg(feature = "ingest")]
@ -14,13 +13,14 @@ mod control;
#[cfg(feature = "control")]
pub use control::*;
use crate::commands::StreamCommand;
use crate::result::*;
use std::fmt;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::cell::RefCell;
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpStream, ToSocketAddrs};
const DEFAULT_SONIC_PROTOCOL_VERSION: usize = 1;
use crate::commands::{StartCommand, StreamCommand};
use crate::protocol::{self, Protocol};
use crate::result::*;
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
/// Channel modes supported by sonic search backend.
@ -28,7 +28,8 @@ const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
pub enum ChannelMode {
/// Sonic server search channel mode.
///
/// In this mode you can use `query`, `suggest`, `ping` and `quit` commands.
/// In this mode you can use `query`, `pag_query`, `suggest`, `lim_suggest`, `ping`
/// and `quit` commands.
///
/// Note: This mode requires enabling the `search` feature.
#[cfg(feature = "search")]
@ -36,8 +37,7 @@ pub enum ChannelMode {
/// Sonic server ingest channel mode.
///
/// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`,
/// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands.
/// In this mode you can use `push`, `pop`, `flush`, `count` `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `ingest` feature.
#[cfg(feature = "ingest")]
@ -45,7 +45,7 @@ pub enum ChannelMode {
/// Sonic server control channel mode.
///
/// In this mode you can use `consolidate`, `backup`, `restore`,
/// In this mode you can use `trigger`, `consolidate`, `backup`, `restore`,
/// `ping` and `quit` commands.
///
/// Note: This mode requires enabling the `control` feature.
@ -69,8 +69,8 @@ impl ChannelMode {
}
}
impl fmt::Display for ChannelMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
impl std::fmt::Display for ChannelMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
@ -81,84 +81,84 @@ impl fmt::Display for ChannelMode {
///
#[derive(Debug)]
pub struct SonicStream {
stream: TcpStream,
stream: RefCell<TcpStream>,
reader: RefCell<BufReader<TcpStream>>,
mode: Option<ChannelMode>, // None Uninitialized mode
max_buffer_size: usize,
protocol_version: usize,
protocol: Protocol,
}
impl SonicStream {
fn write<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
let mut writer = BufWriter::with_capacity(self.max_buffer_size, &self.stream);
let message = command.message();
writer
.write_all(message.as_bytes())
.map_err(|_| Error::new(ErrorKind::WriteToStream))?;
fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
let buf = self
.protocol
.format_request(command.request())
.map_err(|_| Error::WriteToStream)?;
self.stream
.borrow_mut()
.write_all(&buf)
.map_err(|_| Error::WriteToStream)?;
Ok(())
}
fn read(&self, max_read_lines: usize) -> Result<String> {
let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream);
let mut message = String::new();
fn read_line(&self) -> Result<protocol::Response> {
let line = {
let mut line = String::with_capacity(self.max_buffer_size);
self.reader
.borrow_mut()
.read_line(&mut line)
.map_err(|_| Error::ReadStream)?;
line
};
for _ in 0..max_read_lines {
reader
.read_line(&mut message)
.map_err(|_| Error::new(ErrorKind::ReadStream))?;
if message.starts_with("ERR ") {
break;
}
}
Ok(message)
log::debug!("[channel] {}", &line);
self.protocol.parse_response(&line)
}
pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
self.write(&command)?;
let message = self.read(SC::READ_LINES_COUNT)?;
if let Some(error) = message.strip_prefix("ERR ") {
Err(Error::new(ErrorKind::SonicServer(Box::leak(
error.to_owned().into_boxed_str(),
))))
} else {
command.receive(message)
}
self.send(&command)?;
let res = loop {
let res = self.read_line()?;
if !matches!(&res, protocol::Response::Pending(_)) {
break res;
}
};
command.receive(res)
}
fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let stream =
TcpStream::connect(addr).map_err(|_| Error::new(ErrorKind::ConnectToServer))?;
let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
let read_stream = stream.try_clone().map_err(|_| Error::ConnectToServer)?;
let channel = SonicStream {
stream,
reader: RefCell::new(BufReader::new(read_stream)),
stream: RefCell::new(stream),
mode: None,
max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
protocol_version: DEFAULT_SONIC_PROTOCOL_VERSION,
protocol: Default::default(),
};
let message = channel.read(1)?;
// TODO: need to add support for versions
if message.starts_with("CONNECTED") {
let res = channel.read_line()?;
if matches!(res, protocol::Response::Connected) {
Ok(channel)
} else {
Err(Error::new(ErrorKind::ConnectToServer))
Err(Error::ConnectToServer)
}
}
fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
if self.mode.is_some() {
return Err(Error::new(ErrorKind::RunCommand));
return Err(Error::RunCommand);
}
let command = StartCommand {
let res = self.run_command(StartCommand {
mode,
password: password.to_string(),
};
let response = self.run_command(command)?;
})?;
self.max_buffer_size = response.max_buffer_size;
self.protocol_version = response.protocol_version;
self.mode = Some(response.mode);
self.max_buffer_size = res.max_buffer_size;
self.protocol = Protocol::from(res.protocol_version);
self.mode = Some(res.mode);
Ok(())
}
@ -167,22 +167,6 @@ impl SonicStream {
///
/// I think we shouldn't separate commands connect and start because we haven't
/// possibility to change channel in sonic server, if we already chosen one of them. 🤔
///
/// ```rust,no_run
/// use sonic_channel::*;
///
/// fn main() -> result::Result<()> {
/// let channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword"
/// )?;
///
/// // Now you can use all method of Search channel.
/// let objects = channel.query("search", "default", "beef");
///
/// Ok(())
/// }
/// ```
pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
where
A: ToSocketAddrs,
@ -222,7 +206,7 @@ pub trait SonicChannel {
#[cfg(test)]
mod tests {
use super::ChannelMode;
use super::*;
#[test]
fn format_channel_enums() {

View File

@ -82,86 +82,24 @@ impl SearchChannel {
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query("search", "default", "Beef")?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
);
);
init_command!(
/// Query limited objects in database. This method similar query but
/// you can configure limit of result.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query_with_limit(
/// "search",
/// "default",
/// let result = search_channel.query(QueryRequest::new(
/// Dest::col("search"),
/// "Beef",
/// 10,
/// ))?;
/// dbg!(result);
///
/// let result = search_channel.query(
/// QueryRequest::new(Dest::col("search"), "Beef").limit(10)
/// )?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
use QueryCommand for fn query(
req: QueryRequest,
);
);
init_command!(
/// Query limited objects in database. This method similar
/// query_with_limit but you can put offset in your query.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.query_with_limit_and_offset(
/// "search",
/// "default",
/// "Beef",
/// 10,
/// 10,
/// )?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use QueryCommand for fn query_with_limit_and_offset<'a>(
collection: &'a str,
bucket: &'a str,
terms: &'a str,
limit: usize => Some(limit),
offset: usize => Some(offset),
)
);
init_command!(
/// Suggest auto-completes words.
///
@ -176,42 +114,20 @@ impl SearchChannel {
/// "SecretPassword",
/// )?;
///
/// let result = search_channel.suggest("search", "default", "Beef")?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use SuggestCommand for fn suggest<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
);
);
init_command!(
/// Suggest auto-completes words with limit.
///
/// Note: This method requires enabling the `search` feature and start
/// connection in Search mode.
///
/// ```rust,no_run
/// # use sonic_channel::*;
/// # fn main() -> result::Result<()> {
/// let search_channel = SearchChannel::start(
/// "localhost:1491",
/// "SecretPassword",
/// let result = search_channel.suggest(
/// SuggestRequest::new(Dest::col("search"), "Beef")
/// )?;
/// dbg!(result);
///
/// let result = search_channel.suggest_with_limit("search", "default", "Beef", 5)?;
/// let result = search_channel.suggest(
/// SuggestRequest::new(Dest::col("search"), "Beef").limit(2)
/// )?;
/// dbg!(result);
/// # Ok(())
/// # }
/// ```
use SuggestCommand for fn suggest_with_limit<'a>(
collection: &'a str,
bucket: &'a str,
word: &'a str,
limit: usize => Some(limit),
use SuggestCommand for fn suggest(
req: SuggestRequest,
);
);
}

View File

@ -1,39 +1,67 @@
use super::StreamCommand;
use crate::misc::*;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
pub struct CountCommand<'a> {
pub collection: &'a str,
pub bucket: Option<&'a str>,
pub object: Option<&'a str>,
/// Parameters for the `count` command.
#[derive(Debug)]
pub struct CountRequest(OptDest);
impl CountRequest {
/// Creates a new request to get the number of buckets in the collection.
pub fn buckets(collection: impl ToString) -> CountRequest {
Self(OptDest::col(collection))
}
/// Creates a new request to get the number of objects in the collection bucket.
pub fn objects(collection: impl ToString, bucket: impl ToString) -> CountRequest {
Self(OptDest::col_buc(collection, bucket))
}
/// Creates a new request to get the number of words in the collection bucket object.
pub fn words(
collection: impl ToString,
bucket: impl ToString,
object: impl ToString,
) -> CountRequest {
Self(OptDest::col_buc_obj(collection, bucket, object))
}
}
impl StreamCommand for CountCommand<'_> {
impl From<Dest> for CountRequest {
fn from(d: Dest) -> Self {
Self(OptDest::from(d))
}
}
impl From<ObjDest> for CountRequest {
fn from(d: ObjDest) -> Self {
Self(OptDest::from(d))
}
}
#[derive(Debug)]
pub struct CountCommand {
pub(crate) req: CountRequest,
}
impl StreamCommand for CountCommand {
type Response = usize;
fn message(&self) -> String {
let mut message = format!("COUNT {}", self.collection);
if let Some(bucket) = self.bucket {
message.push_str(&format!(" {}", bucket));
if let Some(object) = self.object {
message.push_str(&format!(" {}", object));
}
fn request(&self) -> protocol::Request {
let dest = &self.req.0;
protocol::Request::Count {
collection: dest.collection.clone(),
bucket: dest.bucket.clone(),
object: dest.object.clone(),
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponse(
"Cannot parse count of count method response to usize",
))
})
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Result(count) = res {
Ok(count)
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -1,39 +1,68 @@
use super::StreamCommand;
use crate::result::{Error, ErrorKind, Result};
use crate::misc::*;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
pub struct FlushCommand<'a> {
pub collection: &'a str,
pub bucket: Option<&'a str>,
pub object: Option<&'a str>,
}
/// Parameters for the `flush` command.
#[derive(Debug)]
pub struct FlushRequest(OptDest);
impl StreamCommand for FlushCommand<'_> {
type Response = usize;
fn message(&self) -> String {
let mut message = match (self.bucket, self.object) {
(Some(bucket), Some(object)) => {
format!("FLUSHO {} {} {}", self.collection, bucket, object)
}
(Some(bucket), None) => format!("FLUSHB {} {}", self.collection, bucket),
(None, None) => format!("FLUSHC {}", self.collection),
_ => panic!("Invalid flush command"),
};
message.push_str("\r\n");
message
impl FlushRequest {
/// Creates a new request to flush all data in the collection.
pub fn collection(collection: impl ToString) -> FlushRequest {
Self(OptDest::col(collection))
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponse(
"Cannot parse count of flush method response to usize",
))
})
/// Creates a new request to flush all data in the collection bucket.
pub fn bucket(collection: impl ToString, bucket: impl ToString) -> FlushRequest {
Self(OptDest::col_buc(collection, bucket))
}
/// Creates a new request to flush all data in the collection bucket object.
pub fn object(
collection: impl ToString,
bucket: impl ToString,
object: impl ToString,
) -> FlushRequest {
Self(OptDest::col_buc_obj(collection, bucket, object))
}
}
impl From<Dest> for FlushRequest {
fn from(d: Dest) -> Self {
Self(OptDest::from(d))
}
}
impl From<ObjDest> for FlushRequest {
fn from(d: ObjDest) -> Self {
Self(OptDest::from(d))
}
}
#[derive(Debug)]
pub struct FlushCommand {
pub(crate) req: FlushRequest,
}
impl StreamCommand for FlushCommand {
type Response = usize;
fn request(&self) -> protocol::Request {
let dest = &self.req.0;
protocol::Request::Flush {
collection: dest.collection.clone(),
bucket: dest.bucket.clone(),
object: dest.object.clone(),
}
}
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Result(count) = res {
Ok(count)
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -1,8 +1,7 @@
mod ping;
mod quit;
mod start;
mod ping;
#[cfg(feature = "ingest")]
mod count;
#[cfg(feature = "ingest")]
@ -20,36 +19,33 @@ mod suggest;
#[cfg(feature = "control")]
mod trigger;
pub(crate) use quit::QuitCommand;
pub(crate) use start::StartCommand;
pub(crate) use ping::PingCommand;
pub(crate) use self::{ping::PingCommand, quit::QuitCommand, start::StartCommand};
#[cfg(feature = "ingest")]
pub(crate) use count::CountCommand;
pub(crate) use self::{
count::CountCommand, flush::FlushCommand, pop::PopCommand, push::PushCommand,
};
#[cfg(feature = "ingest")]
pub(crate) use flush::FlushCommand;
#[cfg(feature = "ingest")]
pub(crate) use pop::PopCommand;
#[cfg(feature = "ingest")]
pub(crate) use push::PushCommand;
pub use self::{count::CountRequest, flush::FlushRequest, pop::PopRequest, push::PushRequest};
#[cfg(feature = "search")]
pub(crate) use query::QueryCommand;
pub(crate) use self::{query::QueryCommand, suggest::SuggestCommand};
#[cfg(feature = "search")]
pub(crate) use suggest::SuggestCommand;
pub use self::{query::QueryRequest, suggest::SuggestRequest};
#[cfg(feature = "control")]
pub(crate) use trigger::{TriggerAction, TriggerCommand};
pub(crate) use trigger::TriggerCommand;
#[cfg(feature = "control")]
pub use trigger::TriggerRequest;
use crate::protocol;
use crate::result::Result;
#[doc(hidden)]
pub trait StreamCommand {
type Response;
const READ_LINES_COUNT: usize = 1;
fn request(&self) -> protocol::Request;
fn message(&self) -> String;
fn receive(&self, message: String) -> Result<Self::Response>;
fn receive(&self, res: protocol::Response) -> Result<Self::Response>;
}

View File

@ -1,21 +1,22 @@
use super::StreamCommand;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct PingCommand;
impl StreamCommand for PingCommand {
type Response = bool;
type Response = ();
fn message(&self) -> String {
String::from("PING\r\n")
fn request(&self) -> protocol::Request {
protocol::Request::Ping
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message == "PONG\r\n" {
Ok(true)
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if matches!(res, protocol::Response::Pong) {
Ok(())
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -1,36 +1,54 @@
use super::StreamCommand;
use crate::misc::ObjDest;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
pub struct PopCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub object: &'a str,
pub text: &'a str,
/// Parameters for the `pop` command.
#[derive(Debug)]
pub struct PopRequest {
/// Collection, bucket and object where we should pop search data from index.
pub dest: ObjDest,
/// Search data to be deleted
pub text: String,
}
impl StreamCommand for PopCommand<'_> {
type Response = usize;
fn message(&self) -> String {
let mut message = format!(
r#"POP {} {} {} "{}""#,
self.collection, self.bucket, self.object, self.text
);
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponse(
"Cannot parse count of pop method response to usize",
))
})
} else {
Err(Error::new(ErrorKind::WrongResponse))
impl PopRequest {
/// Creates a base pop request.
pub fn new(dest: ObjDest, text: impl ToString) -> Self {
Self {
dest,
text: text.to_string(),
}
}
}
#[derive(Debug)]
pub struct PopCommand {
pub(crate) req: PopRequest,
}
impl StreamCommand for PopCommand {
type Response = usize;
fn request(&self) -> protocol::Request {
let dest = &self.req.dest;
protocol::Request::Pop {
collection: dest.collection().clone(),
bucket: dest
.bucket_opt()
.cloned()
// TODO: use a global context for default bucket value
.unwrap_or_else(|| String::from("default")),
object: dest.object().clone(),
terms: self.req.text.to_string(),
}
}
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Result(count) = res {
Ok(count)
} else {
Err(Error::WrongResponse)
}
}
}

View File

@ -1,79 +1,73 @@
use super::StreamCommand;
use crate::misc::ObjDest;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
pub struct PushCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub object: &'a str,
pub text: &'a str,
pub locale: Option<&'a str>,
/// Parameters for the `push` command.
#[derive(Debug)]
pub struct PushRequest {
/// Collection, bucket and object where we should push search data in the index.
pub dest: ObjDest,
/// Search data to be added
pub text: String,
/// Language of the search data. If None, the client will try to determine based on the `text`.
pub lang: Option<whatlang::Lang>,
}
impl StreamCommand for PushCommand<'_> {
type Response = bool;
impl PushRequest {
/// Creates a base push request
pub fn new(dest: ObjDest, text: impl ToString) -> Self {
Self {
dest,
text: text.to_string(),
lang: None,
}
}
fn message(&self) -> String {
let mut message = format!(
r#"PUSH {} {} {} "{}""#,
self.collection,
self.bucket,
self.object,
remove_multiline(self.text)
);
/// Set a language for the request.
pub fn lang(mut self, lang: whatlang::Lang) -> Self {
self.lang = Some(lang);
self
}
}
let locale = self.locale.or_else(|| {
whatlang::detect(self.text).and_then(|info| {
if info.confidence() == 1.0 {
Some(info.lang().code())
} else {
None
}
#[derive(Debug)]
pub struct PushCommand {
pub(crate) req: PushRequest,
}
impl StreamCommand for PushCommand {
type Response = ();
fn request(&self) -> protocol::Request {
let req = &self.req;
let lang = req
.lang
.or_else(|| {
whatlang::detect(&req.text).and_then(|i| (i.confidence() == 1.0).then(|| i.lang()))
})
});
.map(|l| l.code());
if let Some(locale) = locale {
message.push_str(&format!(" LANG({})", locale));
protocol::Request::Push {
collection: req.dest.collection().clone(),
bucket: req
.dest
.bucket_opt()
.cloned()
// TODO: use a global context for default bucket value
.unwrap_or_else(|| String::from("default")),
object: req.dest.object().clone(),
terms: req.text.to_string(),
lang,
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message == "OK\r\n" {
Ok(true)
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if matches!(res, protocol::Response::Ok) {
Ok(())
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}
fn remove_multiline(text: &str) -> String {
text.lines()
.enumerate()
.fold(String::new(), |mut acc, (i, line)| {
if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') {
acc.push(' ');
}
acc.push_str(line);
acc
})
}
#[cfg(test)]
mod tests {
use super::remove_multiline;
#[test]
fn should_make_single_line() {
let text = "
Hello
World
";
let expected_text = "Hello World";
assert_eq!(remove_multiline(text), expected_text);
}
}

View File

@ -1,69 +1,100 @@
use super::StreamCommand;
use crate::misc::Dest;
use crate::protocol;
use crate::result::*;
use regex::Regex;
const RE_QUERY_RECEIVED_MESSAGE: &str = r"(?x)
^PENDING\s(?P<pending_query_id>\w+)\r\n
EVENT\sQUERY\s(?P<event_query_id>\w+)\s(?P<objects>.*?)\r\n$
";
#[derive(Debug, Default)]
pub struct QueryCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub terms: &'a str,
/// Parameters for the `query` command
#[derive(Debug, Clone)]
pub struct QueryRequest {
/// Collection and bucket where we should search for objects.
pub dest: Dest,
/// Searchable terms.
pub terms: String,
/// Language of the search data. If None, the client will try to determine based on the `terms`.
pub lang: Option<whatlang::Lang>,
/// Limit of result objects.
pub limit: Option<usize>,
/// The number of result objects we want to skip.
pub offset: Option<usize>,
}
impl StreamCommand for QueryCommand<'_> {
type Response = Vec<String>;
const READ_LINES_COUNT: usize = 2;
fn message(&self) -> String {
let mut message = format!(
r#"QUERY {} {} "{}""#,
self.collection, self.bucket, self.terms
);
if let Some(limit) = self.limit.as_ref() {
message.push_str(&format!(" LIMIT({})", limit));
impl QueryRequest {
/// Creates base query request.
pub fn new(dest: Dest, terms: impl ToString) -> Self {
Self {
dest,
terms: terms.to_string(),
lang: None,
limit: None,
offset: None,
}
if let Some(offset) = self.offset.as_ref() {
message.push_str(&format!(" OFFSET({})", offset));
}
// use greyblake/whatlang-rs to autodect locale
if let Some(info) = whatlang::detect(self.terms) {
if info.confidence() == 1.0 {
message.push_str(&format!(" LANG({})", info.lang().code()));
}
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response> {
lazy_static! {
static ref RE: Regex = Regex::new(RE_QUERY_RECEIVED_MESSAGE).unwrap();
}
/// Set a language for the request.
pub fn lang(mut self, lang: whatlang::Lang) -> Self {
self.lang = Some(lang);
self
}
if let Some(caps) = RE.captures(&message) {
if caps["pending_query_id"] != caps["event_query_id"] {
Err(Error::new(ErrorKind::QueryResponse(
"Pending id and event id don't match",
)))
} else if caps["objects"].is_empty() {
Ok(vec![])
} else {
Ok(caps["objects"]
.split_whitespace()
.map(str::to_owned)
.collect())
}
/// Set a limit for the request.
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
/// Set an offset for the request.
pub fn offset(mut self, offset: usize) -> Self {
self.offset = Some(offset);
self
}
/// Set the pagination for the request. Automatic offset calculation based on provided
/// limit and page.
///
/// Note: the first page is 0;
pub fn pag(self, page: usize, limit: usize) -> Self {
let offset = page * limit;
self.offset(offset).limit(limit)
}
}
#[derive(Debug)]
pub struct QueryCommand {
pub(crate) req: QueryRequest,
}
impl StreamCommand for QueryCommand {
type Response = Vec<String>;
fn request(&self) -> protocol::Request {
let dest = &self.req.dest;
let lang = self
.req
.lang
.or_else(|| {
whatlang::detect(&self.req.terms)
.and_then(|i| (i.confidence() == 1.0).then(|| i.lang()))
})
.map(|l| l.code());
protocol::Request::Query {
collection: dest.collection().clone(),
bucket: dest
.bucket_opt()
.cloned()
.unwrap_or_else(|| String::from("default")),
terms: self.req.terms.clone(),
offset: self.req.offset,
limit: self.req.limit,
lang,
}
}
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Event(protocol::EventKind::Query, _id, objects) = res {
Ok(objects)
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -1,21 +1,22 @@
use super::StreamCommand;
use crate::protocol;
use crate::result::*;
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct QuitCommand;
impl StreamCommand for QuitCommand {
type Response = bool;
type Response = ();
fn message(&self) -> String {
String::from("QUIT\r\n")
fn request(&self) -> protocol::Request {
protocol::Request::Quit
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message.starts_with("ENDED ") {
Ok(true)
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if matches!(res, protocol::Response::Ended) {
Ok(())
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -1,27 +1,17 @@
use super::StreamCommand;
use crate::channels::ChannelMode;
use crate::protocol;
use crate::result::*;
use regex::Regex;
const RE_START_RECEIVED_MESSAGE: &str = r"(?x)
STARTED
\s # started with mode
(?P<mode>search|ingest|control)
\s # wich protocol used
protocol\((?P<protocol>\d+)\)
\s # maximum buffer size
buffer\((?P<buffer_size>\d+)\)
";
#[derive(Debug)]
pub struct StartCommand {
pub mode: ChannelMode,
pub password: String,
pub(crate) mode: ChannelMode,
pub(crate) password: String,
}
#[derive(Debug)]
pub struct StartCommandResponse {
pub protocol_version: usize,
pub protocol_version: protocol::Version,
pub max_buffer_size: usize,
pub mode: ChannelMode,
}
@ -29,32 +19,26 @@ pub struct StartCommandResponse {
impl StreamCommand for StartCommand {
type Response = StartCommandResponse;
fn message(&self) -> String {
format!("START {} {}\r\n", self.mode, self.password)
fn request(&self) -> protocol::Request {
protocol::Request::Start {
mode: self.mode,
password: self.password.to_string(),
}
}
fn receive(&self, message: String) -> Result<Self::Response> {
lazy_static! {
static ref RE: Regex = Regex::new(RE_START_RECEIVED_MESSAGE).unwrap();
}
if let Some(caps) = RE.captures(&message) {
if self.mode.as_str() != &caps["mode"] {
Err(Error::new(ErrorKind::SwitchMode))
} else {
let protocol_version: usize =
caps["protocol"].parse().expect("Must be digit by regex");
let max_buffer_size: usize =
caps["buffer_size"].parse().expect("Must be digit by regex");
Ok(StartCommandResponse {
protocol_version,
max_buffer_size,
mode: self.mode,
})
}
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Started(payload) = res {
Ok(StartCommandResponse {
protocol_version: payload
.protocol_version
.try_into()
// TODO: better error
.map_err(|_| Error::SwitchMode)?,
max_buffer_size: payload.max_buffer_size,
mode: self.mode,
})
} else {
Err(Error::new(ErrorKind::SwitchMode))
Err(Error::SwitchMode)
}
}
}

View File

@ -1,58 +1,63 @@
use super::StreamCommand;
use crate::misc::Dest;
use crate::protocol;
use crate::result::*;
use regex::Regex;
const RE_SUGGEST_RECEIVED_MESSAGE: &str = r"(?x)
^PENDING\s(?P<pending_suggest_id>\w+)\r\n
EVENT\sSUGGEST\s(?P<event_suggest_id>\w+)\s(?P<words>.*?)\r\n$
";
#[derive(Debug, Default)]
pub struct SuggestCommand<'a> {
pub collection: &'a str,
pub bucket: &'a str,
pub word: &'a str,
/// Parameters for the `suggest` command.
#[derive(Debug)]
pub struct SuggestRequest {
/// Collection and bucket where we should search for suggested words.
pub dest: Dest,
/// Base word.
pub word: String,
/// Limit of result words.
pub limit: Option<usize>,
}
impl StreamCommand for SuggestCommand<'_> {
type Response = Vec<String>;
const READ_LINES_COUNT: usize = 2;
fn message(&self) -> String {
let mut message = format!(
r#"SUGGEST {} {} "{}""#,
self.collection, self.bucket, self.word
);
if let Some(limit) = self.limit.as_ref() {
message.push_str(&format!(" LIMIT({})", limit));
impl SuggestRequest {
/// Creates a base suggest request.
pub fn new(dest: Dest, word: impl ToString) -> Self {
Self {
dest,
word: word.to_string(),
limit: None,
}
message.push_str("\r\n");
message
}
fn receive(&self, message: String) -> Result<Self::Response> {
lazy_static! {
static ref RE: Regex = Regex::new(RE_SUGGEST_RECEIVED_MESSAGE).unwrap();
}
/// Set a limit for the request.
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
}
match RE.captures(&message) {
None => Err(Error::new(ErrorKind::WrongResponse)),
Some(caps) => {
if caps["pending_suggest_id"] != caps["event_suggest_id"] {
Err(Error::new(ErrorKind::QueryResponse(
"Pending id and event id don't match",
)))
} else if caps["words"].is_empty() {
Ok(vec![])
} else {
Ok(caps["words"]
.split_whitespace()
.map(str::to_owned)
.collect())
}
}
#[derive(Debug)]
pub struct SuggestCommand {
pub(crate) req: SuggestRequest,
}
impl StreamCommand for SuggestCommand {
type Response = Vec<String>;
fn request(&self) -> protocol::Request {
let dest = &self.req.dest;
protocol::Request::Suggest {
collection: dest.collection().clone(),
bucket: dest
.bucket_opt()
.cloned()
.unwrap_or_else(|| String::from("default")),
word: self.req.word.to_string(),
limit: self.req.limit,
}
}
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if let protocol::Response::Event(protocol::EventKind::Suggest, _id, words) = res {
Ok(words)
} else {
Err(Error::WrongResponse)
}
}
}

View File

@ -1,47 +1,47 @@
use super::StreamCommand;
use crate::protocol;
use crate::result::*;
use std::fmt;
use std::path::PathBuf;
/// Parameters for the `trigger` command.
#[derive(Debug)]
pub enum TriggerAction<'a> {
pub enum TriggerRequest<'a> {
/// Consolidate indexed search data instead of waiting for the next automated
/// consolidation tick.
Consolidate,
/// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
/// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
/// for more information.
Backup(&'a str),
/// Restore KV + FST from <path> if you already have backup with the same name.
Restore(&'a str),
}
impl Default for TriggerAction<'_> {
fn default() -> Self {
TriggerAction::Consolidate
}
}
impl fmt::Display for TriggerAction<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
match self {
TriggerAction::Consolidate => write!(f, "consolidate"),
TriggerAction::Backup(data) => write!(f, "backup {}", data),
TriggerAction::Restore(data) => write!(f, "restore {}", data),
}
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct TriggerCommand<'a> {
pub action: TriggerAction<'a>,
pub(crate) req: TriggerRequest<'a>,
}
impl StreamCommand for TriggerCommand<'_> {
type Response = bool;
type Response = ();
fn message(&self) -> String {
format!("TRIGGER {}\r\n", self.action)
fn request(&self) -> protocol::Request {
let req = match self.req {
TriggerRequest::Consolidate => protocol::TriggerRequest::Consolidate,
TriggerRequest::Backup(path) => protocol::TriggerRequest::Backup(PathBuf::from(path)),
TriggerRequest::Restore(path) => protocol::TriggerRequest::Restore(PathBuf::from(path)),
};
protocol::Request::Trigger(req)
}
fn receive(&self, message: String) -> Result<Self::Response> {
if message == "OK\r\n" {
Ok(true)
fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
if matches!(res, protocol::Response::Ok) {
Ok(())
} else {
Err(Error::new(ErrorKind::WrongResponse))
Err(Error::WrongResponse)
}
}
}

View File

@ -17,7 +17,10 @@
//! "SecretPassword",
//! )?;
//!
//! let objects = channel.query("collection", "bucket", "recipe")?;
//! let objects = channel.query(QueryRequest::new(
//! Dest::col_buc("collection", "bucket"),
//! "recipe",
//! ))?;
//! dbg!(objects);
//!
//! Ok(())
@ -32,14 +35,17 @@
//! use sonic_channel::*;
//!
//! fn main() -> result::Result<()> {
//! let mut channel = IngestChannel::start(
//! let channel = IngestChannel::start(
//! "localhost:1491",
//! "SecretPassword",
//! )?;
//!
//! let pushed = channel.push("collection", "bucket", "object:1", "my best recipe")?;
//! let dest = Dest::col_buc("collection", "bucket").obj("object:1");
//! let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?;
//! // or
//! // let pushed = channel.push_with_locale("collection", "bucket", "object:1", "Мой лучший рецепт", "rus")?;
//! // let pushed = channel.push(
//! // PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus)
//! // )?;
//! dbg!(pushed);
//!
//! Ok(())
@ -54,13 +60,13 @@
//! use sonic_channel::*;
//!
//! fn main() -> result::Result<()> {
//! let mut channel = ControlChannel::start(
//! let channel = ControlChannel::start(
//! "localhost:1491",
//! "SecretPassword",
//! )?;
//!
//! let result = channel.consolidate()?;
//! assert_eq!(result, true);
//! assert_eq!(result, ());
//!
//! Ok(())
//! }
@ -87,15 +93,20 @@ compile_error!(
#[macro_use]
mod macroses;
mod misc;
pub(crate) mod protocol;
mod channels;
mod commands;
/// Contains the request parameters for each command to the sonic server.
pub mod commands;
/// Contains sonic channel error type and custom Result type for easy configure your functions.
pub mod result;
pub use channels::*;
pub use commands::*;
pub use misc::*;
#[macro_use]
extern crate lazy_static;
extern crate regex;
pub use whatlang::Lang;

View File

@ -14,8 +14,7 @@ macro_rules! init_command {
) -> $crate::result::Result<
<$cmd_name as $crate::commands::StreamCommand>::Response,
> {
#[allow(clippy::needless_update)]
let command = $cmd_name { $($arg_name $(: $arg_value)?,)* ..Default::default() };
let command = $cmd_name { $($arg_name $(: $arg_value)?,)* };
self.stream().run_command(command)
}
};

165
src/misc.rs Normal file
View File

@ -0,0 +1,165 @@
/// Search data destination. Contains collection, bucket and object.
#[derive(Debug, PartialEq, Eq)]
pub struct ObjDest(Dest, String);
impl ObjDest {
/// Creates a new object destination from base destination (`Dest`) and object id.
///
/// ```rust
/// # use sonic_channel::{Dest, ObjDest};
/// let base_dest = Dest::col_buc("wiki", "user:1");
/// let dest = ObjDest::new(base_dest, "article:1");
/// assert_eq!(dest.collection(), "wiki");
/// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
/// assert_eq!(dest.object(), "article:1");
/// ```
pub fn new(cb: Dest, o: impl ToString) -> Self {
Self(cb, o.to_string())
}
/// Returns the collection.
#[inline]
pub fn collection(&self) -> &String {
self.0.collection()
}
/// Returns the optional bucket.
#[inline]
pub fn bucket_opt(&self) -> Option<&String> {
self.0.bucket_opt()
}
/// Returns the object id.
#[inline]
pub fn object(&self) -> &String {
&self.1
}
}
/// Search objects destination. Contains collection and bucket.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Dest {
collection: String,
bucket: Option<String>,
}
impl Dest {
/// Creates a new destination with collection and bucket.
///
/// ```rust
/// # use sonic_channel::Dest;
/// let dest = Dest::col_buc("wiki", "user:1");
/// assert_eq!(dest.collection(), "wiki");
/// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
/// ```
pub fn col_buc(c: impl ToString, b: impl ToString) -> Self {
Self::col(c).buc(b)
}
/// Creates a new destination with collection.
///
/// ```rust
/// # use sonic_channel::Dest;
/// let dest = Dest::col("wiki");
/// assert_eq!(dest.collection(), "wiki");
/// ```
pub fn col(c: impl ToString) -> Self {
Self {
collection: c.to_string(),
bucket: None,
}
}
/// Set bucket for the destination.
///
/// ```rust
/// # use sonic_channel::Dest;
/// let dest = Dest::col("wiki").buc("user:1");
/// assert_eq!(dest.collection(), "wiki");
/// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
/// ```
pub fn buc(mut self, b: impl ToString) -> Self {
self.bucket = Some(b.to_string());
self
}
/// Set object id to the destination and transform to object destination (`ObjDest`).
///
/// Short for `ObjDest::new(dest, object_id)`
///
/// ```rust
/// # use sonic_channel::Dest;
/// let dest = Dest::col_buc("wiki", "user:1").obj("article:1");
/// assert_eq!(dest.collection(), "wiki");
/// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
/// assert_eq!(dest.object(), "article:1");
/// ```
pub fn obj(self, o: impl ToString) -> ObjDest {
ObjDest::new(self, o)
}
/// Returns the collection.
#[inline]
pub fn collection(&self) -> &String {
&self.collection
}
/// Returns the optional bucket.
#[inline]
pub fn bucket_opt(&self) -> Option<&String> {
self.bucket.as_ref()
}
}
#[derive(Debug)]
pub(crate) struct OptDest {
pub(crate) collection: String,
pub(crate) bucket: Option<String>,
pub(crate) object: Option<String>,
}
impl OptDest {
pub(crate) fn col(c: impl ToString) -> Self {
Self {
collection: c.to_string(),
bucket: None,
object: None,
}
}
pub(crate) fn col_buc(c: impl ToString, b: impl ToString) -> Self {
Self {
collection: c.to_string(),
bucket: Some(b.to_string()),
object: None,
}
}
pub(crate) fn col_buc_obj(c: impl ToString, b: impl ToString, o: impl ToString) -> Self {
Self {
collection: c.to_string(),
bucket: Some(b.to_string()),
object: Some(o.to_string()),
}
}
}
impl From<Dest> for OptDest {
fn from(d: Dest) -> Self {
Self {
collection: d.collection,
bucket: d.bucket,
object: None,
}
}
}
impl From<ObjDest> for OptDest {
fn from(ObjDest(dest, obj): ObjDest) -> Self {
Self {
collection: dest.collection,
bucket: dest.bucket,
object: Some(obj),
}
}
}

321
src/protocol.rs Normal file
View File

@ -0,0 +1,321 @@
use std::io::{self, BufWriter, Write};
use std::{path::PathBuf, str::FromStr};
use crate::{result::*, ChannelMode};
#[derive(Debug, Default)]
pub struct Protocol {
#[allow(dead_code)]
version: Version,
}
impl From<Version> for Protocol {
fn from(version: Version) -> Self {
Self { version }
}
}
impl Protocol {
pub fn format_request(&self, req: Request) -> io::Result<Vec<u8>> {
let mut res = BufWriter::new(Vec::new());
match req {
Request::Quit => write!(res, "QUIT")?,
Request::Ping => write!(res, "PING")?,
Request::Start { mode, password } => write!(res, "START {} {}", mode, password)?,
#[rustfmt::skip]
Request::Count { collection, bucket, object } => match (bucket, object) {
(Some(b), Some(o)) => write!(res, "COUNT {} {} {}", collection, b, o)?,
(Some(b), None) => write!(res, "COUNT {} {}", collection, b)?,
(None, None) => write!(res, "COUNT {}", collection)?,
_ => panic!("Wrong protocol format"),
},
#[rustfmt::skip]
Request::Flush { collection, bucket, object } => match (bucket, object) {
(Some(b), Some(o)) => write!(res, "FLUSHO {} {} {}", collection, b, o)?,
(Some(b), None) => write!(res, "FLUSHB {} {}", collection, b)?,
(None, None) => write!(res, "FLUSHC {}", collection)?,
_ => panic!("Wrong protocol format"),
},
#[rustfmt::skip]
Request::Pop { collection, bucket, object, terms } => {
write!(res, "POP {} {} {} \"{}\"", collection, bucket, object, terms)?
},
#[rustfmt::skip]
Request::Push { collection, bucket, object, terms, lang } => {
let oneline_terms = remove_multiline(&terms);
write!(res, "PUSH {} {} {} \"{}\"", collection, bucket, object, oneline_terms)?;
if let Some(lang) = lang {
write!(res, " LANG({})", lang)?
}
}
#[rustfmt::skip]
Request::Query { collection, bucket, terms, offset, limit, lang } => {
write!(res, "QUERY {} {} \"{}\"", collection, bucket, terms)?;
if let Some(limit) = limit {
write!(res, " LIMIT({})", limit)?;
}
if let Some(offset) = offset {
write!(res, " OFFSET({})", offset)?;
}
if let Some(lang) = lang {
write!(res, " LANG({})", lang)?;
}
}
#[rustfmt::skip]
Request::Suggest { collection, bucket, word, limit } => {
write!(res, "SUGGEST {} {} \"{}\"", collection, bucket, word)?;
if let Some(limit) = limit {
write!(res, " LIMIT({})", limit)?;
}
}
Request::Trigger(triger_req) => match triger_req {
TriggerRequest::Consolidate => write!(res, "TRIGGER consolidate")?,
TriggerRequest::Backup(path) => {
write!(res, "TRIGGER backup {}", path.to_str().unwrap())?
}
TriggerRequest::Restore(path) => {
write!(res, "TRIGGER restore {}", path.to_str().unwrap())?
}
},
}
write!(res, "\r\n")?;
res.flush()?;
Ok(res.into_inner()?)
}
pub fn parse_response(&self, line: &str) -> Result<Response> {
let mut segments = line.split_whitespace();
match segments.next() {
Some("STARTED") => match (segments.next(), segments.next(), segments.next()) {
(Some(_raw_mode), Some(raw_protocol), Some(raw_buffer_size)) => {
Ok(Response::Started(StartedPayload {
protocol_version: parse_server_config(raw_protocol)?,
max_buffer_size: parse_server_config(raw_buffer_size)?,
}))
}
_ => Err(Error::WrongResponse),
},
Some("PENDING") => {
let event_id = segments
.next()
.map(String::from)
.ok_or(Error::WrongResponse)?;
Ok(Response::Pending(event_id))
}
Some("RESULT") => match segments.next() {
Some(num) => num
.parse()
.map(Response::Result)
.map_err(|_| Error::WrongResponse),
_ => Err(Error::WrongResponse),
},
Some("EVENT") => {
let event_kind = match segments.next() {
Some("SUGGEST") => Ok(EventKind::Suggest),
Some("QUERY") => Ok(EventKind::Query),
_ => Err(Error::WrongResponse),
}?;
let event_id = segments
.next()
.map(String::from)
.ok_or(Error::WrongResponse)?;
let objects = segments.map(String::from).collect();
Ok(Response::Event(event_kind, event_id, objects))
}
Some("OK") => Ok(Response::Ok),
Some("ENDED") => Ok(Response::Ended),
Some("CONNECTED") => Ok(Response::Connected),
Some("ERR") => match segments.next() {
Some(message) => Err(Error::SonicServer(String::from(message))),
_ => Err(Error::WrongResponse),
},
_ => Err(Error::WrongResponse),
}
}
}
//===========================================================================//
// Primitives //
//===========================================================================//
#[derive(Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum Version {
V1 = 1,
}
impl Default for Version {
fn default() -> Self {
Self::V1
}
}
impl TryFrom<u8> for Version {
type Error = ();
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
match value {
1 => Ok(Self::V1),
_ => Err(()),
}
}
}
//===========================================================================//
// Response //
//===========================================================================//
pub type EventId = String;
#[derive(Debug)]
pub enum Response {
Ok,
Ended,
Connected,
Pending(EventId),
Pong,
Started(StartedPayload),
Result(usize),
Event(EventKind, EventId, Vec<String>),
}
#[derive(Debug)]
pub struct StartedPayload {
pub protocol_version: u8,
pub max_buffer_size: usize,
}
#[derive(Debug)]
pub enum EventKind {
Suggest,
Query,
}
//===========================================================================//
// Request //
//===========================================================================//
#[derive(Debug)]
pub enum Request {
Start {
mode: ChannelMode,
password: String,
},
Quit,
Ping,
Trigger(TriggerRequest),
Suggest {
collection: String,
bucket: String,
word: String,
limit: Option<usize>,
},
Query {
collection: String,
bucket: String,
terms: String,
offset: Option<usize>,
limit: Option<usize>,
lang: Option<&'static str>,
},
Push {
collection: String,
bucket: String,
object: String,
terms: String,
lang: Option<&'static str>,
},
Pop {
collection: String,
bucket: String,
object: String,
terms: String,
},
Flush {
collection: String,
bucket: Option<String>,
object: Option<String>,
},
Count {
collection: String,
bucket: Option<String>,
object: Option<String>,
},
}
#[derive(Debug)]
pub enum TriggerRequest {
Consolidate,
Backup(PathBuf),
Restore(PathBuf),
}
//===========================================================================//
// Utils //
//===========================================================================//
fn parse_server_config<T: FromStr>(raw: &str) -> Result<T> {
raw.split_terminator(&['(', ')'])
.nth(1)
.ok_or(Error::WrongResponse)?
.parse()
.map_err(|_| Error::WrongResponse)
}
fn remove_multiline(text: &str) -> String {
text.lines()
.enumerate()
.fold(String::new(), |mut acc, (i, line)| {
if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') {
acc.push(' ');
}
acc.push_str(line);
acc
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_parse_protocol() {
match parse_server_config::<u8>("protocol(1)") {
Ok(protocol) => assert_eq!(protocol, 1),
_ => unreachable!(),
}
}
#[test]
fn should_parse_buffer_size() {
match parse_server_config::<usize>("buffer_size(20000)") {
Ok(buffer_size) => assert_eq!(buffer_size, 20000),
_ => unreachable!(),
}
}
#[test]
fn should_make_single_line() {
let text = "
Hello
World
";
let expected_text = "Hello World";
assert_eq!(remove_multiline(text), expected_text);
}
}

View File

@ -1,6 +1,4 @@
use crate::channels::ChannelMode;
use std::error::Error as StdError;
use std::fmt;
/// Sugar if you expect only sonic-channel error type in result
pub type Result<T> = std::result::Result<T, Error>;
@ -8,29 +6,10 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Wrap for sonic channel error kind. This type has std::error::Error
/// implementation and you can use boxed trait for catch other errors
/// like this.
#[derive(Debug)]
pub struct Error {
kind: ErrorKind,
}
impl StdError for Error {}
impl Error {
/// Creates new Error with sonic channel error kind
///
/// ```rust
/// use sonic_channel::result::*;
///
/// let err = Error::new(ErrorKind::ConnectToServer);
/// ```
pub fn new(kind: ErrorKind) -> Self {
Error { kind }
}
}
/// All error kinds that you can see in sonic-channel crate.
#[derive(Debug)]
pub enum ErrorKind {
pub enum Error {
/// Cannot connect to the sonic search backend.
ConnectToServer,
@ -57,24 +36,25 @@ pub enum ErrorKind {
UnsupportedCommand((&'static str, Option<ChannelMode>)),
/// This error appears if the error occurred on the server side
SonicServer(&'static str),
SonicServer(String),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
match self.kind {
ErrorKind::ConnectToServer => write!(f, "Cannot connect to server"),
ErrorKind::WriteToStream => write!(f, "Cannot write data to stream"),
ErrorKind::ReadStream => write!(f, "Cannot read sonic response from stream"),
ErrorKind::SwitchMode => write!(f, "Cannot switch channel mode"),
ErrorKind::RunCommand => write!(f, "Cannot run command in current mode"),
ErrorKind::QueryResponse(message) => {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use Error::*;
match self {
ConnectToServer => f.write_str("Cannot connect to server"),
WriteToStream => f.write_str("Cannot write data to stream"),
ReadStream => f.write_str("Cannot read sonic response from stream"),
SwitchMode => f.write_str("Cannot switch channel mode"),
RunCommand => f.write_str("Cannot run command in current mode"),
QueryResponse(message) => {
write!(f, "Error in query response: {}", message)
}
ErrorKind::WrongResponse => {
WrongResponse => {
write!(f, "Client cannot parse response from sonic server. Please write an issue to github (https://github.com/pleshevskiy/sonic-channel).")
}
ErrorKind::UnsupportedCommand((command_name, channel_mode)) => {
UnsupportedCommand((command_name, channel_mode)) => {
if let Some(channel_mode) = channel_mode {
write!(
f,
@ -89,7 +69,9 @@ impl fmt::Display for Error {
)
}
}
ErrorKind::SonicServer(message) => write!(f, "Sonic Server-side error: {}", message),
SonicServer(message) => write!(f, "Sonic Server-side error: {}", message),
}
}
}
impl std::error::Error for Error {}

View File

@ -22,5 +22,7 @@ pub fn consolidate() {
}
pub fn flush_bucket(collection: &str, bucket: &str) {
ingest_start().flushb(collection, bucket).unwrap();
ingest_start()
.flush(FlushRequest::bucket(collection, bucket))
.unwrap();
}

View File

@ -7,10 +7,15 @@ const COLLECTION: &str = "Ingest";
fn should_push_new_object_to_sonic() {
let bucket = "push_simple";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
match ingest_channel.push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers") {
Ok(res) => assert!(res),
Err(_) => unreachable!(),
match ingest_channel.push(PushRequest::new(
dest.obj("1"),
"Sweet Teriyaki Beef Skewers",
)) {
Ok(()) => {}
_ => unreachable!(),
}
flush_bucket(COLLECTION, bucket);
@ -20,16 +25,14 @@ fn should_push_new_object_to_sonic() {
fn should_push_new_object_to_sonic_with_russian_locale() {
let bucket = "push_locale";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
match ingest_channel.push_with_locale(
COLLECTION,
bucket,
"1",
"Открытый пирог с орехами и сгущенкой",
"rus",
match ingest_channel.push(
PushRequest::new(dest.obj("1"), "Открытый пирог с орехами и сгущенкой").lang(Lang::Rus),
) {
Ok(res) => assert!(res),
Err(_) => unreachable!(),
Ok(()) => {}
_ => unreachable!(),
}
flush_bucket(COLLECTION, bucket);
@ -45,10 +48,12 @@ Beef
Skewers
";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
match ingest_channel.push(COLLECTION, bucket, "1", multiline_text) {
Ok(res) => assert!(res),
Err(_) => unreachable!(),
match ingest_channel.push(PushRequest::new(dest.obj("1"), multiline_text)) {
Ok(()) => {}
_ => unreachable!(),
}
flush_bucket(COLLECTION, bucket);

View File

@ -8,35 +8,19 @@ fn should_find_object_by_exact_match() {
let bucket = "query_by_exact_match";
let title = "Sweet Teriyaki Beef Skewers";
let ingest_channel = ingest_start();
ingest_channel.push(COLLECTION, bucket, "1", title).unwrap();
let search_channel = search_start();
match search_channel.query(COLLECTION, bucket, title) {
Ok(object_ids) => assert_eq!(object_ids, vec!["1"]),
Err(_) => unreachable!(),
}
flush_bucket(COLLECTION, bucket);
}
#[test]
fn should_find_object_by_partial_match() {
let bucket = "query_by_partial_match";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
ingest_channel
.push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers")
.push(PushRequest::new(dest.clone().obj("1"), title))
.unwrap();
let search_channel = search_start();
consolidate();
let words = ["Sweet", "Teriyaki", "Beef", "Skewers"];
for word in words {
match search_channel.query(COLLECTION, bucket, word) {
Ok(object_ids) => assert_eq!(object_ids, vec!["1"]),
Err(_) => unreachable!(),
}
let search_channel = search_start();
match search_channel.query(QueryRequest::new(dest, title)) {
Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]),
Err(_) => unreachable!(),
}
flush_bucket(COLLECTION, bucket);
@ -50,19 +34,22 @@ Sweet
Teriyaki
Beef
Skewers
";
None";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
ingest_channel
.push(COLLECTION, bucket, "1", multiline_text)
.push(PushRequest::new(dest.clone().obj("1"), multiline_text))
.unwrap();
let search_channel = search_start();
consolidate();
let words = ["Sweet", "Teriyaki", "Beef", "Skewers"];
let search_channel = search_start();
for word in words {
match search_channel.query(COLLECTION, bucket, word) {
Ok(object_ids) => assert_eq!(object_ids, vec!["1"]),
match search_channel.query(QueryRequest::new(dest.clone(), word)) {
Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]),
Err(_) => unreachable!(),
}
}
@ -74,19 +61,32 @@ Skewers
fn should_find_many_objects() {
let bucket = "query_many_objects";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
ingest_channel
.push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers")
.push(PushRequest::new(
dest.clone().obj("1"),
"Sweet Teriyaki Beef Skewers",
))
.unwrap();
ingest_channel
.push(COLLECTION, bucket, "2", "Slow Cooker Beef Stew I")
.push(PushRequest::new(
dest.clone().obj("2"),
"Slow Cooker Beef Stew I",
))
.unwrap();
ingest_channel
.push(COLLECTION, bucket, "3", "Christmas Prime Rib")
.push(PushRequest::new(
dest.clone().obj("3"),
"Christmas Prime Rib",
))
.unwrap();
consolidate();
let search_channel = search_start();
match search_channel.query(COLLECTION, bucket, "Beef") {
match search_channel.query(QueryRequest::new(dest, "Beef")) {
Ok(object_ids) => assert_eq!(object_ids, vec!["2", "1"]),
Err(_) => unreachable!(),
}
@ -98,25 +98,38 @@ fn should_find_many_objects() {
fn should_find_limited_objects() {
let bucket = "query_limited_objects";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
ingest_channel
.push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers")
.push(PushRequest::new(
dest.clone().obj("1"),
"Sweet Teriyaki Beef Skewers",
))
.unwrap();
ingest_channel
.push(COLLECTION, bucket, "2", "Slow Cooker Beef Stew I")
.push(PushRequest::new(
dest.clone().obj("2"),
"Slow Cooker Beef Stew I",
))
.unwrap();
ingest_channel
.push(COLLECTION, bucket, "3", "Christmas Prime Rib")
.push(PushRequest::new(
dest.clone().obj("3"),
"Christmas Prime Rib",
))
.unwrap();
consolidate();
let search_channel = search_start();
match search_channel.query_with_limit(COLLECTION, bucket, "Beef", 1) {
match search_channel.query(QueryRequest::new(dest.clone(), "Beef").limit(1)) {
Ok(object_ids) => assert_eq!(object_ids, vec!["2"]),
Err(_) => unreachable!(),
}
let search_channel = search_start();
match search_channel.query_with_limit_and_offset(COLLECTION, bucket, "Beef", 1, 1) {
match search_channel.query(QueryRequest::new(dest, "Beef").pag(1, 1)) {
Ok(object_ids) => assert_eq!(object_ids, vec!["1"]),
Err(_) => unreachable!(),
}

View File

@ -8,8 +8,12 @@ fn should_suggest_nearest_word() {
let bucket = "suggest_nearest";
let title = "Sweet Teriyaki Beef Skewers";
let dest = Dest::col_buc(COLLECTION, bucket);
let ingest_channel = ingest_start();
ingest_channel.push(COLLECTION, bucket, "1", title).unwrap();
ingest_channel
.push(PushRequest::new(dest.clone().obj("1"), title))
.unwrap();
consolidate();
@ -22,7 +26,7 @@ fn should_suggest_nearest_word() {
let search_channel = search_start();
for (input, expected) in pairs {
match search_channel.suggest(COLLECTION, bucket, input) {
match search_channel.suggest(SuggestRequest::new(dest.clone(), input)) {
Ok(object_ids) => assert_eq!(object_ids, vec![expected]),
Err(_) => unreachable!(),
}