Skip to content

Commit

Permalink
fix for invalid parquet issue (#829)
Browse files Browse the repository at this point in the history
Detect a zero-sized parquet file,
log the error, delete the invalid parquet file, and retain the
grouped arrow files. The arrow files will be converted
to parquet in the next sync cycle (every 60 seconds).
  • Loading branch information
nikhilsinhaparseable authored Jun 27, 2024
1 parent 4c0d2a8 commit 7f7d0f2
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,34 +255,36 @@ pub fn convert_disk_files_to_parquet(
custom_partition_fields.insert(custom_partition_field.to_string(), index);
}
}
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let props = parquet_writer_props(
time_partition.clone(),
index_time_partition,
custom_partition_fields,
)
.build();

schemas.push(merged_schema.clone());
let schema = Arc::new(merged_schema);
let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?;
for ref record in record_reader.merged_iter(schema, time_partition.clone()) {
writer.write(record)?;
}

writer.close()?;

for file in files {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();

if fs::remove_file(file.clone()).is_err() {
log::error!("Failed to delete file. Unstable state");
process::abort()
if parquet_file.metadata().unwrap().len() == 0 {
log::error!("Invalid parquet file detected, removing it");
fs::remove_file(parquet_path).unwrap();
} else {
for file in files {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
if fs::remove_file(file.clone()).is_err() {
log::error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", stream, file_type])
.sub(file_size as i64);
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", stream, file_type])
.sub(file_size as i64);
}
}

Expand Down

0 comments on commit 7f7d0f2

Please sign in to comment.