From 19c504770c957822f32704b5de6f57dd47364fcf Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 26 Jun 2024 12:07:20 +0530 Subject: [PATCH] fix for invalid parquet issue detect the 0 sized parquet log the error and delete the parquet and retain the grouped arrow files the arrow files will be converted to parquet in the next sync cycle (every 60 secs) --- server/src/storage/staging.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 82c819082..2f0f9022f 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -255,34 +255,36 @@ pub fn convert_disk_files_to_parquet( custom_partition_fields.insert(custom_partition_field.to_string(), index); } } - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; let props = parquet_writer_props( time_partition.clone(), index_time_partition, custom_partition_fields, ) .build(); - schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); - let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; + let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?; for ref record in record_reader.merged_iter(schema, time_partition.clone()) { writer.write(record)?; } writer.close()?; - - for file in files { - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - - if fs::remove_file(file.clone()).is_err() { - log::error!("Failed to delete file. Unstable state"); - process::abort() + if parquet_file.metadata().unwrap().len() == 0 { + log::error!("Invalid parquet file detected, removing it"); + fs::remove_file(parquet_path).unwrap(); + } else { + for file in files { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + if fs::remove_file(file.clone()).is_err() { + log::error!("Failed to delete file. Unstable state"); + process::abort() + } + metrics::STORAGE_SIZE + .with_label_values(&["staging", stream, file_type]) + .sub(file_size as i64); } - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, file_type]) - .sub(file_size as i64); } }