Parquet: Verify 32-bit CRC checksum when decoding pages (#6290)
* Parquet: Verify 32-bit CRC checksum when decoding pages

* Undo cargo toml

* a

* enable crc by default

* a

* Address comments

* Add tests that verify crc checks

* Document feature flag

* Move documentation around

* Update parquet/Cargo.toml

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* Update parquet/src/file/serialized_reader.rs

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* Add license

* Run cargo +stable fmt --all

* Revert MD034

* Apply readme suggestion

---------

Co-authored-by: xmakro <makro@>
Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
xmakro and etseidl authored Sep 28, 2024
1 parent ebcc4a5 commit 8e0aaad
Showing 5 changed files with 89 additions and 1 deletion.
3 changes: 3 additions & 0 deletions parquet/Cargo.toml
@@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]

[[example]]
name = "read_parquet"
3 changes: 2 additions & 1 deletion parquet/README.md
@@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
- `zstd` (default) - support for parquet using `zstd` compression
- `snap` (default) - support for parquet using `snappy` compression
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding
- `experimental` - Experimental APIs which may change, even between minor releases

## Parquet Feature Status
@@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.
9 changes: 9 additions & 0 deletions parquet/src/file/serialized_reader.rs
@@ -390,6 +390,15 @@ pub(crate) fn decode_page(
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing data page v2, depending on enabled compression for the
    // page, we should account for uncompressed data ('offset') of
    // repetition and definition levels.
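As a point of reference, here is a minimal standalone sketch of the same check (the function name is illustrative, not part of the crate): the Thrift page header stores the checksum in a signed 32-bit field, so the stored value is reinterpreted as `u32` before being compared against the CRC32 of the page bytes as they appear on disk, i.e. before any decompression.

fn verify_page_crc(page_bytes: &[u8], stored_crc: i32) -> Result<(), String> {
    // crc32fast::hash computes the CRC32 of the raw (still compressed) page bytes.
    let computed = crc32fast::hash(page_bytes);
    // The Thrift `crc` field is an i32; reinterpret it as u32 for the comparison.
    if computed != stored_crc as u32 {
        return Err(format!(
            "Page CRC checksum mismatch: stored {}, computed {}",
            stored_crc as u32, computed
        ));
    }
    Ok(())
}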
73 changes: 73 additions & 0 deletions parquet/tests/arrow_reader/checksum.rs
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This file contains an end-to-end test for verifying checksums when reading parquet files.

use std::path::PathBuf;

use arrow::util::test_util::parquet_test_data;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;

#[test]
fn test_datapage_v1_corrupt_checksum() {
    let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
    assert_eq!(errors, [
        Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
        Ok(()),
        Ok(()),
        Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
        Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
    ]);
}

#[test]
fn test_datapage_v1_uncompressed_checksum() {
    let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet");
    assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}

#[test]
fn test_datapage_v1_snappy_compressed_checksum() {
    let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet");
    assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}

#[test]
fn test_plain_dict_uncompressed_checksum() {
    let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet");
    assert_eq!(errors, [Ok(())]);
}

#[test]
fn test_rle_dict_snappy_checksum() {
    let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet");
    assert_eq!(errors, [Ok(())]);
}

/// Reads a file and returns a vector with one element per record batch.
/// The record batch data is replaced with () and errors are stringified.
fn read_file_batch_errors(name: &str) -> Vec<Result<(), String>> {
    let path = PathBuf::from(parquet_test_data()).join(name);
    println!("Reading file: {:?}", path);
    let file = std::fs::File::open(&path).unwrap();
    let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap();
    reader
        .map(|x| match x {
            Ok(_) => Ok(()),
            Err(e) => Err(e.to_string()),
        })
        .collect()
}
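For downstream users, a hedged sketch of the behaviour outside the test harness (the function name is illustrative; the calls mirror those in `read_file_batch_errors` above): with the `crc` feature enabled, a page whose stored checksum does not match the data read back surfaces as an error from the record batch iterator rather than as silently corrupted data.

use parquet::arrow::arrow_reader::ArrowReaderBuilder;

// Returns the first decoding error (e.g. a CRC mismatch), if any, as a string.
fn first_read_error(path: &std::path::Path) -> Option<String> {
    let file = std::fs::File::open(path).expect("file should open");
    let reader = ArrowReaderBuilder::try_new(file)
        .expect("valid parquet metadata")
        .build()
        .expect("reader should build");
    reader
        .filter_map(|batch| batch.err())
        .map(|err| err.to_string())
        .next()
}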
2 changes: 2 additions & 0 deletions parquet/tests/arrow_reader/mod.rs
@@ -36,6 +36,8 @@ use std::sync::Arc;
use tempfile::NamedTempFile;

mod bad_data;
#[cfg(feature = "crc")]
mod checksum;
mod statistics;

// returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values
