From bd0831738870580927a2e8f79007d159c25d78e7 Mon Sep 17 00:00:00 2001 From: Dmitriy Pleshevskiy Date: Mon, 18 Jul 2022 11:07:12 +0000 Subject: [PATCH] Design improvements (#13) * protocol: extract response type * deps: drop lazy_static and regex * protocol: extract request commands * protocol: create a struct for... ...formatting and parsing sonic protocol * protocol: refac flush command * commands: introduce dest, refac push and count * commands: refac all commands * commands: add convinient methods * doc: add documentation for each new structs * doc: change examples in the readme * commands: implement from trait for count and flush * commands: change pag logic --- .vim/coc-settings.json | 3 + Cargo.toml | 9 +- README.md | 34 ++++- src/channels/control.rs | 132 ++++++++-------- src/channels/ingest.rs | 179 +++------------------- src/channels/mod.rs | 132 +++++++--------- src/channels/search.rs | 118 +++----------- src/commands/count.rs | 78 +++++++--- src/commands/flush.rs | 89 +++++++---- src/commands/mod.rs | 34 ++--- src/commands/ping.rs | 17 ++- src/commands/pop.rs | 74 +++++---- src/commands/push.rs | 120 +++++++-------- src/commands/query.rs | 139 ++++++++++------- src/commands/quit.rs | 17 ++- src/commands/start.rs | 58 +++---- src/commands/suggest.rs | 95 ++++++------ src/commands/trigger.rs | 54 +++---- src/lib.rs | 31 ++-- src/macroses.rs | 3 +- src/misc.rs | 165 ++++++++++++++++++++ src/protocol.rs | 321 +++++++++++++++++++++++++++++++++++++++ src/result.rs | 52 +++---- tests/common.rs | 4 +- tests/push_command.rs | 33 ++-- tests/query_command.rs | 87 ++++++----- tests/suggest_command.rs | 8 +- 27 files changed, 1239 insertions(+), 847 deletions(-) create mode 100644 .vim/coc-settings.json create mode 100644 src/misc.rs create mode 100644 src/protocol.rs diff --git a/.vim/coc-settings.json b/.vim/coc-settings.json new file mode 100644 index 0000000..6f5c4ed --- /dev/null +++ b/.vim/coc-settings.json @@ -0,0 +1,3 @@ +{ + "rust-analyzer.cargo.features": "all" +} diff --git a/Cargo.toml b/Cargo.toml index fb14d31..b1373b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "sonic-channel" -version = "0.6.0" +version = "1.0.0" authors = ["Dmitriy Pleshevskiy "] description = "Rust client for sonic search backend" categories = ["api-bindings"] keywords = ["sonic", "search", "client", "elasticsearch", "api"] -edition = "2018" +edition = "2021" license = "MPL-2.0" repository = "https://github.com/pleshevskiy/sonic-channel" homepage = "https://github.com/pleshevskiy/sonic-channel" @@ -15,8 +15,9 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lazy_static = "1.4.0" -regex = "1.3.4" +# TODO(pleshevskiy): don't forget to remove before publishing +env_logger = "0.9.0" +log = "0.4.17" whatlang = "0.12.0" [features] diff --git a/README.md b/README.md index 79a478c..bdea91d 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ version = "0.1.0" authors = ["Me "] [dependencies] -sonic-channel = { version = "0.6", features = ["ingest"] } +sonic-channel = { version = "1.0", features = ["ingest"] } ``` Add `default-features = false` to dependency, if you want to exclude default @@ -33,8 +33,15 @@ Note: This example requires enabling the `search` feature, enabled by default. use sonic_channel::*; fn main() -> result::Result<()> { - let channel = SearchChannel::start("localhost:1491", "SecretPassword")?; - let objects = channel.query("collection", "bucket", "recipe")?; + let channel = SearchChannel::start( + "localhost:1491", + "SecretPassword", + )?; + + let objects = channel.query(QueryRequest::new( + Dest::col_buc("collection", "bucket"), + "recipe", + ))?; dbg!(objects); Ok(()) @@ -49,10 +56,17 @@ Note: This example requires enabling the `ingest` feature. use sonic_channel::*; fn main() -> result::Result<()> { - let channel = IngestChannel::start("localhost:1491", "SecretPassword")?; - let pushed = channel.push("collection", "bucket", "object:1", "my best recipe")?; + let channel = IngestChannel::start( + "localhost:1491", + "SecretPassword", + )?; + + let dest = Dest::col_buc("collection", "bucket").obj("object:1"); + let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?; // or - // let pushed = channel.push_with_locale("collection", "bucket", "object:1", "Мой лучший рецепт", "rus")?; + // let pushed = channel.push( + // PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus) + // )?; dbg!(pushed); Ok(()) @@ -67,9 +81,13 @@ Note: This example requires enabling the `control` feature. use sonic_channel::*; fn main() -> result::Result<()> { - let channel = ControlChannel::start("localhost:1491", "SecretPassword")?; + let channel = ControlChannel::start( + "localhost:1491", + "SecretPassword", + )?; + let result = channel.consolidate()?; - assert_eq!(result, true); + assert_eq!(result, ()); Ok(()) } diff --git a/src/channels/control.rs b/src/channels/control.rs index 32b872c..a331644 100644 --- a/src/channels/control.rs +++ b/src/channels/control.rs @@ -70,11 +70,10 @@ impl ControlChannel { impl ControlChannel { init_command!( - /// Consolidate indexed search data instead of waiting for the next automated - /// consolidation tick. + /// Trigger control action. /// - /// Note: This method requires enabling the `control` feature and start - /// connection in Control mode. + /// Note: This method requires enabling the `control` feature and start connection in + /// Control mode /// /// ```rust,no_run /// # use sonic_channel::*; @@ -84,65 +83,78 @@ impl ControlChannel { /// "SecretPassword", /// )?; /// - /// let result = control_channel.consolidate()?; - /// assert_eq!(result, true); + /// control_channel.trigger(TriggerRequest::Consolidate)?; /// # Ok(()) /// # } - /// ``` - use TriggerCommand for fn consolidate() + use TriggerCommand for fn trigger( + req: TriggerRequest, + ) ); - init_command!( - /// Backup KV + FST to / - /// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808) - /// for more information. - /// - /// Note: This method requires enabling the `control` feature and start - /// connection in Control mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let control_channel = ControlChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let result = control_channel.backup("2020-08-07T23-48")?; - /// assert_eq!(result, true); - /// # Ok(()) - /// # } - /// ``` - use TriggerCommand for fn backup<'a>( - // It's not action, but my macro cannot support alias for custom argument. - // TODO: Add alias to macro and rename argument of this function. - action: &'a str => TriggerAction::Backup(action), - ); - ); + /// Consolidate indexed search data instead of waiting for the next automated + /// consolidation tick. + /// + /// Note: This method requires enabling the `control` feature and start + /// connection in Control mode. + /// + /// ```rust,no_run + /// # use sonic_channel::*; + /// # fn main() -> result::Result<()> { + /// let control_channel = ControlChannel::start( + /// "localhost:1491", + /// "SecretPassword", + /// )?; + /// + /// control_channel.consolidate()?; + /// # Ok(()) + /// # } + /// ``` + pub fn consolidate(&self) -> Result<()> { + self.trigger(TriggerRequest::Consolidate) + } - init_command!( - /// Restore KV + FST from if you already have backup with the same name. - /// - /// Note: This method requires enabling the `control` feature and start - /// connection in Control mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let control_channel = ControlChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let result = control_channel.restore("2020-08-07T23-48")?; - /// assert_eq!(result, true); - /// # Ok(()) - /// # } - /// ``` - use TriggerCommand for fn restore<'a>( - // It's not action, but my macro cannot support alias for custom argument. - // TODO: Add alias to macro and rename argument of this function. - action: &'a str => TriggerAction::Restore(action), - ); - ); + /// Backup KV + FST to / + /// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808) + /// for more information. + /// + /// Note: This method requires enabling the `control` feature and start + /// connection in Control mode. + /// + /// ```rust,no_run + /// # use sonic_channel::*; + /// # fn main() -> result::Result<()> { + /// let control_channel = ControlChannel::start( + /// "localhost:1491", + /// "SecretPassword", + /// )?; + /// + /// control_channel.backup("2020-08-07T23-48")?; + /// # Ok(()) + /// # } + /// ``` + pub fn backup(&self, path: &str) -> Result<()> { + self.trigger(TriggerRequest::Backup(path)) + } + + /// Restore KV + FST from if you already have backup with the same name. + /// + /// Note: This method requires enabling the `control` feature and start + /// connection in Control mode. + /// + /// ```rust,no_run + /// # use sonic_channel::*; + /// # fn main() -> result::Result<()> { + /// let control_channel = ControlChannel::start( + /// "localhost:1491", + /// "SecretPassword", + /// )?; + /// + /// let result = control_channel.restore("2020-08-07T23-48")?; + /// assert_eq!(result, ()); + /// # Ok(()) + /// # } + /// ``` + pub fn restore(&self, path: &str) -> Result<()> { + self.trigger(TriggerRequest::Restore(path)) + } } diff --git a/src/channels/ingest.rs b/src/channels/ingest.rs index 637c0ea..dec4e25 100644 --- a/src/channels/ingest.rs +++ b/src/channels/ingest.rs @@ -83,55 +83,16 @@ impl IngestChannel { /// "SecretPassword", /// )?; /// - /// let result = ingest_channel.push( - /// "search", - /// "default", - /// "recipe:295", - /// "Sweet Teriyaki Beef Skewers", - /// )?; - /// assert_eq!(result, true); + /// let result = ingest_channel.push(PushRequest::new( + /// Dest::col("search").obj("recipe:295"), + /// "Sweet Teriyaki Beef Skewers" + /// ))?; + /// assert_eq!(result, ()); /// # Ok(()) /// # } /// ``` use PushCommand for fn push<'a>( - collection: &'a str, - bucket: &'a str, - object: &'a str, - text: &'a str, - ); - ); - - init_command!( - /// Push search data in the index with locale parameter in ISO 639-3 code. - /// - /// Note: This method requires enabling the `ingest` feature and start - /// connection in Ingest mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let ingest_channel = IngestChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let result = ingest_channel.push_with_locale( - /// "search", - /// "default", - /// "recipe:296", - /// "Гренки с жареным картофелем и сыром", - /// "rus", - /// )?; - /// assert_eq!(result, true); - /// # Ok(()) - /// # } - /// ``` - use PushCommand for fn push_with_locale<'a>( - collection: &'a str, - bucket: &'a str, - object: &'a str, - text: &'a str, - locale: &'a str => Some(locale), + req: PushRequest, ); ); @@ -149,16 +110,14 @@ impl IngestChannel { /// "SecretPassword", /// )?; /// - /// let result = ingest_channel.pop("search", "default", "recipe:295", "beef")?; + /// let dest = Dest::col("search").obj("recipe:295"); + /// let result = ingest_channel.pop(PopRequest::new(dest, "beef"))?; /// assert_eq!(result, 1); /// # Ok(()) /// # } /// ``` - use PopCommand for fn pop<'a>( - collection: &'a str, - bucket: &'a str, - object: &'a str, - text: &'a str, + use PopCommand for fn pop( + req: PopRequest, ); ); @@ -176,69 +135,24 @@ impl IngestChannel { /// "SecretPassword", /// )?; /// - /// let flushc_count = ingest_channel.flushc("search")?; + /// let flushc_count = ingest_channel.flush(FlushRequest::collection("search"))?; /// dbg!(flushc_count); - /// # Ok(()) - /// # } - /// ``` - use FlushCommand for fn flushc<'a>( - collection: &'a str, - ); - ); - - init_command!( - /// Flush all indexed data from bucket in a collection. - /// - /// Note: This method requires enabling the `ingest` feature and start - /// connection in Ingest mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let ingest_channel = IngestChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let flushb_count = ingest_channel.flushb("search", "default")?; + /// let flushb_count = ingest_channel.flush(FlushRequest::bucket("search", "default"))?; /// dbg!(flushb_count); - /// # Ok(()) - /// # } - /// ``` - use FlushCommand for fn flushb<'a>( - collection: &'a str, - bucket: &'a str => Some(bucket), - ); - ); - - init_command!( - /// Flush all indexed data from an object in a bucket in collection. - /// - /// Note: This method requires enabling the `ingest` feature and start - /// connection in Ingest mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let ingest_channel = IngestChannel::start( - /// "localhost:1491", - /// "SecretPassword", + /// let flusho_count = ingest_channel.flush( + /// FlushRequest::object("search", "default", "recipe:295") /// )?; - /// - /// let flusho_count = ingest_channel.flusho("search", "default", "recipe:296")?; /// dbg!(flusho_count); /// # Ok(()) /// # } /// ``` - use FlushCommand for fn flusho<'a>( - collection: &'a str, - bucket: &'a str => Some(bucket), - object: &'a str => Some(object), + use FlushCommand for fn flush( + req: FlushRequest, ); ); init_command!( - /// Bucket count in indexed search data of your collection. + /// Count indexed search data of your collection. /// /// Note: This method requires enabling the `ingest` feature and start /// connection in Ingest mode. @@ -251,64 +165,19 @@ impl IngestChannel { /// "SecretPassword", /// )?; /// - /// let bucket_count = ingest_channel.bucket_count("search")?; + /// let bucket_count = ingest_channel.count(CountRequest::buckets("search"))?; /// dbg!(bucket_count); - /// # Ok(()) - /// # } - /// ``` - use CountCommand for fn bucket_count<'a>( - collection: &'a str, - ); - ); - - init_command!( - /// Object count of bucket in indexed search data. - /// - /// Note: This method requires enabling the `ingest` feature and start - /// connection in Ingest mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let ingest_channel = IngestChannel::start( - /// "localhost:1491", - /// "SecretPassword", + /// let object_count = ingest_channel.count(CountRequest::objects("search", "default"))?; + /// dbg!(object_count); + /// let word_count = ingest_channel.count( + /// CountRequest::words("search", "default", "recipe:256") /// )?; - /// - /// let object_count = ingest_channel.object_count("search", "default")?; /// dbg!(object_count); /// # Ok(()) /// # } /// ``` - use CountCommand for fn object_count<'a>( - collection: &'a str, - bucket: &'a str => Some(bucket), - ); - ); - - init_command!( - /// Object word count in indexed bucket search data. - /// - /// Note: This method requires enabling the `ingest` feature and start - /// connection in Ingest mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let ingest_channel = IngestChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let word_count = ingest_channel.word_count("search", "default", "recipe:296")?; - /// dbg!(word_count); - /// # Ok(()) - /// # } - /// ``` - use CountCommand for fn word_count<'a>( - collection: &'a str, - bucket: &'a str => Some(bucket), - object: &'a str => Some(object), + use CountCommand for fn count( + req: CountRequest, ); ); } diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 0158d36..9ecffa9 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1,7 +1,6 @@ #[cfg(feature = "search")] mod search; #[cfg(feature = "search")] -use crate::commands::StartCommand; pub use search::*; #[cfg(feature = "ingest")] @@ -14,13 +13,14 @@ mod control; #[cfg(feature = "control")] pub use control::*; -use crate::commands::StreamCommand; -use crate::result::*; -use std::fmt; -use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::cell::RefCell; +use std::io::{BufRead, BufReader, Write}; use std::net::{TcpStream, ToSocketAddrs}; -const DEFAULT_SONIC_PROTOCOL_VERSION: usize = 1; +use crate::commands::{StartCommand, StreamCommand}; +use crate::protocol::{self, Protocol}; +use crate::result::*; + const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200; /// Channel modes supported by sonic search backend. @@ -28,7 +28,8 @@ const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200; pub enum ChannelMode { /// Sonic server search channel mode. /// - /// In this mode you can use `query`, `suggest`, `ping` and `quit` commands. + /// In this mode you can use `query`, `pag_query`, `suggest`, `lim_suggest`, `ping` + /// and `quit` commands. /// /// Note: This mode requires enabling the `search` feature. #[cfg(feature = "search")] @@ -36,8 +37,7 @@ pub enum ChannelMode { /// Sonic server ingest channel mode. /// - /// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`, - /// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands. + /// In this mode you can use `push`, `pop`, `flush`, `count` `ping` and `quit` commands. /// /// Note: This mode requires enabling the `ingest` feature. #[cfg(feature = "ingest")] @@ -45,7 +45,7 @@ pub enum ChannelMode { /// Sonic server control channel mode. /// - /// In this mode you can use `consolidate`, `backup`, `restore`, + /// In this mode you can use `trigger`, `consolidate`, `backup`, `restore`, /// `ping` and `quit` commands. /// /// Note: This mode requires enabling the `control` feature. @@ -69,8 +69,8 @@ impl ChannelMode { } } -impl fmt::Display for ChannelMode { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { +impl std::fmt::Display for ChannelMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.as_str()) } } @@ -81,84 +81,84 @@ impl fmt::Display for ChannelMode { /// #[derive(Debug)] pub struct SonicStream { - stream: TcpStream, + stream: RefCell, + reader: RefCell>, mode: Option, // None – Uninitialized mode max_buffer_size: usize, - protocol_version: usize, + protocol: Protocol, } impl SonicStream { - fn write(&self, command: &SC) -> Result<()> { - let mut writer = BufWriter::with_capacity(self.max_buffer_size, &self.stream); - let message = command.message(); - writer - .write_all(message.as_bytes()) - .map_err(|_| Error::new(ErrorKind::WriteToStream))?; + fn send(&self, command: &SC) -> Result<()> { + let buf = self + .protocol + .format_request(command.request()) + .map_err(|_| Error::WriteToStream)?; + self.stream + .borrow_mut() + .write_all(&buf) + .map_err(|_| Error::WriteToStream)?; Ok(()) } - fn read(&self, max_read_lines: usize) -> Result { - let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream); - let mut message = String::new(); + fn read_line(&self) -> Result { + let line = { + let mut line = String::with_capacity(self.max_buffer_size); + self.reader + .borrow_mut() + .read_line(&mut line) + .map_err(|_| Error::ReadStream)?; + line + }; - for _ in 0..max_read_lines { - reader - .read_line(&mut message) - .map_err(|_| Error::new(ErrorKind::ReadStream))?; - if message.starts_with("ERR ") { - break; - } - } - - Ok(message) + log::debug!("[channel] {}", &line); + self.protocol.parse_response(&line) } pub(crate) fn run_command(&self, command: SC) -> Result { - self.write(&command)?; - let message = self.read(SC::READ_LINES_COUNT)?; - if let Some(error) = message.strip_prefix("ERR ") { - Err(Error::new(ErrorKind::SonicServer(Box::leak( - error.to_owned().into_boxed_str(), - )))) - } else { - command.receive(message) - } + self.send(&command)?; + let res = loop { + let res = self.read_line()?; + if !matches!(&res, protocol::Response::Pending(_)) { + break res; + } + }; + command.receive(res) } fn connect(addr: A) -> Result { - let stream = - TcpStream::connect(addr).map_err(|_| Error::new(ErrorKind::ConnectToServer))?; + let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?; + let read_stream = stream.try_clone().map_err(|_| Error::ConnectToServer)?; let channel = SonicStream { - stream, + reader: RefCell::new(BufReader::new(read_stream)), + stream: RefCell::new(stream), mode: None, max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE, - protocol_version: DEFAULT_SONIC_PROTOCOL_VERSION, + protocol: Default::default(), }; - let message = channel.read(1)?; - // TODO: need to add support for versions - if message.starts_with("CONNECTED") { + let res = channel.read_line()?; + if matches!(res, protocol::Response::Connected) { Ok(channel) } else { - Err(Error::new(ErrorKind::ConnectToServer)) + Err(Error::ConnectToServer) } } fn start(&mut self, mode: ChannelMode, password: S) -> Result<()> { if self.mode.is_some() { - return Err(Error::new(ErrorKind::RunCommand)); + return Err(Error::RunCommand); } - let command = StartCommand { + let res = self.run_command(StartCommand { mode, password: password.to_string(), - }; - let response = self.run_command(command)?; + })?; - self.max_buffer_size = response.max_buffer_size; - self.protocol_version = response.protocol_version; - self.mode = Some(response.mode); + self.max_buffer_size = res.max_buffer_size; + self.protocol = Protocol::from(res.protocol_version); + self.mode = Some(res.mode); Ok(()) } @@ -167,22 +167,6 @@ impl SonicStream { /// /// I think we shouldn't separate commands connect and start because we haven't /// possibility to change channel in sonic server, if we already chosen one of them. 🤔 - /// - /// ```rust,no_run - /// use sonic_channel::*; - /// - /// fn main() -> result::Result<()> { - /// let channel = SearchChannel::start( - /// "localhost:1491", - /// "SecretPassword" - /// )?; - /// - /// // Now you can use all method of Search channel. - /// let objects = channel.query("search", "default", "beef"); - /// - /// Ok(()) - /// } - /// ``` pub(crate) fn connect_with_start(mode: ChannelMode, addr: A, password: S) -> Result where A: ToSocketAddrs, @@ -222,7 +206,7 @@ pub trait SonicChannel { #[cfg(test)] mod tests { - use super::ChannelMode; + use super::*; #[test] fn format_channel_enums() { diff --git a/src/channels/search.rs b/src/channels/search.rs index f1104eb..98c2309 100644 --- a/src/channels/search.rs +++ b/src/channels/search.rs @@ -82,86 +82,24 @@ impl SearchChannel { /// "SecretPassword", /// )?; /// - /// let result = search_channel.query("search", "default", "Beef")?; - /// dbg!(result); - /// # Ok(()) - /// # } - /// ``` - use QueryCommand for fn query<'a>( - collection: &'a str, - bucket: &'a str, - terms: &'a str, - ); - ); - - init_command!( - /// Query limited objects in database. This method similar query but - /// you can configure limit of result. - /// - /// Note: This method requires enabling the `search` feature and start - /// connection in Search mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let search_channel = SearchChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let result = search_channel.query_with_limit( - /// "search", - /// "default", + /// let result = search_channel.query(QueryRequest::new( + /// Dest::col("search"), /// "Beef", - /// 10, + /// ))?; + /// dbg!(result); + /// + /// let result = search_channel.query( + /// QueryRequest::new(Dest::col("search"), "Beef").limit(10) /// )?; /// dbg!(result); /// # Ok(()) /// # } /// ``` - use QueryCommand for fn query_with_limit<'a>( - collection: &'a str, - bucket: &'a str, - terms: &'a str, - limit: usize => Some(limit), + use QueryCommand for fn query( + req: QueryRequest, ); ); - init_command!( - /// Query limited objects in database. This method similar - /// query_with_limit but you can put offset in your query. - /// - /// Note: This method requires enabling the `search` feature and start - /// connection in Search mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let search_channel = SearchChannel::start( - /// "localhost:1491", - /// "SecretPassword", - /// )?; - /// - /// let result = search_channel.query_with_limit_and_offset( - /// "search", - /// "default", - /// "Beef", - /// 10, - /// 10, - /// )?; - /// dbg!(result); - /// # Ok(()) - /// # } - /// ``` - use QueryCommand for fn query_with_limit_and_offset<'a>( - collection: &'a str, - bucket: &'a str, - terms: &'a str, - limit: usize => Some(limit), - offset: usize => Some(offset), - ) - ); - init_command!( /// Suggest auto-completes words. /// @@ -176,42 +114,20 @@ impl SearchChannel { /// "SecretPassword", /// )?; /// - /// let result = search_channel.suggest("search", "default", "Beef")?; - /// dbg!(result); - /// # Ok(()) - /// # } - /// ``` - use SuggestCommand for fn suggest<'a>( - collection: &'a str, - bucket: &'a str, - word: &'a str, - ); - ); - - init_command!( - /// Suggest auto-completes words with limit. - /// - /// Note: This method requires enabling the `search` feature and start - /// connection in Search mode. - /// - /// ```rust,no_run - /// # use sonic_channel::*; - /// # fn main() -> result::Result<()> { - /// let search_channel = SearchChannel::start( - /// "localhost:1491", - /// "SecretPassword", + /// let result = search_channel.suggest( + /// SuggestRequest::new(Dest::col("search"), "Beef") /// )?; + /// dbg!(result); /// - /// let result = search_channel.suggest_with_limit("search", "default", "Beef", 5)?; + /// let result = search_channel.suggest( + /// SuggestRequest::new(Dest::col("search"), "Beef").limit(2) + /// )?; /// dbg!(result); /// # Ok(()) /// # } /// ``` - use SuggestCommand for fn suggest_with_limit<'a>( - collection: &'a str, - bucket: &'a str, - word: &'a str, - limit: usize => Some(limit), + use SuggestCommand for fn suggest( + req: SuggestRequest, ); ); } diff --git a/src/commands/count.rs b/src/commands/count.rs index 35a031b..a34a767 100644 --- a/src/commands/count.rs +++ b/src/commands/count.rs @@ -1,39 +1,67 @@ use super::StreamCommand; +use crate::misc::*; +use crate::protocol; use crate::result::*; -#[derive(Debug, Default)] -pub struct CountCommand<'a> { - pub collection: &'a str, - pub bucket: Option<&'a str>, - pub object: Option<&'a str>, +/// Parameters for the `count` command. +#[derive(Debug)] +pub struct CountRequest(OptDest); + +impl CountRequest { + /// Creates a new request to get the number of buckets in the collection. + pub fn buckets(collection: impl ToString) -> CountRequest { + Self(OptDest::col(collection)) + } + + /// Creates a new request to get the number of objects in the collection bucket. + pub fn objects(collection: impl ToString, bucket: impl ToString) -> CountRequest { + Self(OptDest::col_buc(collection, bucket)) + } + + /// Creates a new request to get the number of words in the collection bucket object. + pub fn words( + collection: impl ToString, + bucket: impl ToString, + object: impl ToString, + ) -> CountRequest { + Self(OptDest::col_buc_obj(collection, bucket, object)) + } } -impl StreamCommand for CountCommand<'_> { +impl From for CountRequest { + fn from(d: Dest) -> Self { + Self(OptDest::from(d)) + } +} + +impl From for CountRequest { + fn from(d: ObjDest) -> Self { + Self(OptDest::from(d)) + } +} + +#[derive(Debug)] +pub struct CountCommand { + pub(crate) req: CountRequest, +} + +impl StreamCommand for CountCommand { type Response = usize; - fn message(&self) -> String { - let mut message = format!("COUNT {}", self.collection); - if let Some(bucket) = self.bucket { - message.push_str(&format!(" {}", bucket)); - - if let Some(object) = self.object { - message.push_str(&format!(" {}", object)); - } + fn request(&self) -> protocol::Request { + let dest = &self.req.0; + protocol::Request::Count { + collection: dest.collection.clone(), + bucket: dest.bucket.clone(), + object: dest.object.clone(), } - message.push_str("\r\n"); - message } - fn receive(&self, message: String) -> Result { - if message.starts_with("RESULT ") { - let count = message.split_whitespace().last().unwrap_or_default(); - count.parse().map_err(|_| { - Error::new(ErrorKind::QueryResponse( - "Cannot parse count of count method response to usize", - )) - }) + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Result(count) = res { + Ok(count) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/commands/flush.rs b/src/commands/flush.rs index 5d29015..04ff828 100644 --- a/src/commands/flush.rs +++ b/src/commands/flush.rs @@ -1,39 +1,68 @@ use super::StreamCommand; -use crate::result::{Error, ErrorKind, Result}; +use crate::misc::*; +use crate::protocol; +use crate::result::*; -#[derive(Debug, Default)] -pub struct FlushCommand<'a> { - pub collection: &'a str, - pub bucket: Option<&'a str>, - pub object: Option<&'a str>, -} +/// Parameters for the `flush` command. +#[derive(Debug)] +pub struct FlushRequest(OptDest); -impl StreamCommand for FlushCommand<'_> { - type Response = usize; - - fn message(&self) -> String { - let mut message = match (self.bucket, self.object) { - (Some(bucket), Some(object)) => { - format!("FLUSHO {} {} {}", self.collection, bucket, object) - } - (Some(bucket), None) => format!("FLUSHB {} {}", self.collection, bucket), - (None, None) => format!("FLUSHC {}", self.collection), - _ => panic!("Invalid flush command"), - }; - message.push_str("\r\n"); - message +impl FlushRequest { + /// Creates a new request to flush all data in the collection. + pub fn collection(collection: impl ToString) -> FlushRequest { + Self(OptDest::col(collection)) } - fn receive(&self, message: String) -> Result { - if message.starts_with("RESULT ") { - let count = message.split_whitespace().last().unwrap_or_default(); - count.parse().map_err(|_| { - Error::new(ErrorKind::QueryResponse( - "Cannot parse count of flush method response to usize", - )) - }) + /// Creates a new request to flush all data in the collection bucket. + pub fn bucket(collection: impl ToString, bucket: impl ToString) -> FlushRequest { + Self(OptDest::col_buc(collection, bucket)) + } + + /// Creates a new request to flush all data in the collection bucket object. + pub fn object( + collection: impl ToString, + bucket: impl ToString, + object: impl ToString, + ) -> FlushRequest { + Self(OptDest::col_buc_obj(collection, bucket, object)) + } +} + +impl From for FlushRequest { + fn from(d: Dest) -> Self { + Self(OptDest::from(d)) + } +} + +impl From for FlushRequest { + fn from(d: ObjDest) -> Self { + Self(OptDest::from(d)) + } +} + +#[derive(Debug)] +pub struct FlushCommand { + pub(crate) req: FlushRequest, +} + +impl StreamCommand for FlushCommand { + type Response = usize; + + fn request(&self) -> protocol::Request { + let dest = &self.req.0; + + protocol::Request::Flush { + collection: dest.collection.clone(), + bucket: dest.bucket.clone(), + object: dest.object.clone(), + } + } + + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Result(count) = res { + Ok(count) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index a4a25c3..de6c1ce 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,8 +1,7 @@ +mod ping; mod quit; mod start; -mod ping; - #[cfg(feature = "ingest")] mod count; #[cfg(feature = "ingest")] @@ -20,36 +19,33 @@ mod suggest; #[cfg(feature = "control")] mod trigger; -pub(crate) use quit::QuitCommand; -pub(crate) use start::StartCommand; - -pub(crate) use ping::PingCommand; +pub(crate) use self::{ping::PingCommand, quit::QuitCommand, start::StartCommand}; #[cfg(feature = "ingest")] -pub(crate) use count::CountCommand; +pub(crate) use self::{ + count::CountCommand, flush::FlushCommand, pop::PopCommand, push::PushCommand, +}; #[cfg(feature = "ingest")] -pub(crate) use flush::FlushCommand; -#[cfg(feature = "ingest")] -pub(crate) use pop::PopCommand; -#[cfg(feature = "ingest")] -pub(crate) use push::PushCommand; +pub use self::{count::CountRequest, flush::FlushRequest, pop::PopRequest, push::PushRequest}; #[cfg(feature = "search")] -pub(crate) use query::QueryCommand; +pub(crate) use self::{query::QueryCommand, suggest::SuggestCommand}; #[cfg(feature = "search")] -pub(crate) use suggest::SuggestCommand; +pub use self::{query::QueryRequest, suggest::SuggestRequest}; #[cfg(feature = "control")] -pub(crate) use trigger::{TriggerAction, TriggerCommand}; +pub(crate) use trigger::TriggerCommand; +#[cfg(feature = "control")] +pub use trigger::TriggerRequest; +use crate::protocol; use crate::result::Result; +#[doc(hidden)] pub trait StreamCommand { type Response; - const READ_LINES_COUNT: usize = 1; + fn request(&self) -> protocol::Request; - fn message(&self) -> String; - - fn receive(&self, message: String) -> Result; + fn receive(&self, res: protocol::Response) -> Result; } diff --git a/src/commands/ping.rs b/src/commands/ping.rs index 3eb3030..c845229 100644 --- a/src/commands/ping.rs +++ b/src/commands/ping.rs @@ -1,21 +1,22 @@ use super::StreamCommand; +use crate::protocol; use crate::result::*; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct PingCommand; impl StreamCommand for PingCommand { - type Response = bool; + type Response = (); - fn message(&self) -> String { - String::from("PING\r\n") + fn request(&self) -> protocol::Request { + protocol::Request::Ping } - fn receive(&self, message: String) -> Result { - if message == "PONG\r\n" { - Ok(true) + fn receive(&self, res: protocol::Response) -> Result { + if matches!(res, protocol::Response::Pong) { + Ok(()) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/commands/pop.rs b/src/commands/pop.rs index dd52549..b463c79 100644 --- a/src/commands/pop.rs +++ b/src/commands/pop.rs @@ -1,36 +1,54 @@ use super::StreamCommand; +use crate::misc::ObjDest; +use crate::protocol; use crate::result::*; -#[derive(Debug, Default)] -pub struct PopCommand<'a> { - pub collection: &'a str, - pub bucket: &'a str, - pub object: &'a str, - pub text: &'a str, +/// Parameters for the `pop` command. +#[derive(Debug)] +pub struct PopRequest { + /// Collection, bucket and object where we should pop search data from index. + pub dest: ObjDest, + /// Search data to be deleted + pub text: String, } -impl StreamCommand for PopCommand<'_> { - type Response = usize; - - fn message(&self) -> String { - let mut message = format!( - r#"POP {} {} {} "{}""#, - self.collection, self.bucket, self.object, self.text - ); - message.push_str("\r\n"); - message - } - - fn receive(&self, message: String) -> Result { - if message.starts_with("RESULT ") { - let count = message.split_whitespace().last().unwrap_or_default(); - count.parse().map_err(|_| { - Error::new(ErrorKind::QueryResponse( - "Cannot parse count of pop method response to usize", - )) - }) - } else { - Err(Error::new(ErrorKind::WrongResponse)) +impl PopRequest { + /// Creates a base pop request. + pub fn new(dest: ObjDest, text: impl ToString) -> Self { + Self { + dest, + text: text.to_string(), + } + } +} + +#[derive(Debug)] +pub struct PopCommand { + pub(crate) req: PopRequest, +} + +impl StreamCommand for PopCommand { + type Response = usize; + + fn request(&self) -> protocol::Request { + let dest = &self.req.dest; + protocol::Request::Pop { + collection: dest.collection().clone(), + bucket: dest + .bucket_opt() + .cloned() + // TODO: use a global context for default bucket value + .unwrap_or_else(|| String::from("default")), + object: dest.object().clone(), + terms: self.req.text.to_string(), + } + } + + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Result(count) = res { + Ok(count) + } else { + Err(Error::WrongResponse) } } } diff --git a/src/commands/push.rs b/src/commands/push.rs index 01fc732..887ab74 100644 --- a/src/commands/push.rs +++ b/src/commands/push.rs @@ -1,79 +1,73 @@ use super::StreamCommand; +use crate::misc::ObjDest; +use crate::protocol; use crate::result::*; -#[derive(Debug, Default)] -pub struct PushCommand<'a> { - pub collection: &'a str, - pub bucket: &'a str, - pub object: &'a str, - pub text: &'a str, - pub locale: Option<&'a str>, +/// Parameters for the `push` command. +#[derive(Debug)] +pub struct PushRequest { + /// Collection, bucket and object where we should push search data in the index. + pub dest: ObjDest, + /// Search data to be added + pub text: String, + /// Language of the search data. If None, the client will try to determine based on the `text`. + pub lang: Option, } -impl StreamCommand for PushCommand<'_> { - type Response = bool; +impl PushRequest { + /// Creates a base push request + pub fn new(dest: ObjDest, text: impl ToString) -> Self { + Self { + dest, + text: text.to_string(), + lang: None, + } + } - fn message(&self) -> String { - let mut message = format!( - r#"PUSH {} {} {} "{}""#, - self.collection, - self.bucket, - self.object, - remove_multiline(self.text) - ); + /// Set a language for the request. + pub fn lang(mut self, lang: whatlang::Lang) -> Self { + self.lang = Some(lang); + self + } +} - let locale = self.locale.or_else(|| { - whatlang::detect(self.text).and_then(|info| { - if info.confidence() == 1.0 { - Some(info.lang().code()) - } else { - None - } +#[derive(Debug)] +pub struct PushCommand { + pub(crate) req: PushRequest, +} + +impl StreamCommand for PushCommand { + type Response = (); + + fn request(&self) -> protocol::Request { + let req = &self.req; + + let lang = req + .lang + .or_else(|| { + whatlang::detect(&req.text).and_then(|i| (i.confidence() == 1.0).then(|| i.lang())) }) - }); + .map(|l| l.code()); - if let Some(locale) = locale { - message.push_str(&format!(" LANG({})", locale)); + protocol::Request::Push { + collection: req.dest.collection().clone(), + bucket: req + .dest + .bucket_opt() + .cloned() + // TODO: use a global context for default bucket value + .unwrap_or_else(|| String::from("default")), + object: req.dest.object().clone(), + terms: req.text.to_string(), + lang, } - - message.push_str("\r\n"); - message } - fn receive(&self, message: String) -> Result { - if message == "OK\r\n" { - Ok(true) + fn receive(&self, res: protocol::Response) -> Result { + if matches!(res, protocol::Response::Ok) { + Ok(()) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } - -fn remove_multiline(text: &str) -> String { - text.lines() - .enumerate() - .fold(String::new(), |mut acc, (i, line)| { - if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') { - acc.push(' '); - } - - acc.push_str(line); - acc - }) -} - -#[cfg(test)] -mod tests { - use super::remove_multiline; - - #[test] - fn should_make_single_line() { - let text = " -Hello -World -"; - - let expected_text = "Hello World"; - assert_eq!(remove_multiline(text), expected_text); - } -} diff --git a/src/commands/query.rs b/src/commands/query.rs index a96a5ac..f953ea6 100644 --- a/src/commands/query.rs +++ b/src/commands/query.rs @@ -1,69 +1,100 @@ use super::StreamCommand; +use crate::misc::Dest; +use crate::protocol; use crate::result::*; -use regex::Regex; -const RE_QUERY_RECEIVED_MESSAGE: &str = r"(?x) - ^PENDING\s(?P\w+)\r\n - EVENT\sQUERY\s(?P\w+)\s(?P.*?)\r\n$ -"; - -#[derive(Debug, Default)] -pub struct QueryCommand<'a> { - pub collection: &'a str, - pub bucket: &'a str, - pub terms: &'a str, +/// Parameters for the `query` command +#[derive(Debug, Clone)] +pub struct QueryRequest { + /// Collection and bucket where we should search for objects. + pub dest: Dest, + /// Searchable terms. + pub terms: String, + /// Language of the search data. If None, the client will try to determine based on the `terms`. + pub lang: Option, + /// Limit of result objects. pub limit: Option, + /// The number of result objects we want to skip. pub offset: Option, } -impl StreamCommand for QueryCommand<'_> { - type Response = Vec; - - const READ_LINES_COUNT: usize = 2; - - fn message(&self) -> String { - let mut message = format!( - r#"QUERY {} {} "{}""#, - self.collection, self.bucket, self.terms - ); - if let Some(limit) = self.limit.as_ref() { - message.push_str(&format!(" LIMIT({})", limit)); +impl QueryRequest { + /// Creates base query request. + pub fn new(dest: Dest, terms: impl ToString) -> Self { + Self { + dest, + terms: terms.to_string(), + lang: None, + limit: None, + offset: None, } - if let Some(offset) = self.offset.as_ref() { - message.push_str(&format!(" OFFSET({})", offset)); - } - - // use greyblake/whatlang-rs to autodect locale - if let Some(info) = whatlang::detect(self.terms) { - if info.confidence() == 1.0 { - message.push_str(&format!(" LANG({})", info.lang().code())); - } - } - - message.push_str("\r\n"); - message } - fn receive(&self, message: String) -> Result { - lazy_static! { - static ref RE: Regex = Regex::new(RE_QUERY_RECEIVED_MESSAGE).unwrap(); - } + /// Set a language for the request. + pub fn lang(mut self, lang: whatlang::Lang) -> Self { + self.lang = Some(lang); + self + } - if let Some(caps) = RE.captures(&message) { - if caps["pending_query_id"] != caps["event_query_id"] { - Err(Error::new(ErrorKind::QueryResponse( - "Pending id and event id don't match", - ))) - } else if caps["objects"].is_empty() { - Ok(vec![]) - } else { - Ok(caps["objects"] - .split_whitespace() - .map(str::to_owned) - .collect()) - } + /// Set a limit for the request. + pub fn limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Set an offset for the request. + pub fn offset(mut self, offset: usize) -> Self { + self.offset = Some(offset); + self + } + + /// Set the pagination for the request. Automatic offset calculation based on provided + /// limit and page. + /// + /// Note: the first page is 0; + pub fn pag(self, page: usize, limit: usize) -> Self { + let offset = page * limit; + self.offset(offset).limit(limit) + } +} + +#[derive(Debug)] +pub struct QueryCommand { + pub(crate) req: QueryRequest, +} + +impl StreamCommand for QueryCommand { + type Response = Vec; + + fn request(&self) -> protocol::Request { + let dest = &self.req.dest; + let lang = self + .req + .lang + .or_else(|| { + whatlang::detect(&self.req.terms) + .and_then(|i| (i.confidence() == 1.0).then(|| i.lang())) + }) + .map(|l| l.code()); + + protocol::Request::Query { + collection: dest.collection().clone(), + bucket: dest + .bucket_opt() + .cloned() + .unwrap_or_else(|| String::from("default")), + terms: self.req.terms.clone(), + offset: self.req.offset, + limit: self.req.limit, + lang, + } + } + + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Event(protocol::EventKind::Query, _id, objects) = res { + Ok(objects) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/commands/quit.rs b/src/commands/quit.rs index 30fc6f1..0193fbb 100644 --- a/src/commands/quit.rs +++ b/src/commands/quit.rs @@ -1,21 +1,22 @@ use super::StreamCommand; +use crate::protocol; use crate::result::*; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct QuitCommand; impl StreamCommand for QuitCommand { - type Response = bool; + type Response = (); - fn message(&self) -> String { - String::from("QUIT\r\n") + fn request(&self) -> protocol::Request { + protocol::Request::Quit } - fn receive(&self, message: String) -> Result { - if message.starts_with("ENDED ") { - Ok(true) + fn receive(&self, res: protocol::Response) -> Result { + if matches!(res, protocol::Response::Ended) { + Ok(()) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/commands/start.rs b/src/commands/start.rs index dd0f5a5..e9129aa 100644 --- a/src/commands/start.rs +++ b/src/commands/start.rs @@ -1,27 +1,17 @@ use super::StreamCommand; use crate::channels::ChannelMode; +use crate::protocol; use crate::result::*; -use regex::Regex; - -const RE_START_RECEIVED_MESSAGE: &str = r"(?x) - STARTED - \s # started with mode - (?Psearch|ingest|control) - \s # wich protocol used - protocol\((?P\d+)\) - \s # maximum buffer size - buffer\((?P\d+)\) -"; #[derive(Debug)] pub struct StartCommand { - pub mode: ChannelMode, - pub password: String, + pub(crate) mode: ChannelMode, + pub(crate) password: String, } #[derive(Debug)] pub struct StartCommandResponse { - pub protocol_version: usize, + pub protocol_version: protocol::Version, pub max_buffer_size: usize, pub mode: ChannelMode, } @@ -29,32 +19,26 @@ pub struct StartCommandResponse { impl StreamCommand for StartCommand { type Response = StartCommandResponse; - fn message(&self) -> String { - format!("START {} {}\r\n", self.mode, self.password) + fn request(&self) -> protocol::Request { + protocol::Request::Start { + mode: self.mode, + password: self.password.to_string(), + } } - fn receive(&self, message: String) -> Result { - lazy_static! { - static ref RE: Regex = Regex::new(RE_START_RECEIVED_MESSAGE).unwrap(); - } - - if let Some(caps) = RE.captures(&message) { - if self.mode.as_str() != &caps["mode"] { - Err(Error::new(ErrorKind::SwitchMode)) - } else { - let protocol_version: usize = - caps["protocol"].parse().expect("Must be digit by regex"); - let max_buffer_size: usize = - caps["buffer_size"].parse().expect("Must be digit by regex"); - - Ok(StartCommandResponse { - protocol_version, - max_buffer_size, - mode: self.mode, - }) - } + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Started(payload) = res { + Ok(StartCommandResponse { + protocol_version: payload + .protocol_version + .try_into() + // TODO: better error + .map_err(|_| Error::SwitchMode)?, + max_buffer_size: payload.max_buffer_size, + mode: self.mode, + }) } else { - Err(Error::new(ErrorKind::SwitchMode)) + Err(Error::SwitchMode) } } } diff --git a/src/commands/suggest.rs b/src/commands/suggest.rs index 66a4f88..9da5cda 100644 --- a/src/commands/suggest.rs +++ b/src/commands/suggest.rs @@ -1,58 +1,63 @@ use super::StreamCommand; +use crate::misc::Dest; +use crate::protocol; use crate::result::*; -use regex::Regex; -const RE_SUGGEST_RECEIVED_MESSAGE: &str = r"(?x) - ^PENDING\s(?P\w+)\r\n - EVENT\sSUGGEST\s(?P\w+)\s(?P.*?)\r\n$ -"; - -#[derive(Debug, Default)] -pub struct SuggestCommand<'a> { - pub collection: &'a str, - pub bucket: &'a str, - pub word: &'a str, +/// Parameters for the `suggest` command. +#[derive(Debug)] +pub struct SuggestRequest { + /// Collection and bucket where we should search for suggested words. + pub dest: Dest, + /// Base word. + pub word: String, + /// Limit of result words. pub limit: Option, } -impl StreamCommand for SuggestCommand<'_> { - type Response = Vec; - - const READ_LINES_COUNT: usize = 2; - - fn message(&self) -> String { - let mut message = format!( - r#"SUGGEST {} {} "{}""#, - self.collection, self.bucket, self.word - ); - if let Some(limit) = self.limit.as_ref() { - message.push_str(&format!(" LIMIT({})", limit)); +impl SuggestRequest { + /// Creates a base suggest request. + pub fn new(dest: Dest, word: impl ToString) -> Self { + Self { + dest, + word: word.to_string(), + limit: None, } - message.push_str("\r\n"); - message } - fn receive(&self, message: String) -> Result { - lazy_static! { - static ref RE: Regex = Regex::new(RE_SUGGEST_RECEIVED_MESSAGE).unwrap(); - } + /// Set a limit for the request. + pub fn limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } +} - match RE.captures(&message) { - None => Err(Error::new(ErrorKind::WrongResponse)), - Some(caps) => { - if caps["pending_suggest_id"] != caps["event_suggest_id"] { - Err(Error::new(ErrorKind::QueryResponse( - "Pending id and event id don't match", - ))) - } else if caps["words"].is_empty() { - Ok(vec![]) - } else { - Ok(caps["words"] - .split_whitespace() - .map(str::to_owned) - .collect()) - } - } +#[derive(Debug)] +pub struct SuggestCommand { + pub(crate) req: SuggestRequest, +} + +impl StreamCommand for SuggestCommand { + type Response = Vec; + + fn request(&self) -> protocol::Request { + let dest = &self.req.dest; + + protocol::Request::Suggest { + collection: dest.collection().clone(), + bucket: dest + .bucket_opt() + .cloned() + .unwrap_or_else(|| String::from("default")), + word: self.req.word.to_string(), + limit: self.req.limit, + } + } + + fn receive(&self, res: protocol::Response) -> Result { + if let protocol::Response::Event(protocol::EventKind::Suggest, _id, words) = res { + Ok(words) + } else { + Err(Error::WrongResponse) } } } diff --git a/src/commands/trigger.rs b/src/commands/trigger.rs index 5067fe6..e89ae55 100644 --- a/src/commands/trigger.rs +++ b/src/commands/trigger.rs @@ -1,47 +1,47 @@ use super::StreamCommand; +use crate::protocol; use crate::result::*; -use std::fmt; +use std::path::PathBuf; +/// Parameters for the `trigger` command. #[derive(Debug)] -pub enum TriggerAction<'a> { +pub enum TriggerRequest<'a> { + /// Consolidate indexed search data instead of waiting for the next automated + /// consolidation tick. Consolidate, + + /// Backup KV + FST to / + /// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808) + /// for more information. Backup(&'a str), + + /// Restore KV + FST from if you already have backup with the same name. Restore(&'a str), } -impl Default for TriggerAction<'_> { - fn default() -> Self { - TriggerAction::Consolidate - } -} - -impl fmt::Display for TriggerAction<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { - match self { - TriggerAction::Consolidate => write!(f, "consolidate"), - TriggerAction::Backup(data) => write!(f, "backup {}", data), - TriggerAction::Restore(data) => write!(f, "restore {}", data), - } - } -} - -#[derive(Debug, Default)] +#[derive(Debug)] pub struct TriggerCommand<'a> { - pub action: TriggerAction<'a>, + pub(crate) req: TriggerRequest<'a>, } impl StreamCommand for TriggerCommand<'_> { - type Response = bool; + type Response = (); - fn message(&self) -> String { - format!("TRIGGER {}\r\n", self.action) + fn request(&self) -> protocol::Request { + let req = match self.req { + TriggerRequest::Consolidate => protocol::TriggerRequest::Consolidate, + TriggerRequest::Backup(path) => protocol::TriggerRequest::Backup(PathBuf::from(path)), + TriggerRequest::Restore(path) => protocol::TriggerRequest::Restore(PathBuf::from(path)), + }; + + protocol::Request::Trigger(req) } - fn receive(&self, message: String) -> Result { - if message == "OK\r\n" { - Ok(true) + fn receive(&self, res: protocol::Response) -> Result { + if matches!(res, protocol::Response::Ok) { + Ok(()) } else { - Err(Error::new(ErrorKind::WrongResponse)) + Err(Error::WrongResponse) } } } diff --git a/src/lib.rs b/src/lib.rs index 4f38a74..6887812 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,10 @@ //! "SecretPassword", //! )?; //! -//! let objects = channel.query("collection", "bucket", "recipe")?; +//! let objects = channel.query(QueryRequest::new( +//! Dest::col_buc("collection", "bucket"), +//! "recipe", +//! ))?; //! dbg!(objects); //! //! Ok(()) @@ -32,14 +35,17 @@ //! use sonic_channel::*; //! //! fn main() -> result::Result<()> { -//! let mut channel = IngestChannel::start( +//! let channel = IngestChannel::start( //! "localhost:1491", //! "SecretPassword", //! )?; //! -//! let pushed = channel.push("collection", "bucket", "object:1", "my best recipe")?; +//! let dest = Dest::col_buc("collection", "bucket").obj("object:1"); +//! let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?; //! // or -//! // let pushed = channel.push_with_locale("collection", "bucket", "object:1", "Мой лучший рецепт", "rus")?; +//! // let pushed = channel.push( +//! // PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus) +//! // )?; //! dbg!(pushed); //! //! Ok(()) @@ -54,13 +60,13 @@ //! use sonic_channel::*; //! //! fn main() -> result::Result<()> { -//! let mut channel = ControlChannel::start( +//! let channel = ControlChannel::start( //! "localhost:1491", //! "SecretPassword", //! )?; //! //! let result = channel.consolidate()?; -//! assert_eq!(result, true); +//! assert_eq!(result, ()); //! //! Ok(()) //! } @@ -87,15 +93,20 @@ compile_error!( #[macro_use] mod macroses; +mod misc; + +pub(crate) mod protocol; mod channels; -mod commands; + +/// Contains the request parameters for each command to the sonic server. +pub mod commands; /// Contains sonic channel error type and custom Result type for easy configure your functions. pub mod result; pub use channels::*; +pub use commands::*; +pub use misc::*; -#[macro_use] -extern crate lazy_static; -extern crate regex; +pub use whatlang::Lang; diff --git a/src/macroses.rs b/src/macroses.rs index 27fabbe..593465c 100644 --- a/src/macroses.rs +++ b/src/macroses.rs @@ -14,8 +14,7 @@ macro_rules! init_command { ) -> $crate::result::Result< <$cmd_name as $crate::commands::StreamCommand>::Response, > { - #[allow(clippy::needless_update)] - let command = $cmd_name { $($arg_name $(: $arg_value)?,)* ..Default::default() }; + let command = $cmd_name { $($arg_name $(: $arg_value)?,)* }; self.stream().run_command(command) } }; diff --git a/src/misc.rs b/src/misc.rs new file mode 100644 index 0000000..5526bcb --- /dev/null +++ b/src/misc.rs @@ -0,0 +1,165 @@ +/// Search data destination. Contains collection, bucket and object. +#[derive(Debug, PartialEq, Eq)] +pub struct ObjDest(Dest, String); + +impl ObjDest { + /// Creates a new object destination from base destination (`Dest`) and object id. + /// + /// ```rust + /// # use sonic_channel::{Dest, ObjDest}; + /// let base_dest = Dest::col_buc("wiki", "user:1"); + /// let dest = ObjDest::new(base_dest, "article:1"); + /// assert_eq!(dest.collection(), "wiki"); + /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1"))); + /// assert_eq!(dest.object(), "article:1"); + /// ``` + pub fn new(cb: Dest, o: impl ToString) -> Self { + Self(cb, o.to_string()) + } + + /// Returns the collection. + #[inline] + pub fn collection(&self) -> &String { + self.0.collection() + } + + /// Returns the optional bucket. + #[inline] + pub fn bucket_opt(&self) -> Option<&String> { + self.0.bucket_opt() + } + + /// Returns the object id. + #[inline] + pub fn object(&self) -> &String { + &self.1 + } +} + +/// Search objects destination. Contains collection and bucket. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Dest { + collection: String, + bucket: Option, +} + +impl Dest { + /// Creates a new destination with collection and bucket. + /// + /// ```rust + /// # use sonic_channel::Dest; + /// let dest = Dest::col_buc("wiki", "user:1"); + /// assert_eq!(dest.collection(), "wiki"); + /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1"))); + /// ``` + pub fn col_buc(c: impl ToString, b: impl ToString) -> Self { + Self::col(c).buc(b) + } + + /// Creates a new destination with collection. + /// + /// ```rust + /// # use sonic_channel::Dest; + /// let dest = Dest::col("wiki"); + /// assert_eq!(dest.collection(), "wiki"); + /// ``` + pub fn col(c: impl ToString) -> Self { + Self { + collection: c.to_string(), + bucket: None, + } + } + + /// Set bucket for the destination. + /// + /// ```rust + /// # use sonic_channel::Dest; + /// let dest = Dest::col("wiki").buc("user:1"); + /// assert_eq!(dest.collection(), "wiki"); + /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1"))); + /// ``` + pub fn buc(mut self, b: impl ToString) -> Self { + self.bucket = Some(b.to_string()); + self + } + + /// Set object id to the destination and transform to object destination (`ObjDest`). + /// + /// Short for `ObjDest::new(dest, object_id)` + /// + /// ```rust + /// # use sonic_channel::Dest; + /// let dest = Dest::col_buc("wiki", "user:1").obj("article:1"); + /// assert_eq!(dest.collection(), "wiki"); + /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1"))); + /// assert_eq!(dest.object(), "article:1"); + /// ``` + pub fn obj(self, o: impl ToString) -> ObjDest { + ObjDest::new(self, o) + } + + /// Returns the collection. + #[inline] + pub fn collection(&self) -> &String { + &self.collection + } + + /// Returns the optional bucket. + #[inline] + pub fn bucket_opt(&self) -> Option<&String> { + self.bucket.as_ref() + } +} + +#[derive(Debug)] +pub(crate) struct OptDest { + pub(crate) collection: String, + pub(crate) bucket: Option, + pub(crate) object: Option, +} + +impl OptDest { + pub(crate) fn col(c: impl ToString) -> Self { + Self { + collection: c.to_string(), + bucket: None, + object: None, + } + } + + pub(crate) fn col_buc(c: impl ToString, b: impl ToString) -> Self { + Self { + collection: c.to_string(), + bucket: Some(b.to_string()), + object: None, + } + } + + pub(crate) fn col_buc_obj(c: impl ToString, b: impl ToString, o: impl ToString) -> Self { + Self { + collection: c.to_string(), + bucket: Some(b.to_string()), + object: Some(o.to_string()), + } + } +} + +impl From for OptDest { + fn from(d: Dest) -> Self { + Self { + collection: d.collection, + bucket: d.bucket, + object: None, + } + } +} + +impl From for OptDest { + fn from(ObjDest(dest, obj): ObjDest) -> Self { + Self { + collection: dest.collection, + bucket: dest.bucket, + object: Some(obj), + } + } +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..fcf4dcf --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,321 @@ +use std::io::{self, BufWriter, Write}; +use std::{path::PathBuf, str::FromStr}; + +use crate::{result::*, ChannelMode}; + +#[derive(Debug, Default)] +pub struct Protocol { + #[allow(dead_code)] + version: Version, +} + +impl From for Protocol { + fn from(version: Version) -> Self { + Self { version } + } +} + +impl Protocol { + pub fn format_request(&self, req: Request) -> io::Result> { + let mut res = BufWriter::new(Vec::new()); + + match req { + Request::Quit => write!(res, "QUIT")?, + + Request::Ping => write!(res, "PING")?, + + Request::Start { mode, password } => write!(res, "START {} {}", mode, password)?, + + #[rustfmt::skip] + Request::Count { collection, bucket, object } => match (bucket, object) { + (Some(b), Some(o)) => write!(res, "COUNT {} {} {}", collection, b, o)?, + (Some(b), None) => write!(res, "COUNT {} {}", collection, b)?, + (None, None) => write!(res, "COUNT {}", collection)?, + _ => panic!("Wrong protocol format"), + }, + + #[rustfmt::skip] + Request::Flush { collection, bucket, object } => match (bucket, object) { + (Some(b), Some(o)) => write!(res, "FLUSHO {} {} {}", collection, b, o)?, + (Some(b), None) => write!(res, "FLUSHB {} {}", collection, b)?, + (None, None) => write!(res, "FLUSHC {}", collection)?, + _ => panic!("Wrong protocol format"), + }, + + #[rustfmt::skip] + Request::Pop { collection, bucket, object, terms } => { + write!(res, "POP {} {} {} \"{}\"", collection, bucket, object, terms)? + }, + #[rustfmt::skip] + Request::Push { collection, bucket, object, terms, lang } => { + let oneline_terms = remove_multiline(&terms); + write!(res, "PUSH {} {} {} \"{}\"", collection, bucket, object, oneline_terms)?; + if let Some(lang) = lang { + write!(res, " LANG({})", lang)? + } + } + + #[rustfmt::skip] + Request::Query { collection, bucket, terms, offset, limit, lang } => { + write!(res, "QUERY {} {} \"{}\"", collection, bucket, terms)?; + if let Some(limit) = limit { + write!(res, " LIMIT({})", limit)?; + } + if let Some(offset) = offset { + write!(res, " OFFSET({})", offset)?; + } + if let Some(lang) = lang { + write!(res, " LANG({})", lang)?; + } + } + #[rustfmt::skip] + Request::Suggest { collection, bucket, word, limit } => { + write!(res, "SUGGEST {} {} \"{}\"", collection, bucket, word)?; + if let Some(limit) = limit { + write!(res, " LIMIT({})", limit)?; + } + } + + Request::Trigger(triger_req) => match triger_req { + TriggerRequest::Consolidate => write!(res, "TRIGGER consolidate")?, + TriggerRequest::Backup(path) => { + write!(res, "TRIGGER backup {}", path.to_str().unwrap())? + } + TriggerRequest::Restore(path) => { + write!(res, "TRIGGER restore {}", path.to_str().unwrap())? + } + }, + } + + write!(res, "\r\n")?; + res.flush()?; + + Ok(res.into_inner()?) + } + + pub fn parse_response(&self, line: &str) -> Result { + let mut segments = line.split_whitespace(); + match segments.next() { + Some("STARTED") => match (segments.next(), segments.next(), segments.next()) { + (Some(_raw_mode), Some(raw_protocol), Some(raw_buffer_size)) => { + Ok(Response::Started(StartedPayload { + protocol_version: parse_server_config(raw_protocol)?, + max_buffer_size: parse_server_config(raw_buffer_size)?, + })) + } + _ => Err(Error::WrongResponse), + }, + Some("PENDING") => { + let event_id = segments + .next() + .map(String::from) + .ok_or(Error::WrongResponse)?; + Ok(Response::Pending(event_id)) + } + Some("RESULT") => match segments.next() { + Some(num) => num + .parse() + .map(Response::Result) + .map_err(|_| Error::WrongResponse), + _ => Err(Error::WrongResponse), + }, + Some("EVENT") => { + let event_kind = match segments.next() { + Some("SUGGEST") => Ok(EventKind::Suggest), + Some("QUERY") => Ok(EventKind::Query), + _ => Err(Error::WrongResponse), + }?; + + let event_id = segments + .next() + .map(String::from) + .ok_or(Error::WrongResponse)?; + + let objects = segments.map(String::from).collect(); + + Ok(Response::Event(event_kind, event_id, objects)) + } + Some("OK") => Ok(Response::Ok), + Some("ENDED") => Ok(Response::Ended), + Some("CONNECTED") => Ok(Response::Connected), + Some("ERR") => match segments.next() { + Some(message) => Err(Error::SonicServer(String::from(message))), + _ => Err(Error::WrongResponse), + }, + _ => Err(Error::WrongResponse), + } + } +} + +//===========================================================================// +// Primitives // +//===========================================================================// + +#[derive(Debug, PartialEq, Eq)] +#[repr(u8)] +pub enum Version { + V1 = 1, +} + +impl Default for Version { + fn default() -> Self { + Self::V1 + } +} + +impl TryFrom for Version { + type Error = (); + + fn try_from(value: u8) -> std::result::Result { + match value { + 1 => Ok(Self::V1), + _ => Err(()), + } + } +} + +//===========================================================================// +// Response // +//===========================================================================// + +pub type EventId = String; + +#[derive(Debug)] +pub enum Response { + Ok, + Ended, + Connected, + Pending(EventId), + Pong, + Started(StartedPayload), + Result(usize), + Event(EventKind, EventId, Vec), +} + +#[derive(Debug)] +pub struct StartedPayload { + pub protocol_version: u8, + pub max_buffer_size: usize, +} + +#[derive(Debug)] +pub enum EventKind { + Suggest, + Query, +} + +//===========================================================================// +// Request // +//===========================================================================// + +#[derive(Debug)] +pub enum Request { + Start { + mode: ChannelMode, + password: String, + }, + Quit, + Ping, + Trigger(TriggerRequest), + Suggest { + collection: String, + bucket: String, + word: String, + limit: Option, + }, + Query { + collection: String, + bucket: String, + terms: String, + offset: Option, + limit: Option, + lang: Option<&'static str>, + }, + Push { + collection: String, + bucket: String, + object: String, + terms: String, + lang: Option<&'static str>, + }, + Pop { + collection: String, + bucket: String, + object: String, + terms: String, + }, + Flush { + collection: String, + bucket: Option, + object: Option, + }, + Count { + collection: String, + bucket: Option, + object: Option, + }, +} + +#[derive(Debug)] +pub enum TriggerRequest { + Consolidate, + Backup(PathBuf), + Restore(PathBuf), +} + +//===========================================================================// +// Utils // +//===========================================================================// + +fn parse_server_config(raw: &str) -> Result { + raw.split_terminator(&['(', ')']) + .nth(1) + .ok_or(Error::WrongResponse)? + .parse() + .map_err(|_| Error::WrongResponse) +} + +fn remove_multiline(text: &str) -> String { + text.lines() + .enumerate() + .fold(String::new(), |mut acc, (i, line)| { + if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') { + acc.push(' '); + } + + acc.push_str(line); + acc + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_parse_protocol() { + match parse_server_config::("protocol(1)") { + Ok(protocol) => assert_eq!(protocol, 1), + _ => unreachable!(), + } + } + + #[test] + fn should_parse_buffer_size() { + match parse_server_config::("buffer_size(20000)") { + Ok(buffer_size) => assert_eq!(buffer_size, 20000), + _ => unreachable!(), + } + } + + #[test] + fn should_make_single_line() { + let text = " +Hello +World +"; + + let expected_text = "Hello World"; + assert_eq!(remove_multiline(text), expected_text); + } +} diff --git a/src/result.rs b/src/result.rs index b86da14..7205f9d 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,6 +1,4 @@ use crate::channels::ChannelMode; -use std::error::Error as StdError; -use std::fmt; /// Sugar if you expect only sonic-channel error type in result pub type Result = std::result::Result; @@ -8,29 +6,10 @@ pub type Result = std::result::Result; /// Wrap for sonic channel error kind. This type has std::error::Error /// implementation and you can use boxed trait for catch other errors /// like this. -#[derive(Debug)] -pub struct Error { - kind: ErrorKind, -} - -impl StdError for Error {} - -impl Error { - /// Creates new Error with sonic channel error kind - /// - /// ```rust - /// use sonic_channel::result::*; - /// - /// let err = Error::new(ErrorKind::ConnectToServer); - /// ``` - pub fn new(kind: ErrorKind) -> Self { - Error { kind } - } -} /// All error kinds that you can see in sonic-channel crate. #[derive(Debug)] -pub enum ErrorKind { +pub enum Error { /// Cannot connect to the sonic search backend. ConnectToServer, @@ -57,24 +36,25 @@ pub enum ErrorKind { UnsupportedCommand((&'static str, Option)), /// This error appears if the error occurred on the server side - SonicServer(&'static str), + SonicServer(String), } -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { - match self.kind { - ErrorKind::ConnectToServer => write!(f, "Cannot connect to server"), - ErrorKind::WriteToStream => write!(f, "Cannot write data to stream"), - ErrorKind::ReadStream => write!(f, "Cannot read sonic response from stream"), - ErrorKind::SwitchMode => write!(f, "Cannot switch channel mode"), - ErrorKind::RunCommand => write!(f, "Cannot run command in current mode"), - ErrorKind::QueryResponse(message) => { +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Error::*; + match self { + ConnectToServer => f.write_str("Cannot connect to server"), + WriteToStream => f.write_str("Cannot write data to stream"), + ReadStream => f.write_str("Cannot read sonic response from stream"), + SwitchMode => f.write_str("Cannot switch channel mode"), + RunCommand => f.write_str("Cannot run command in current mode"), + QueryResponse(message) => { write!(f, "Error in query response: {}", message) } - ErrorKind::WrongResponse => { + WrongResponse => { write!(f, "Client cannot parse response from sonic server. Please write an issue to github (https://github.com/pleshevskiy/sonic-channel).") } - ErrorKind::UnsupportedCommand((command_name, channel_mode)) => { + UnsupportedCommand((command_name, channel_mode)) => { if let Some(channel_mode) = channel_mode { write!( f, @@ -89,7 +69,9 @@ impl fmt::Display for Error { ) } } - ErrorKind::SonicServer(message) => write!(f, "Sonic Server-side error: {}", message), + SonicServer(message) => write!(f, "Sonic Server-side error: {}", message), } } } + +impl std::error::Error for Error {} diff --git a/tests/common.rs b/tests/common.rs index 5c50f35..79d38bd 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -22,5 +22,7 @@ pub fn consolidate() { } pub fn flush_bucket(collection: &str, bucket: &str) { - ingest_start().flushb(collection, bucket).unwrap(); + ingest_start() + .flush(FlushRequest::bucket(collection, bucket)) + .unwrap(); } diff --git a/tests/push_command.rs b/tests/push_command.rs index d7d4547..8d40284 100644 --- a/tests/push_command.rs +++ b/tests/push_command.rs @@ -7,10 +7,15 @@ const COLLECTION: &str = "Ingest"; fn should_push_new_object_to_sonic() { let bucket = "push_simple"; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); - match ingest_channel.push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers") { - Ok(res) => assert!(res), - Err(_) => unreachable!(), + match ingest_channel.push(PushRequest::new( + dest.obj("1"), + "Sweet Teriyaki Beef Skewers", + )) { + Ok(()) => {} + _ => unreachable!(), } flush_bucket(COLLECTION, bucket); @@ -20,16 +25,14 @@ fn should_push_new_object_to_sonic() { fn should_push_new_object_to_sonic_with_russian_locale() { let bucket = "push_locale"; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); - match ingest_channel.push_with_locale( - COLLECTION, - bucket, - "1", - "Открытый пирог с орехами и сгущенкой", - "rus", + match ingest_channel.push( + PushRequest::new(dest.obj("1"), "Открытый пирог с орехами и сгущенкой").lang(Lang::Rus), ) { - Ok(res) => assert!(res), - Err(_) => unreachable!(), + Ok(()) => {} + _ => unreachable!(), } flush_bucket(COLLECTION, bucket); @@ -45,10 +48,12 @@ Beef Skewers "; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); - match ingest_channel.push(COLLECTION, bucket, "1", multiline_text) { - Ok(res) => assert!(res), - Err(_) => unreachable!(), + match ingest_channel.push(PushRequest::new(dest.obj("1"), multiline_text)) { + Ok(()) => {} + _ => unreachable!(), } flush_bucket(COLLECTION, bucket); diff --git a/tests/query_command.rs b/tests/query_command.rs index f559a75..ca872e3 100644 --- a/tests/query_command.rs +++ b/tests/query_command.rs @@ -8,35 +8,19 @@ fn should_find_object_by_exact_match() { let bucket = "query_by_exact_match"; let title = "Sweet Teriyaki Beef Skewers"; - let ingest_channel = ingest_start(); - ingest_channel.push(COLLECTION, bucket, "1", title).unwrap(); - - let search_channel = search_start(); - match search_channel.query(COLLECTION, bucket, title) { - Ok(object_ids) => assert_eq!(object_ids, vec!["1"]), - Err(_) => unreachable!(), - } - - flush_bucket(COLLECTION, bucket); -} - -#[test] -fn should_find_object_by_partial_match() { - let bucket = "query_by_partial_match"; + let dest = Dest::col_buc(COLLECTION, bucket); let ingest_channel = ingest_start(); ingest_channel - .push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers") + .push(PushRequest::new(dest.clone().obj("1"), title)) .unwrap(); - let search_channel = search_start(); + consolidate(); - let words = ["Sweet", "Teriyaki", "Beef", "Skewers"]; - for word in words { - match search_channel.query(COLLECTION, bucket, word) { - Ok(object_ids) => assert_eq!(object_ids, vec!["1"]), - Err(_) => unreachable!(), - } + let search_channel = search_start(); + match search_channel.query(QueryRequest::new(dest, title)) { + Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]), + Err(_) => unreachable!(), } flush_bucket(COLLECTION, bucket); @@ -50,19 +34,22 @@ Sweet Teriyaki Beef Skewers -"; +None"; + + let dest = Dest::col_buc(COLLECTION, bucket); let ingest_channel = ingest_start(); ingest_channel - .push(COLLECTION, bucket, "1", multiline_text) + .push(PushRequest::new(dest.clone().obj("1"), multiline_text)) .unwrap(); - let search_channel = search_start(); + consolidate(); let words = ["Sweet", "Teriyaki", "Beef", "Skewers"]; + let search_channel = search_start(); for word in words { - match search_channel.query(COLLECTION, bucket, word) { - Ok(object_ids) => assert_eq!(object_ids, vec!["1"]), + match search_channel.query(QueryRequest::new(dest.clone(), word)) { + Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]), Err(_) => unreachable!(), } } @@ -74,19 +61,32 @@ Skewers fn should_find_many_objects() { let bucket = "query_many_objects"; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); ingest_channel - .push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers") + .push(PushRequest::new( + dest.clone().obj("1"), + "Sweet Teriyaki Beef Skewers", + )) .unwrap(); ingest_channel - .push(COLLECTION, bucket, "2", "Slow Cooker Beef Stew I") + .push(PushRequest::new( + dest.clone().obj("2"), + "Slow Cooker Beef Stew I", + )) .unwrap(); ingest_channel - .push(COLLECTION, bucket, "3", "Christmas Prime Rib") + .push(PushRequest::new( + dest.clone().obj("3"), + "Christmas Prime Rib", + )) .unwrap(); + consolidate(); + let search_channel = search_start(); - match search_channel.query(COLLECTION, bucket, "Beef") { + match search_channel.query(QueryRequest::new(dest, "Beef")) { Ok(object_ids) => assert_eq!(object_ids, vec!["2", "1"]), Err(_) => unreachable!(), } @@ -98,25 +98,38 @@ fn should_find_many_objects() { fn should_find_limited_objects() { let bucket = "query_limited_objects"; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); ingest_channel - .push(COLLECTION, bucket, "1", "Sweet Teriyaki Beef Skewers") + .push(PushRequest::new( + dest.clone().obj("1"), + "Sweet Teriyaki Beef Skewers", + )) .unwrap(); ingest_channel - .push(COLLECTION, bucket, "2", "Slow Cooker Beef Stew I") + .push(PushRequest::new( + dest.clone().obj("2"), + "Slow Cooker Beef Stew I", + )) .unwrap(); ingest_channel - .push(COLLECTION, bucket, "3", "Christmas Prime Rib") + .push(PushRequest::new( + dest.clone().obj("3"), + "Christmas Prime Rib", + )) .unwrap(); + consolidate(); + let search_channel = search_start(); - match search_channel.query_with_limit(COLLECTION, bucket, "Beef", 1) { + match search_channel.query(QueryRequest::new(dest.clone(), "Beef").limit(1)) { Ok(object_ids) => assert_eq!(object_ids, vec!["2"]), Err(_) => unreachable!(), } let search_channel = search_start(); - match search_channel.query_with_limit_and_offset(COLLECTION, bucket, "Beef", 1, 1) { + match search_channel.query(QueryRequest::new(dest, "Beef").pag(1, 1)) { Ok(object_ids) => assert_eq!(object_ids, vec!["1"]), Err(_) => unreachable!(), } diff --git a/tests/suggest_command.rs b/tests/suggest_command.rs index 62ee0e5..a4de7f2 100644 --- a/tests/suggest_command.rs +++ b/tests/suggest_command.rs @@ -8,8 +8,12 @@ fn should_suggest_nearest_word() { let bucket = "suggest_nearest"; let title = "Sweet Teriyaki Beef Skewers"; + let dest = Dest::col_buc(COLLECTION, bucket); + let ingest_channel = ingest_start(); - ingest_channel.push(COLLECTION, bucket, "1", title).unwrap(); + ingest_channel + .push(PushRequest::new(dest.clone().obj("1"), title)) + .unwrap(); consolidate(); @@ -22,7 +26,7 @@ fn should_suggest_nearest_word() { let search_channel = search_start(); for (input, expected) in pairs { - match search_channel.suggest(COLLECTION, bucket, input) { + match search_channel.suggest(SuggestRequest::new(dest.clone(), input)) { Ok(object_ids) => assert_eq!(object_ids, vec![expected]), Err(_) => unreachable!(), }