Skip to content

Commit

Permalink
delete invalid parquet and arrow files (#835)
Browse files Browse the repository at this point in the history
Include the file name and stream name in the error log.

Fixes: #834
  • Loading branch information
nikhilsinhaparseable authored Jun 28, 2024
1 parent 47eb08d commit 28e3ede
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl StorageDir {
pub fn arrow_files_grouped_exclude_time(
&self,
exclude: NaiveDateTime,
stream: &str,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
let mut arrow_files = self.arrow_files();
Expand All @@ -170,8 +171,9 @@ impl StorageDir {
for arrow_file_path in arrow_files {
if arrow_file_path.metadata().unwrap().len() == 0 {
log::error!(
"Invalid arrow file detected, removing it: {:?}",
arrow_file_path
"Invalid arrow file {:?} detected for stream {}, removing it",
&arrow_file_path,
stream
);
fs::remove_file(&arrow_file_path).unwrap();
} else {
Expand Down Expand Up @@ -225,7 +227,7 @@ pub fn convert_disk_files_to_parquet(
let mut schemas = Vec::new();

let time = chrono::Utc::now().naive_utc();
let staging_files = dir.arrow_files_grouped_exclude_time(time);
let staging_files = dir.arrow_files_grouped_exclude_time(time, stream);
if staging_files.is_empty() {
metrics::STAGING_FILES.with_label_values(&[stream]).set(0);
metrics::STORAGE_SIZE
Expand Down Expand Up @@ -279,7 +281,11 @@ pub fn convert_disk_files_to_parquet(

writer.close()?;
if parquet_file.metadata().unwrap().len() == 0 {
log::error!("Invalid parquet file detected, removing it");
log::error!(
"Invalid parquet file {:?} detected for stream {}, removing it",
&parquet_path,
stream
);
fs::remove_file(parquet_path).unwrap();
} else {
for file in files {
Expand Down

0 comments on commit 28e3ede

Please sign in to comment.