Skip to content

Commit

Permalink
chore: clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed May 14, 2024
1 parent 7798ddb commit b832893
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 24 deletions.
3 changes: 1 addition & 2 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ async fn create_manifest(
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))
.map_err(ObjectStorageError::IoError)?,
))?,
)
.and_utc();

Expand Down
9 changes: 2 additions & 7 deletions server/src/event/writer/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ impl FileWriter {
) -> Result<(), StreamWriterError> {
match self.get_mut(schema_key) {
Some(writer) => {
writer
.writer
.write(record)
.map_err(StreamWriterError::Writer)?;
writer.writer.write(record)?;
}
// entry is not present thus we create it
None => {
Expand Down Expand Up @@ -92,8 +89,6 @@ fn init_new_stream_writer_file(
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
.expect("File and RecordBatch both are checked");

stream_writer
.write(record)
.map_err(StreamWriterError::Writer)?;
stream_writer.write(record)?;
Ok((path, stream_writer))
}
4 changes: 2 additions & 2 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,12 @@ pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
.await;

if let Ok(res) = res {
let text = res.text().await.map_err(PostError::NetworkError)?;
let text = res.text().await?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| PostError::CustomError(err.to_string()))?
.map_err(PostError::Error)?
.samples;

dresses.push(Metrics::from_prometheus_samples(
Expand Down
6 changes: 5 additions & 1 deletion server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
let object_store_format = glob_storage
.get_object_store_format(&stream_name)
.await
.map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;
.map_err(|_| PostError::StreamNotFound(stream_name.clone()))?;

let time_partition = object_store_format.time_partition;
let time_partition_limit = object_store_format.time_partition_limit;
Expand Down Expand Up @@ -265,6 +265,9 @@ pub enum PostError {
#[error("{0}")]
CreateStream(#[from] CreateStreamError),
#[error("Error: {0}")]
Error(std::io::Error),
#[allow(unused)]
#[error("Error: {0}")]
CustomError(String),
#[error("Error: {0}")]
NetworkError(#[from] reqwest::Error),
Expand Down Expand Up @@ -293,6 +296,7 @@ impl actix_web::ResponseError for PostError {
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
8 changes: 7 additions & 1 deletion server/src/handlers/http/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
// version: String,
// name: String,
// id: String,
// time-filter: `type_not_defined`
// time_filter: TimeFilter
// refresh_interval: u64,
// pannels: Vec<Pannel>,
// }
Expand All @@ -112,6 +112,12 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
// headers: Vec<String>,
// dimensions: (u64, u64),
// }
//
// #[derive(Debug, Serialize, Deserialize)]
// pub struct TimeFilter {
// to: String,
// from: String
// }

#[derive(Debug, thiserror::Error)]
pub enum DashboardError {
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/users/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,5 @@ impl actix_web::ResponseError for FiltersError {
// stream_name: String,
// filter_name: String,
// query: String,
// time-filter: `type_not_defined`
// time-filter: TimeFilter
// }
1 change: 0 additions & 1 deletion server/src/query/listing_table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl ListingTableBuilder {
})
.try_collect()
.await
// TODO: make the err map better
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let mut res = res.into_iter().flatten().collect_vec();
Expand Down
4 changes: 2 additions & 2 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ async fn collect_from_snapshot(
.map(|item| item.manifest_path)
.collect(),
)
.await
.map_err(DataFusionError::ObjectStore)?;
.await?;

let mut manifest_files: Vec<_> = manifest_files
.into_iter()
.flat_map(|file| file.files)
Expand Down
4 changes: 2 additions & 2 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl QueryResponse {
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records)
.map_err(|err| QueryError::JsonParse(err.to_string()))?;
let mut json_records = record_batches_to_json(&records)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
Expand Down
6 changes: 5 additions & 1 deletion server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*
*/

use crate::{catalog::snapshot::Snapshot, stats::Stats};
use crate::{
catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::Stats,
};

use chrono::Local;

Expand Down Expand Up @@ -204,6 +206,8 @@ pub enum ObjectStorageError {
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Error: {0}")]
PathError(relative_path::FromPathError),
#[error("Error: {0}")]
MetadataError(#[from] MetadataError),

#[allow(dead_code)]
#[error("Authentication Error: {0}")]
Expand Down
9 changes: 5 additions & 4 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
overwrite_staging = true;
if CONFIG.parseable.mode == Mode::All {
standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
.map_err(|err| {
ObjectStorageError::Custom(err.to_string())
})?;
?;
}
Ok(metadata)
},
Expand All @@ -128,7 +126,10 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
EnvChange::NewStaging(mut metadata) => {
// if server is started in ingest mode,we need to make sure that query mode has been started
// i.e the metadata is updated to reflect the server mode = Query
if Mode::from_string(&metadata.server_mode).map_err(ObjectStorageError::Custom)? == Mode::All && CONFIG.parseable.mode == Mode::Ingest {
if Mode::from_string(&metadata.server_mode)
.map_err(ObjectStorageError::Custom)
?
== Mode::All && CONFIG.parseable.mode == Mode::Ingest {
Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
} else {
create_dir_all(CONFIG.staging_dir())?;
Expand Down

0 comments on commit b832893

Please sign in to comment.