Skip to content

Commit

Permalink
cleanup map_err
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Mar 28, 2024
1 parent 5e124da commit ff584ea
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 64 deletions.
57 changes: 10 additions & 47 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ use super::modal::IngesterMetadata;
pub async fn sync_streams_with_ingesters(stream_name: &str) -> Result<(), StreamError> {
let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

let mut errored = false;
Expand Down Expand Up @@ -96,10 +93,7 @@ pub async fn fetch_stats_from_ingesters(

let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

for ingester in ingester_infos {
Expand Down Expand Up @@ -163,13 +157,7 @@ async fn send_stream_sync_request(
ingester.domain_name,
err
);
StreamError::Custom {
msg: format!(
"failed to forward create stream request to ingester: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

if !res.status().is_success() {
Expand All @@ -178,14 +166,7 @@ async fn send_stream_sync_request(
ingester.domain_name,
res
);
return Err(StreamError::Custom {
msg: format!(
"failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
ingester.domain_name,
res.text().await.unwrap_or_default()
),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
return Err(StreamError::Network(res.error_for_status().unwrap_err()));
}

Ok(())
Expand Down Expand Up @@ -214,13 +195,7 @@ async fn send_stream_rollback_request(
ingester.domain_name,
err
);
StreamError::Custom {
msg: format!(
"failed to rollback stream creation: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

// if the response is not successful, log the error and return a custom error
Expand All @@ -247,10 +222,7 @@ async fn send_stream_rollback_request(
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
let ingester_infos = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to get ingester info\n{:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Anyhow(err)
})?;

let mut infos = vec![];
Expand All @@ -275,19 +247,13 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {

let resp_data = resp.bytes().await.map_err(|err| {
log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info to bytes: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
log::error!("Fatal: failed to parse ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::ResponseError(err)
})?
.get("staging")
.unwrap()
Expand Down Expand Up @@ -321,7 +287,7 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
let ingester_metadata = get_ingester_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingester info: {:?}", err);
PostError::CustomError(err.to_string())
PostError::Invalid(err)
})?;

let mut dresses = vec![];
Expand All @@ -341,10 +307,7 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
.await;

if let Ok(res) = res {
let text = res
.text()
.await
.map_err(|err| PostError::CustomError(err.to_string()))?;
let text = res.text().await.map_err(PostError::NetworkError)?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

Expand Down
12 changes: 3 additions & 9 deletions server/src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
.iter()
.map(|x| x.creation_time.parse::<DateTime<Utc>>().unwrap())
.min()
.unwrap(); // should never be None
.unwrap(); // should never be None

// get the stream name
let stream_name = stats[0].stream.clone();
Expand All @@ -138,7 +138,7 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
None => Utc::now(), // current time ie the max time
})
.min()
.unwrap(); // should never be None
.unwrap(); // should never be None

let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

Expand Down Expand Up @@ -222,13 +222,7 @@ pub async fn send_stats_request(
err
);

StreamError::Custom {
msg: format!(
"failed to fetch stats from ingester: {}\n Error: {:?}",
ingester.domain_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
StreamError::Network(err)
})?;

if !res.status().is_success() {
Expand Down
3 changes: 3 additions & 0 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ pub enum PostError {
CreateStream(#[from] CreateStreamError),
#[error("Error: {0}")]
CustomError(String),
#[error("Error: {0}")]
NetworkError(#[from] reqwest::Error),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -187,6 +189,7 @@ impl actix_web::ResponseError for PostError {
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
22 changes: 22 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
Ok(())
}

fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
match kind {
serde_json::error::Category::Io => StatusCode::INTERNAL_SERVER_ERROR,
serde_json::error::Category::Syntax => StatusCode::BAD_REQUEST,
serde_json::error::Category::Data => StatusCode::INTERNAL_SERVER_ERROR,
serde_json::error::Category::Eof => StatusCode::BAD_REQUEST,
}
}

pub mod error {

use actix_web::http::header::ContentType;
Expand All @@ -402,6 +411,8 @@ pub mod error {
validator::error::{AlertValidationError, StreamNameValidationError},
};

use super::classify_json_error;

#[derive(Debug, thiserror::Error)]
pub enum CreateStreamError {
#[error("Stream name validation failed due to {0}")]
Expand Down Expand Up @@ -446,6 +457,12 @@ pub mod error {
InvalidRetentionConfig(serde_json::Error),
#[error("{msg}")]
Custom { msg: String, status: StatusCode },
#[error("Error: {0}")]
Anyhow(#[from] anyhow::Error),
#[error("Network Error: {0}")]
Network(#[from] reqwest::Error),
#[error("Error: {0}")]
ResponseError(#[from] serde_json::Error),
}

impl actix_web::ResponseError for StreamError {
Expand All @@ -468,6 +485,11 @@ pub mod error {
StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST,
StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST,
StreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
StreamError::Network(err) => {
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
StreamError::ResponseError(err) => classify_json_error(err.classify()),
}
}

Expand Down
13 changes: 8 additions & 5 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
Expand All @@ -41,6 +42,7 @@ use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

use super::send_query_request_to_ingester;
Expand Down Expand Up @@ -76,11 +78,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
if let Ok(new_schema) = fetch_schema(&table_name).await {
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(|err| {
QueryError::Custom(format!("Error committing schema to storage\nError:{err}"))
})?;
commit_schema(&table_name, Arc::new(new_schema))
.map_err(|err| QueryError::Custom(format!("Error committing schema: {}", err)))?;
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
}
}

Expand Down Expand Up @@ -291,6 +290,10 @@ pub enum QueryError {
Execute(#[from] ExecuteError),
#[error("Error: {0}")]
Custom(String),
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Evern Error: {0}")]
EventError(#[from] EventError),
}

impl actix_web::ResponseError for QueryError {
Expand Down
2 changes: 2 additions & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum ObjectStorageError {

#[error("Unhandled Error: {0}")]
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Error: {0}")]
PathError(relative_path::FromPathError),

#[allow(dead_code)]
#[error("Authentication Error: {0}")]
Expand Down
5 changes: 2 additions & 3 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,8 @@ impl ObjectStorage for S3 {

let byts = self
.get_object(
RelativePath::from_path(meta.location.as_ref()).map_err(|err| {
ObjectStorageError::Custom(format!("Error while getting files: {:}", err))
})?,
RelativePath::from_path(meta.location.as_ref())
.map_err(ObjectStorageError::PathError)?,
)
.await?;

Expand Down

0 comments on commit ff584ea

Please sign in to comment.