Merge pull request #11 from pleshevskiy/bug-8

fix!: handle sonic server-side errors
This commit is contained in:
Dmitriy Pleshevskiy 2021-12-22 15:03:37 +02:00 committed by GitHub
commit 215b045f9b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 46 additions and 41 deletions

View file

@ -101,12 +101,13 @@ impl SonicStream {
let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream); let mut reader = BufReader::with_capacity(self.max_buffer_size, &self.stream);
let mut message = String::new(); let mut message = String::new();
let mut lines_read = 0; for _ in 0..max_read_lines {
while lines_read < max_read_lines {
reader reader
.read_line(&mut message) .read_line(&mut message)
.map_err(|_| Error::new(ErrorKind::ReadStream))?; .map_err(|_| Error::new(ErrorKind::ReadStream))?;
lines_read += 1; if message.starts_with("ERR ") {
break;
}
} }
Ok(message) Ok(message)
@ -115,8 +116,14 @@ impl SonicStream {
pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> { pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
self.write(&command)?; self.write(&command)?;
let message = self.read(SC::READ_LINES_COUNT)?; let message = self.read(SC::READ_LINES_COUNT)?;
if let Some(error) = message.strip_prefix("ERR ") {
Err(Error::new(ErrorKind::SonicServer(Box::leak(
error.to_owned().into_boxed_str(),
))))
} else {
command.receive(message) command.receive(message)
} }
}
fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> { fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let stream = let stream =
@ -212,3 +219,15 @@ pub trait SonicChannel {
A: ToSocketAddrs, A: ToSocketAddrs,
S: ToString; S: ToString;
} }
#[cfg(test)]
mod tests {
use super::ChannelMode;
#[test]
fn format_channel_enums() {
assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
}
}

View file

