From b8328933b10372f5b2551ee7ba79ef796425308e Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 14 May 2024 17:04:30 +0530 Subject: [PATCH] chore: clean up --- server/src/catalog.rs | 3 +-- server/src/event/writer/file_writer.rs | 9 ++------- server/src/handlers/http/cluster/mod.rs | 4 ++-- server/src/handlers/http/ingest.rs | 6 +++++- server/src/handlers/http/users/dashboards.rs | 8 +++++++- server/src/handlers/http/users/filters.rs | 2 +- server/src/query/listing_table_builder.rs | 1 - server/src/query/stream_schema_provider.rs | 4 ++-- server/src/response.rs | 4 ++-- server/src/storage.rs | 6 +++++- server/src/storage/store_metadata.rs | 9 +++++---- 11 files changed, 32 insertions(+), 24 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index eb1237727..2747e433e 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -194,8 +194,7 @@ async fn create_manifest( .ok_or(IOError::new( ErrorKind::Other, "Failed to create upper bound for manifest", - )) - .map_err(ObjectStorageError::IoError)?, + ))?, ) .and_utc(); diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 1b193eb4c..f16656b7d 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -47,10 +47,7 @@ impl FileWriter { ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { - writer - .writer - .write(record) - .map_err(StreamWriterError::Writer)?; + writer.writer.write(record)?; } // entry is not present thus we create it None => { @@ -92,8 +89,6 @@ fn init_new_stream_writer_file( let mut stream_writer = StreamWriter::try_new(file, &record.schema()) .expect("File and RecordBatch both are checked"); - stream_writer - .write(record) - .map_err(StreamWriterError::Writer)?; + stream_writer.write(record)?; Ok((path, stream_writer)) } diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 88c563251..bbb21d535 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -407,12 +407,12 @@ pub async fn get_cluster_metrics() -> Result { .await; if let Ok(res) = res { - let text = res.text().await.map_err(PostError::NetworkError)?; + let text = res.text().await?; let lines: Vec> = text.lines().map(|line| Ok(line.to_owned())).collect_vec(); let sample = prometheus_parse::Scrape::parse(lines.into_iter()) - .map_err(|err| PostError::CustomError(err.to_string()))? + .map_err(PostError::Error)? .samples; dresses.push(Metrics::from_prometheus_samples( diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index c6ec59152..43e33d0e6 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -103,7 +103,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let object_store_format = glob_storage .get_object_store_format(&stream_name) .await - .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; + .map_err(|_| PostError::StreamNotFound(stream_name.clone()))?; let time_partition = object_store_format.time_partition; let time_partition_limit = object_store_format.time_partition_limit; @@ -265,6 +265,9 @@ pub enum PostError { #[error("{0}")] CreateStream(#[from] CreateStreamError), #[error("Error: {0}")] + Error(std::io::Error), + #[allow(unused)] + #[error("Error: {0}")] CustomError(String), #[error("Error: {0}")] NetworkError(#[from] reqwest::Error), @@ -293,6 +296,7 @@ impl actix_web::ResponseError for PostError { PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index 2478b1dfa..d33b1600e 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -98,7 +98,7 @@ pub async fn delete(req: HttpRequest) -> Result { // version: String, // name: String, // id: String, -// time-filter: `type_not_defined` +// time_filter: TimeFilter // refresh_interval: u64, // pannels: Vec, // } @@ -112,6 +112,12 @@ pub async fn delete(req: HttpRequest) -> Result { // headers: Vec, // dimensions: (u64, u64), // } +// +// #[derive(Debug, Serialize, Deserialize)] +// pub struct TimeFilter { +// to: String, +// from: String +// } #[derive(Debug, thiserror::Error)] pub enum DashboardError { diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index cc5225e03..2e4c4d1f3 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -161,5 +161,5 @@ impl actix_web::ResponseError for FiltersError { // stream_name: String, // filter_name: String, // query: String, -// time-filter: `type_not_defined` +// time-filter: TimeFilter // } diff --git a/server/src/query/listing_table_builder.rs b/server/src/query/listing_table_builder.rs index ee5b8c0d1..278ed5783 100644 --- a/server/src/query/listing_table_builder.rs +++ b/server/src/query/listing_table_builder.rs @@ -167,7 +167,6 @@ impl ListingTableBuilder { }) .try_collect() .await - // TODO: make the err map better .map_err(|err| DataFusionError::External(Box::new(err)))?; let mut res = res.into_iter().flatten().collect_vec(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 43e58ce3a..7ea67c3c3 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -180,8 +180,8 @@ async fn collect_from_snapshot( .map(|item| item.manifest_path) .collect(), ) - .await - .map_err(DataFusionError::ObjectStore)?; + .await?; + let mut manifest_files: Vec<_> = manifest_files .into_iter() .flat_map(|file| file.files) diff --git a/server/src/response.rs b/server/src/response.rs index 0f5deb5ec..6ea07bda4 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -33,8 +33,8 @@ impl QueryResponse { pub fn to_http(&self) -> Result { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); - let mut json_records = record_batches_to_json(&records) - .map_err(|err| QueryError::JsonParse(err.to_string()))?; + let mut json_records = record_batches_to_json(&records)?; + if self.fill_null { for map in &mut json_records { for field in &self.fields { diff --git a/server/src/storage.rs b/server/src/storage.rs index 3de62ef4f..a51868897 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,9 @@ * */ -use crate::{catalog::snapshot::Snapshot, stats::Stats}; +use crate::{ + catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::Stats, +}; use chrono::Local; @@ -204,6 +206,8 @@ pub enum ObjectStorageError { UnhandledError(Box), #[error("Error: {0}")] PathError(relative_path::FromPathError), + #[error("Error: {0}")] + MetadataError(#[from] MetadataError), #[allow(dead_code)] #[error("Authentication Error: {0}")] diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 44ae55868..d3ecd4040 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -116,9 +116,7 @@ pub async fn resolve_parseable_metadata() -> Result Result { // if server is started in ingest mode,we need to make sure that query mode has been started // i.e the metadata is updated to reflect the server mode = Query - if Mode::from_string(&metadata.server_mode).map_err(ObjectStorageError::Custom)? == Mode::All && CONFIG.parseable.mode == Mode::Ingest { + if Mode::from_string(&metadata.server_mode) + .map_err(ObjectStorageError::Custom) + ? + == Mode::All && CONFIG.parseable.mode == Mode::Ingest { Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") } else { create_dir_all(CONFIG.staging_dir())?;