diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 45332b9a2..9a9bcf827 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -154,6 +154,7 @@ impl StorageDir { pub fn arrow_files_grouped_exclude_time( &self, exclude: NaiveDateTime, + stream: &str, ) -> HashMap> { let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); @@ -170,8 +171,9 @@ impl StorageDir { for arrow_file_path in arrow_files { if arrow_file_path.metadata().unwrap().len() == 0 { log::error!( - "Invalid arrow file detected, removing it: {:?}", - arrow_file_path + "Invalid arrow file {:?} detected for stream {}, removing it", + &arrow_file_path, + stream ); fs::remove_file(&arrow_file_path).unwrap(); } else { @@ -225,7 +227,7 @@ pub fn convert_disk_files_to_parquet( let mut schemas = Vec::new(); let time = chrono::Utc::now().naive_utc(); - let staging_files = dir.arrow_files_grouped_exclude_time(time); + let staging_files = dir.arrow_files_grouped_exclude_time(time, stream); if staging_files.is_empty() { metrics::STAGING_FILES.with_label_values(&[stream]).set(0); metrics::STORAGE_SIZE @@ -279,7 +281,11 @@ pub fn convert_disk_files_to_parquet( writer.close()?; if parquet_file.metadata().unwrap().len() == 0 { - log::error!("Invalid parquet file detected, removing it"); + log::error!( + "Invalid parquet file {:?} detected for stream {}, removing it", + &parquet_path, + stream + ); fs::remove_file(parquet_path).unwrap(); } else { for file in files {