@ -28,12 +28,12 @@ impl StreamCommand for CountCommand<'_> {
if message.starts_with("RESULT ") { if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default(); let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| { count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponseError( Error::new(ErrorKind::QueryResponse(
"Cannot parse count of count method response to usize", "Cannot parse count of count method response to usize",
)) ))
}) })
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -28,12 +28,12 @@ impl StreamCommand for FlushCommand<'_> {
if message.starts_with("RESULT ") { if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default(); let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| { count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponseError( Error::new(ErrorKind::QueryResponse(
"Cannot parse count of flush method response to usize", "Cannot parse count of flush method response to usize",
)) ))
}) })
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -15,7 +15,7 @@ impl StreamCommand for PingCommand {
if message == "PONG\r\n" { if message == "PONG\r\n" {
Ok(true) Ok(true)
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -25,12 +25,12 @@ impl StreamCommand for PopCommand<'_> {
if message.starts_with("RESULT ") { if message.starts_with("RESULT ") {
let count = message.split_whitespace().last().unwrap_or_default(); let count = message.split_whitespace().last().unwrap_or_default();
count.parse().map_err(|_| { count.parse().map_err(|_| {
Error::new(ErrorKind::QueryResponseError( Error::new(ErrorKind::QueryResponse(
"Cannot parse count of pop method response to usize", "Cannot parse count of pop method response to usize",
)) ))
}) })
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -31,12 +31,8 @@ impl StreamCommand for PushCommand<'_> {
fn receive(&self, message: String) -> Result<Self::Response> { fn receive(&self, message: String) -> Result<Self::Response> {
if message == "OK\r\n" { if message == "OK\r\n" {
Ok(true) Ok(true)
} else if message.starts_with("ERR ") {
Err(Error::new(ErrorKind::QueryResponseError(Box::leak(
message.into_boxed_str(),
))))
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -43,7 +43,7 @@ impl StreamCommand for QueryCommand<'_> {
if let Some(caps) = RE.captures(&message) { if let Some(caps) = RE.captures(&message) {
if caps["pending_query_id"] != caps["event_query_id"] { if caps["pending_query_id"] != caps["event_query_id"] {
Err(Error::new(ErrorKind::QueryResponseError( Err(Error::new(ErrorKind::QueryResponse(
"Pending id and event id don't match", "Pending id and event id don't match",
))) )))
} else if caps["objects"].is_empty() { } else if caps["objects"].is_empty() {
@ -55,7 +55,7 @@ impl StreamCommand for QueryCommand<'_> {
.collect()) .collect())
} }
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -15,7 +15,7 @@ impl StreamCommand for QuitCommand {
if message.starts_with("ENDED ") { if message.starts_with("ENDED ") {
Ok(true) Ok(true)
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -38,10 +38,10 @@ impl StreamCommand for SuggestCommand<'_> {
} }
match RE.captures(&message) { match RE.captures(&message) {
None => Err(Error::new(ErrorKind::WrongSonicResponse)), None => Err(Error::new(ErrorKind::WrongResponse)),
Some(caps) => { Some(caps) => {
if caps["pending_suggest_id"] != caps["event_suggest_id"] { if caps["pending_suggest_id"] != caps["event_suggest_id"] {
Err(Error::new(ErrorKind::QueryResponseError( Err(Error::new(ErrorKind::QueryResponse(
"Pending id and event id don't match", "Pending id and event id don't match",
))) )))
} else if caps["words"].is_empty() { } else if caps["words"].is_empty() {

View file

@ -41,7 +41,7 @@ impl StreamCommand for TriggerCommand<'_> {
if message == "OK\r\n" { if message == "OK\r\n" {
Ok(true) Ok(true)
} else { } else {
Err(Error::new(ErrorKind::WrongSonicResponse)) Err(Error::new(ErrorKind::WrongResponse))
} }
} }
} }

View file

@ -99,17 +99,3 @@ pub use channels::*;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
extern crate regex; extern crate regex;
#[cfg(test)]
mod tests {
use crate::channels::ChannelMode;
#[test]
fn format_channel_enums() {
assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
}
//TODO: write tests with sonic server
}

View file

@ -47,14 +47,17 @@ pub enum ErrorKind {
RunCommand, RunCommand,
/// Error in query response with additional message. /// Error in query response with additional message.
QueryResponseError(&'static str), QueryResponse(&'static str),
/// Response from sonic server are wrong! Actually it may happen if you use /// Response from sonic server are wrong! Actually it may happen if you use
/// unsupported sonic backend version. Please write issue to the github repo. /// unsupported sonic backend version. Please write issue to the github repo.
WrongSonicResponse, WrongResponse,
/// You cannot run the command in current channel. /// You cannot run the command in current channel.
UnsupportedCommand((&'static str, Option<ChannelMode>)), UnsupportedCommand((&'static str, Option<ChannelMode>)),
/// This error appears if the error occurred on the server side
SonicServer(&'static str),
} }
impl fmt::Display for Error { impl fmt::Display for Error {
@ -65,11 +68,11 @@ impl fmt::Display for Error {
ErrorKind::ReadStream => write!(f, "Cannot read sonic response from stream"), ErrorKind::ReadStream => write!(f, "Cannot read sonic response from stream"),
ErrorKind::SwitchMode => write!(f, "Cannot switch channel mode"), ErrorKind::SwitchMode => write!(f, "Cannot switch channel mode"),
ErrorKind::RunCommand => write!(f, "Cannot run command in current mode"), ErrorKind::RunCommand => write!(f, "Cannot run command in current mode"),
ErrorKind::QueryResponseError(message) => { ErrorKind::QueryResponse(message) => {
write!(f, "Error in query response: {}", message) write!(f, "Error in query response: {}", message)
} }
ErrorKind::WrongSonicResponse => { ErrorKind::WrongResponse => {
write!(f, "Sonic response are wrong. Please write issue to github.") write!(f, "Client cannot parse response from sonic server. Please write an issue to github (https://github.com/pleshevskiy/sonic-channel).")
} }
ErrorKind::UnsupportedCommand((command_name, channel_mode)) => { ErrorKind::UnsupportedCommand((command_name, channel_mode)) => {
if let Some(channel_mode) = channel_mode { if let Some(channel_mode) = channel_mode {
@ -86,6 +89,7 @@ impl fmt::Display for Error {
) )
} }
} }
ErrorKind::SonicServer(message) => write!(f, "Sonic Server-side error: {}", message),
} }
} }
} }