From f0faf1175c7171d23aa3ade065f6d155ce379c5b Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 10 Jul 2023 08:58:29 +0200 Subject: [PATCH] Add concrete errors to Object Store Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 462 ++++++++++++++++--- async-nats/src/jetstream/stream.rs | 1 + 2 files changed, 405 insertions(+), 58 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 02e6572f2..28c82f073 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -12,13 +12,8 @@ // limitations under the License. //! Object Store module -use std::{ - cmp, - io::{self, ErrorKind}, - str::FromStr, - task::Poll, - time::Duration, -}; +use std::fmt::Display; +use std::{cmp, str::FromStr, task::Poll, time::Duration}; use crate::{HeaderMap, HeaderValue}; use base64::engine::general_purpose::{STANDARD, URL_SAFE}; @@ -32,8 +27,10 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; -use crate::Error; - +use super::consumer::push::OrderedError; +use super::consumer::{StreamError, StreamErrorKind}; +use super::context::{PublishError, PublishErrorKind}; +use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind}; use super::{consumer::push::Ordered, stream::StorageType}; use time::{serde::rfc3339, OffsetDateTime}; @@ -107,7 +104,7 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn get>(&self, object_name: T) -> Result, Error> { + pub async fn get>(&self, object_name: T) -> Result, GetError> { let object_info = self.info(object_name).await?; // if let Some(link) = object_info.link { // return self.get(link.name).await; @@ -147,17 +144,30 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn delete>(&self, object_name: T) -> Result<(), Error> { + pub async fn delete>(&self, object_name: T) -> Result<(), DeleteError> { let object_name = object_name.as_ref(); let mut object_info = self.info(object_name).await?; object_info.chunks = 0; object_info.size = 0; object_info.deleted = true; - let data = serde_json::to_vec(&object_info)?; + let data = serde_json::to_vec(&object_info).map_err(|err| { + DeleteError::with_source( + DeleteErrorKind::Other, + format!("failed deserializing object info: {}", err), + ) + })?; let mut headers = HeaderMap::default(); - headers.insert(NATS_ROLLUP, HeaderValue::from_str(ROLLUP_SUBJECT)?); + headers.insert( + NATS_ROLLUP, + HeaderValue::from_str(ROLLUP_SUBJECT).map_err(|err| { + DeleteError::with_source( + DeleteErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); let subject = format!("$O.{}.M.{}", &self.name, encode_object_name(object_name)); @@ -189,14 +199,11 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn info>(&self, object_name: T) -> Result { + pub async fn info>(&self, object_name: T) -> Result { let object_name = object_name.as_ref(); let object_name = encode_object_name(object_name); if !is_valid_object_name(&object_name) { - return Err(Box::new(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid object name", - ))); + return Err(InfoError::new(InfoErrorKind::InvalidName)); } // Grab last meta value we have. @@ -205,11 +212,23 @@ impl ObjectStore { let message = self .stream .get_last_raw_message_by_subject(subject.as_str()) - .await?; + .await + .map_err(|err| match err.kind() { + super::stream::LastRawMessageErrorKind::NoMessageFound => { + InfoError::new(InfoErrorKind::NotFound) + } + _ => InfoError::with_source(InfoErrorKind::Other, err), + })?; let decoded_payload = STANDARD .decode(message.payload) - .map_err(|err| Box::new(std::io::Error::new(ErrorKind::Other, err)))?; - let object_info = serde_json::from_slice::(&decoded_payload)?; + .map_err(|err| InfoError::with_source(InfoErrorKind::Other, err))?; + let object_info = + serde_json::from_slice::(&decoded_payload).map_err(|err| { + InfoError::with_source( + InfoErrorKind::Other, + format!("failed to decode info payload: {}", err), + ) + })?; Ok(object_info) } @@ -235,7 +254,7 @@ impl ObjectStore { &self, meta: T, data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin), - ) -> Result + ) -> Result where ObjectMeta: From, { @@ -243,10 +262,7 @@ impl ObjectStore { let encoded_object_name = encode_object_name(&object_meta.name); if !is_valid_object_name(&encoded_object_name) { - return Err(Box::new(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid object name", - ))); + return Err(PutError::new(PutErrorKind::InvalidName)); } // Fetch any existing object info, if there is any for later use. let maybe_existing_object_info = match self.info(&encoded_object_name).await { @@ -264,7 +280,10 @@ impl ObjectStore { let mut context = ring::digest::Context::new(&SHA256); loop { - let n = data.read(&mut *buffer).await?; + let n = data + .read(&mut *buffer) + .await + .map_err(|err| PutError::with_source(PutErrorKind::ReadChunks, err))?; if n == 0 { break; @@ -280,8 +299,20 @@ impl ObjectStore { self.stream .context .publish(chunk_subject.clone(), payload) - .await? - .await?; + .await + .map_err(|err| { + PutError::with_source( + PutErrorKind::PublishChunks, + format!("failed chunk publish: {}", err), + ) + })? + .await + .map_err(|err| { + PutError::with_source( + PutErrorKind::PublishChunks, + format!("failed getting chunk ack: {}", err), + ) + })?; } let digest = context.finish(); let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name); @@ -299,21 +330,50 @@ impl ObjectStore { }; let mut headers = HeaderMap::new(); - headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.parse::()?); - let data = serde_json::to_vec(&object_info)?; + headers.insert( + NATS_ROLLUP, + ROLLUP_SUBJECT.parse::().map_err(|err| { + PutError::with_source( + PutErrorKind::Other, + format!("failed parsing header: {}", err), + ) + })?, + ); + let data = serde_json::to_vec(&object_info).map_err(|err| { + PutError::with_source( + PutErrorKind::Other, + format!("failed serializing object info: {}", err), + ) + })?; // publish meta. self.stream .context .publish_with_headers(subject, headers, data.into()) - .await? - .await?; + .await + .map_err(|err| { + PutError::with_source( + PutErrorKind::PublishMetadata, + format!("failed publishing metadata: {}", err), + ) + })? + .await + .map_err(|err| { + PutError::with_source( + PutErrorKind::PublishMetadata, + format!("failed ack from metadata publish: {}", err), + ) + })?; // Purge any old chunks. if let Some(existing_object_info) = maybe_existing_object_info { let chunk_subject = format!("$O.{}.C.{}", &self.name, &existing_object_info.nuid); - self.stream.purge().filter(&chunk_subject).await?; + self.stream + .purge() + .filter(&chunk_subject) + .await + .map_err(|err| PutError::with_source(PutErrorKind::PurgeOldChunks, err))?; } Ok(object_info) @@ -338,7 +398,7 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn watch(&self) -> Result, Error> { + pub async fn watch(&self) -> Result, WatchError> { let subject = format!("$O.{}.M.>", self.name); let ordered = self .stream @@ -374,7 +434,7 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn list(&self) -> Result, Error> { + pub async fn list(&self) -> Result, ListError> { trace!("starting Object List"); let subject = format!("$O.{}.M.>", self.name); let ordered = self @@ -409,8 +469,13 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn seal(&mut self) -> Result<(), Error> { - let mut stream_config = self.stream.info().await?.to_owned(); + pub async fn seal(&mut self) -> Result<(), SealError> { + let mut stream_config = self + .stream + .info() + .await + .map_err(|err| SealError::with_source(SealErrorKind::Info, err))? + .to_owned(); stream_config.config.sealed = true; self.stream @@ -426,7 +491,7 @@ pub struct Watch<'a> { } impl Stream for Watch<'_> { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -437,10 +502,10 @@ impl Stream for Watch<'_> { Some(message) => Poll::Ready( serde_json::from_slice::(&message?.payload) .map_err(|err| { - Box::from(io::Error::new( - ErrorKind::Other, - format!("failed to deserialize the response: {err:?}"), - )) + WatcherError::with_source( + WatcherErrorKind::Other, + format!("failed to deserialize object info: {}", err), + ) }) .map_or_else(|err| Some(Err(err)), |result| Some(Ok(result))), ), @@ -457,7 +522,7 @@ pub struct List<'a> { } impl Stream for List<'_> { - type Item = Result; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -474,23 +539,24 @@ impl Stream for List<'_> { None => return Poll::Ready(None), Some(message) => { let message = message?; - let info = message.info()?; + let info = message + .info() + .map_err(|err| ListerError::with_source(ListerErrorKind::Other, err))?; trace!("num pending: {}", info.pending); if info.pending == 0 { self.done = true; } - let response: ObjectInfo = serde_json::from_slice(&message.payload)?; + let response: ObjectInfo = serde_json::from_slice(&message.payload) + .map_err(|err| { + ListerError::with_source( + ListerErrorKind::Other, + format!("failed deserializing object info: {}", err), + ) + })?; if response.deleted { continue; } - return Poll::Ready(Some( - serde_json::from_slice(&message.payload).map_err(|err| { - Box::from(std::io::Error::new( - ErrorKind::Other, - format!("failed to serialize object info: {err}"), - )) - }), - )); + return Poll::Ready(Some(Ok(response))); } }, Poll::Pending => return Poll::Pending, @@ -576,14 +642,14 @@ impl tokio::io::AsyncRead for Object<'_> { }) .unwrap_or(false) { - return Poll::Ready(Err(io::Error::new( - ErrorKind::InvalidData, + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, "wrong digest", ))); } } else { - return Poll::Ready(Err(io::Error::new( - ErrorKind::InvalidData, + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, "digest should be Some", ))); } @@ -668,3 +734,283 @@ impl From<&str> for ObjectMeta { } } } + +#[derive(Debug)] +pub struct InfoError { + kind: InfoErrorKind, + source: Option>, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum InfoErrorKind { + InvalidName, + NotFound, + Other, + TimedOut, +} + +impl Display for InfoError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind { + InfoErrorKind::InvalidName => write!(f, "invalid object name"), + InfoErrorKind::Other => write!(f, "getting info failed: {}", self.format_source()), + InfoErrorKind::NotFound => write!(f, "not found"), + InfoErrorKind::TimedOut => write!(f, "timed out"), + } + } +} + +crate::error_impls!(InfoError, InfoErrorKind); + +#[derive(Debug)] +pub struct GetError { + kind: GetErrorKind, + source: Option>, +} +#[derive(Debug, PartialEq, Clone)] +pub enum GetErrorKind { + InvalidName, + ConsumerCreate, + NotFound, + Other, + TimedOut, +} +crate::error_impls!(GetError, GetErrorKind); +crate::from_with_timeout!(GetError, GetErrorKind, ConsumerError, ConsumerErrorKind); +crate::from_with_timeout!(GetError, GetErrorKind, StreamError, StreamErrorKind); + +impl From for GetError { + fn from(err: InfoError) -> Self { + match err.kind() { + InfoErrorKind::InvalidName => GetError::new(GetErrorKind::InvalidName), + InfoErrorKind::NotFound => GetError::new(GetErrorKind::NotFound), + InfoErrorKind::Other => GetError::with_source(GetErrorKind::Other, err), + InfoErrorKind::TimedOut => GetError::new(GetErrorKind::TimedOut), + } + } +} + +impl Display for GetError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind() { + GetErrorKind::ConsumerCreate => { + write!( + f, + "failed creating consumer for fetching object: {}", + self.format_source() + ) + } + GetErrorKind::Other => write!(f, "failed getting object: {}", self.format_source()), + GetErrorKind::NotFound => write!(f, "object not found"), + GetErrorKind::TimedOut => write!(f, "timed out"), + GetErrorKind::InvalidName => write!(f, "invalid object name"), + } + } +} + +#[derive(Debug)] +pub struct DeleteError { + kind: DeleteErrorKind, + source: Option>, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum DeleteErrorKind { + TimedOut, + NotFound, + Metadata, + InvalidName, + Chunks, + Other, +} + +impl Display for DeleteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind() { + DeleteErrorKind::TimedOut => write!(f, "timed out"), + DeleteErrorKind::Metadata => { + write!(f, "failed rolling up metadata: {}", self.format_source()) + } + DeleteErrorKind::Chunks => write!(f, "failed purging chunks: {}", self.format_source()), + DeleteErrorKind::Other => write!(f, "delete failed: {}", self.format_source()), + DeleteErrorKind::NotFound => write!(f, "object not found"), + DeleteErrorKind::InvalidName => write!(f, "invalid object name"), + } + } +} + +impl From for DeleteError { + fn from(err: InfoError) -> Self { + match err.kind() { + InfoErrorKind::InvalidName => DeleteError::new(DeleteErrorKind::InvalidName), + InfoErrorKind::NotFound => DeleteError::new(DeleteErrorKind::NotFound), + InfoErrorKind::Other => DeleteError::with_source(DeleteErrorKind::Other, err), + InfoErrorKind::TimedOut => DeleteError::new(DeleteErrorKind::TimedOut), + } + } +} + +crate::error_impls!(DeleteError, DeleteErrorKind); +crate::from_with_timeout!(DeleteError, DeleteErrorKind, PublishError, PublishErrorKind); +crate::from_with_timeout!(DeleteError, DeleteErrorKind, PurgeError, PurgeErrorKind); + +#[derive(Debug)] +pub struct PutError { + kind: PutErrorKind, + source: Option>, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum PutErrorKind { + InvalidName, + ReadChunks, + PublishChunks, + PublishMetadata, + PurgeOldChunks, + TimedOut, + Other, +} + +crate::error_impls!(PutError, PutErrorKind); + +impl Display for PutError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind() { + PutErrorKind::PublishChunks => { + write!( + f, + "failed publishing object chunks: {}", + self.format_source() + ) + } + PutErrorKind::PublishMetadata => { + write!(f, "failed publishing metadata: {}", self.format_source()) + } + PutErrorKind::PurgeOldChunks => { + write!(f, "falied purging old chunks: {}", self.format_source()) + } + PutErrorKind::TimedOut => write!(f, "timed out"), + PutErrorKind::Other => write!(f, "error: {}", self.format_source()), + PutErrorKind::InvalidName => write!(f, "invalid object name"), + PutErrorKind::ReadChunks => write!( + f, + "error while reading the buffer: {}", + self.format_source() + ), + } + } +} + +#[derive(Debug)] +pub struct WatchError { + kind: WatchErrorKind, + source: Option>, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum WatchErrorKind { + TimedOut, + ConsumerCreate, + Other, +} + +crate::error_impls!(WatchError, WatchErrorKind); +crate::from_with_timeout!(WatchError, WatchErrorKind, ConsumerError, ConsumerErrorKind); +crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKind); + +impl Display for WatchError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind { + WatchErrorKind::ConsumerCreate => { + write!( + f, + "watch consumer creation failed: {}", + self.format_source() + ) + } + WatchErrorKind::Other => write!(f, "watch failed: {}", self.format_source()), + WatchErrorKind::TimedOut => write!(f, "timed out"), + } + } +} + +pub type ListError = WatchError; +pub type ListErrorKind = WatchErrorKind; + +#[derive(Debug)] +pub struct SealError { + kind: SealErrorKind, + source: Option>, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum SealErrorKind { + TimedOut, + Other, + Info, + Update, +} + +crate::error_impls!(SealError, SealErrorKind); + +impl Display for SealError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind { + SealErrorKind::TimedOut => write!(f, "timed out"), + SealErrorKind::Other => write!(f, "seal failed: {}", self.format_source()), + SealErrorKind::Info => write!( + f, + "failed getting stream info before sealing bucket: {}", + self.format_source() + ), + SealErrorKind::Update => { + write!(f, "failed sealing the bucket: {}", self.format_source()) + } + } + } +} + +impl From for SealError { + fn from(err: super::context::UpdateStreamError) -> Self { + match err.kind() { + super::context::CreateStreamErrorKind::TimedOut => { + SealError::new(SealErrorKind::TimedOut) + } + _ => SealError::with_source(SealErrorKind::Update, err), + } + } +} + +#[derive(Debug)] +pub struct WatcherError { + kind: WatcherErrorKind, + source: Option>, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum WatcherErrorKind { + ConsumerError, + Other, +} + +impl Display for WatcherError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind { + WatcherErrorKind::ConsumerError => { + write!(f, "watcher consumer error: {}", self.format_source()) + } + WatcherErrorKind::Other => write!(f, "watcher error: {}", self.format_source()), + } + } +} + +crate::error_impls!(WatcherError, WatcherErrorKind); + +impl From for WatcherError { + fn from(err: OrderedError) -> Self { + WatcherError::with_source(WatcherErrorKind::ConsumerError, err) + } +} + +pub type ListerError = WatcherError; +pub type ListerErrorKind = WatcherErrorKind; diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 8c67d2a5b..4885ff134 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -1844,6 +1844,7 @@ impl From for ConsumerError { #[derive(Debug, PartialEq, Clone)] pub enum ConsumerErrorKind { + //TODO: get last should have timeout, which should be mapped here. TimedOut, RequestFailed, InvalidConsumerType,