Write null counts in parquet files when they are present #6257

Open · wants to merge 4 commits into master
Changes from 1 commit
32 changes: 31 additions & 1 deletion parquet/src/arrow/arrow_reader/statistics.rs
@@ -1179,6 +1179,8 @@ pub struct StatisticsConverter<'a> {
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the Arrow schema
arrow_field: &'a Field,
/// treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
}

impl<'a> StatisticsConverter<'a> {
@@ -1195,6 +1197,23 @@ impl<'a> StatisticsConverter<'a> {
self.arrow_field
}

/// Set the statistics converter to treat missing null counts as missing
Contributor Author:

By default reading null counts will work with files written with older versions of parquet-rs

Contributor:

For a breaking / major release, is it acceptable to include an upgrade instruction of "add this config to maintain the old behavior"?

Contributor Author:

Absolutely

It is important to note that the default behavior in this PR is the old behavior (in other words, there should be no changes needed in downstream consumers of this code)

Contributor:

The default in this PR is missing_null_counts_as_zero = true, which maintains the old behavior, right?

If "add this config to maintain old behavior" is acceptable for a breaking release, then I would expect the default to be the new behavior.

IOW, I'd expect what you said on the parquet mailing list:

Applications that use parquet-rs to read parquet files and interpret the null_count will need to be changed after the upgrade to explicitly continue the old behavior of "treat no null_count as 0", which is also documented now.

Contributor Author:

The default in this PR is missing_null_counts_as_zero = true, which maintains the old behavior, right?

Yes

then I would expect the default to be the new behavior.

My thinking was:

Since there are two different APIs:

Statistics::null_count would now return Option<..>, so users of the library will need to update their code anyway and can thus choose at that time which behavior they want

StatisticsConverter's API didn't change and thus it keeps the previous behavior. This is what I would personally want for a system -- no change for reading parquet files that were written with previous versions of the rust writer.

///
/// By default, the converter will treat missing null counts as though
/// the null count is known to be `0`.
///
/// Note that due to <https://github.com/apache/arrow-rs/pull/6257>, prior
/// to version 53.0.0, parquet files written by parquet-rs did not store
/// null counts even when it was known there were zero nulls and the reader
/// would return 0 for the null counts.
///
/// Both parquet-java and parquet-cpp store null counts as 0 when there are
/// no nulls, and don't write unknown values to the null count field.
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}
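For illustration, a minimal usage sketch of the new builder method (not part of this diff; it assumes arrow_schema, parquet_schema, and row_group_metadata are already in scope, as in the tests below):

    // by default the converter reports a missing null_count as 0;
    // opting out makes it surface as NULL in the returned array
    let converter = StatisticsConverter::try_new("col", &arrow_schema, &parquet_schema)?
        .with_missing_null_counts_as_zero(false);
    let null_counts = converter.row_group_null_counts([&row_group_metadata])?;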

/// Returns a [`UInt64Array`] with row counts for each row group
///
/// # Return Value
@@ -1288,6 +1307,7 @@ impl<'a> StatisticsConverter<'a> {
Ok(Self {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
})
}

@@ -1386,7 +1406,15 @@ impl<'a> StatisticsConverter<'a> {
let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.and_then(|s| s.null_count_opt()));
.map(|s| {
s.and_then(|s| {
if self.missing_null_counts_as_zero {
Some(s.null_count_opt().unwrap_or(0))
} else {
s.null_count_opt()
}
})
});
Ok(UInt64Array::from_iter(null_counts))
}
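A standalone sketch of what the closure above yields under each setting (illustrative only; plain Rust, independent of this diff):

    let per_group_null_counts = [Some(5u64), None, Some(0)];

    // missing_null_counts_as_zero = true (the default): None becomes Some(0)
    let as_zero: Vec<Option<u64>> = per_group_null_counts
        .iter()
        .map(|c| Some(c.unwrap_or(0)))
        .collect();
    assert_eq!(as_zero, vec![Some(5), Some(0), Some(0)]);

    // missing_null_counts_as_zero = false: None stays None (NULL in the array)
    let as_is: Vec<Option<u64>> = per_group_null_counts.to_vec();
    assert_eq!(as_is, vec![Some(5), None, Some(0)]);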

@@ -1597,3 +1625,5 @@ impl<'a> StatisticsConverter<'a> {
new_null_array(data_type, num_row_groups)
}
}

// See tests in parquet/tests/arrow_reader/statistics.rs
160 changes: 136 additions & 24 deletions parquet/src/file/statistics.rs
@@ -125,22 +125,35 @@ pub fn from_thrift(
) -> Result<Option<Statistics>> {
Ok(match thrift_stats {
Some(stats) => {
// Number of nulls recorded, when it is not available, we just mark it as 0.
// TODO this should be `None` if there is no information about NULLS.
// see https://github.com/apache/arrow-rs/pull/6216/files
let null_count = stats.null_count.unwrap_or(0);
Contributor Author:

The removal of this unwrap_or is what changes the semantics while reading.


if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
// transform null count to u64
let null_count = stats
.null_count
.map(|null_count| {
if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
Ok(null_count as u64)
})
.transpose()?;

// Generic null count.
let null_count = Some(null_count as u64);
// Generic distinct count (count of distinct values occurring)
let distinct_count = stats.distinct_count.map(|value| value as u64);
let distinct_count = stats
.distinct_count
.map(|distinct_count| {
if distinct_count < 0 {
return Err(ParquetError::General(format!(
"Statistics distinct count is negative {}",
distinct_count
)));
}

Ok(distinct_count as u64)
})
.transpose()?;

// Whether or not statistics use deprecated min/max fields.
let old_format = stats.min_value.is_none() && stats.max_value.is_none();
// Generic min value as bytes.
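The map-then-transpose pattern above deserves a standalone sketch (illustrative only, not part of the diff): map turns the Option<i64> into an Option<Result<u64>>, and transpose flips that into a Result<Option<u64>> so the ? operator can propagate a validation error while a genuinely missing count stays None:

    fn validate_count(count: Option<i64>) -> Result<Option<u64>, String> {
        count
            .map(|c| {
                if c < 0 {
                    // mirrors the ParquetError::General raised above
                    return Err(format!("count is negative {}", c));
                }
                Ok(c as u64)
            })
            .transpose()
    }

    assert_eq!(validate_count(None), Ok(None));        // missing stays missing
    assert_eq!(validate_count(Some(7)), Ok(Some(7)));  // valid value passes through
    assert!(validate_count(Some(-1)).is_err());        // negative is rejected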
@@ -244,20 +257,21 @@ pub fn from_thrift(
pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
let stats = stats?;

// record null counts if greater than zero.
//
// TODO: This should be Some(0) if there are no nulls.
// see https://github.com/apache/arrow-rs/pull/6216/files
// record null count if it can fit in i64
let null_count = stats
.null_count_opt()
.map(|value| value as i64)
.filter(|&x| x > 0);
Contributor Author:

The removal of this filter is what fixes the statistics while writing

Maybe it was intended to be x >= 0 originally 🤔

Contributor:

I was intrigued, so went to do some code archaeology, but that filter was only introduced a few days ago in #6216, by you! 😄

For 6 years before it was:

null_count: if stats.has_nulls() {
    Some(stats.null_count() as i64)
} else {
    None
},

Your commit:
7d4e650

Prior code:
https://github.com/apache/arrow-rs/blame/25bfccca58ff219d9f59ba9f4d75550493238a4f/parquet/src/file/statistics.rs#L228-L242

.and_then(|value| i64::try_from(value).ok());

// record distinct count if it can fit in i64
let distinct_count = stats
.distinct_count()
.and_then(|value| i64::try_from(value).ok());

let mut thrift_stats = TStatistics {
max: None,
min: None,
null_count,
distinct_count: stats.distinct_count().map(|value| value as i64),
distinct_count,
max_value: None,
min_value: None,
is_max_value_exact: None,
@@ -404,9 +418,20 @@ impl Statistics {
/// Returns number of null values for the column, if known.
/// Note that this includes all nulls when column is part of the complex type.
///
/// Note this API returns Some(0) even if the null count was not present
/// in the statistics.
/// See <https://github.com/apache/arrow-rs/pull/6216/files>
/// Note: Versions of this library prior to `53.0.0` returned 0 if the null count was
/// not available. This method returns `None` in that case.
///
/// Also, versions of this library prior to `53.0.0` did not store the null count in the
/// statistics if the null count was `0`.
///
/// To preserve the prior behavior and read null counts properly from older files
/// you should default to zero:
///
/// ```no_run
/// # use parquet::file::statistics::Statistics;
/// # let statistics: Statistics = todo!();
/// let null_count = statistics.null_count_opt().unwrap_or(0);
/// ```
pub fn null_count_opt(&self) -> Option<u64> {
statistics_enum_func![self, null_count_opt]
}
@@ -1041,4 +1066,91 @@ mod tests {
true,
));
}

#[test]
fn test_count_encoding() {
statistics_count_test(None, None);
Contributor Author:

This test fails like this without the changes in this PR:

assertion `left == right` failed
  left: Boolean({min: Some(true), max: Some(false), distinct_count: None, null_count: Some(0), ...
 right: Boolean({min: Some(true), max: Some(false), distinct_count: None, null_count: None, ...

statistics_count_test(Some(0), Some(0));
statistics_count_test(Some(100), Some(2000));
statistics_count_test(Some(1), None);
statistics_count_test(None, Some(1));
}

#[test]
fn test_count_encoding_distinct_too_large() {
// statistics are stored using i64, so test trying to store larger values
let statistics = make_bool_stats(Some(u64::MAX), Some(100));
let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.distinct_count, None); // can't store u64 max --> null
assert_eq!(thrift_stats.null_count, Some(100));
}

#[test]
fn test_count_encoding_null_too_large() {
// statistics are stored using i64, so test trying to store larger values
let statistics = make_bool_stats(Some(100), Some(u64::MAX));
let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.distinct_count, Some(100));
assert_eq!(thrift_stats.null_count, None); // can't store u64 max --> null
}
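For context (an illustrative aside, not part of the diff): the None in these tests comes from the i64::try_from call in to_thrift, which fails for any u64 value above i64::MAX:

    assert_eq!(i64::try_from(u64::MAX).ok(), None);     // too large --> not stored
    assert_eq!(i64::try_from(100_u64).ok(), Some(100)); // fits --> stored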

#[test]
fn test_count_decoding_distinct_invalid() {
let tstatistics = TStatistics {
distinct_count: Some(-42),
..Default::default()
};
let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: Statistics distinct count is negative -42"
);
}

#[test]
fn test_count_decoding_null_invalid() {
let tstatistics = TStatistics {
null_count: Some(-42),
..Default::default()
};
let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: Statistics null count is negative -42"
);
}

/// Writes statistics to thrift, reads them back, and ensures:
/// - the counts written to thrift match the original counts
/// - the round-tripped statistics equal the original statistics
fn statistics_count_test(distinct_count: Option<u64>, null_count: Option<u64>) {
let statistics = make_bool_stats(distinct_count, null_count);

let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.null_count.map(|c| c as u64), null_count);
assert_eq!(
thrift_stats.distinct_count.map(|c| c as u64),
distinct_count
);

let round_tripped = from_thrift(Type::BOOLEAN, Some(thrift_stats))
.unwrap()
.unwrap();
assert_eq!(round_tripped, statistics);
}

fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics {
let min = Some(true);
let max = Some(false);
let is_min_max_deprecated = false;

// test is about the counts, so we aren't really testing the min/max values
Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
))
}
}
67 changes: 64 additions & 3 deletions parquet/tests/arrow_reader/statistics.rs
@@ -22,6 +22,7 @@ use std::default::Default;
use std::fs::File;
use std::sync::Arc;

use super::make_test_file_rg;
use super::{struct_array, Scenario};
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{
@@ -37,16 +38,17 @@ use arrow_array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use half::f16;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

use super::make_test_file_rg;
use parquet::file::statistics::{Statistics, ValueStatistics};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};

#[derive(Debug, Default, Clone)]
struct Int64Case {
@@ -2139,6 +2141,65 @@ async fn test_missing_statistics() {
.run();
}

#[test]
fn missing_null_counts_as_zero() {
let min = None;
let max = None;
let distinct_count = None;
let null_count = None; // NB: no null count
let is_min_max_deprecated = false;
let stats = Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
));
let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema();

let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0))
.set_statistics(stats)
.build()
.unwrap();
let metadata = RowGroupMetaData::builder(parquet_schema.clone())
.set_column_metadata(vec![column_chunk])
.build()
.unwrap();

let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap();

// by default null count should be 0
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![Some(0)])
);

// if we disable the missing_null_counts_as_zero flag, the null count will be None
let converter = converter.with_missing_null_counts_as_zero(false);
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![None])
);
}

/// return an Arrow schema and corresponding Parquet SchemaDescriptor for
/// a schema with a single boolean column "b"
fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) {
let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)]));
use parquet::schema::types::Type as ParquetType;
let parquet_schema = ParquetType::group_type_builder("schema")
.with_fields(vec![Arc::new(
ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap();

let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema)));
(arrow_schema, parquet_schema)
}

/////// NEGATIVE TESTS ///////
// column not found
#[tokio::test]