Extract parquet statistics to its own module, add tests #8294
Conversation
@@ -303,112 +298,6 @@ struct RowGroupPruningStatistics<'a> {
    parquet_schema: &'a Schema,
}

/// Extract the min/max statistics from a `ParquetStatistics` object
macro_rules! get_statistic {
This macro is moved, without modification, into statistics.rs
    .find(&column.name)
    .map(|(_idx, field)| field)?;

RowGoupStatisticsConverter::new(&field)
The idea here is (eventually) to prune more than one row group at a time. However, this PR still does it one at a time
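To make that direction concrete, here is a minimal sketch of an API that accepts any number of row groups and returns their statistics as one Arrow array; the converter name, types, and signature are assumptions for illustration, not the exact API in this PR:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};

/// Hypothetical converter for one column's statistics, greatly simplified:
/// the per-row-group minimums are assumed to be already extracted as Option<i64>.
struct MinStatsConverter;

impl MinStatsConverter {
    /// Accepts minimums for any number of row groups and returns them as a
    /// single ArrayRef, so a caller could prune many row groups in one call.
    fn min<I: IntoIterator<Item = Option<i64>>>(&self, row_group_mins: I) -> ArrayRef {
        Arc::new(Int64Array::from_iter(row_group_mins)) as ArrayRef
    }
}

fn main() {
    let converter = MinStatsConverter;
    // Today DataFusion passes a single row group at a time...
    assert_eq!(converter.min([Some(42)]).len(), 1);
    // ...but the same interface naturally extends to many row groups at once.
    assert_eq!(converter.min([Some(42), None, Some(7)]).len(), 3);
}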
@@ -718,28 +719,6 @@ pub async fn plan_to_parquet(
    Ok(())
}

// Copy from the arrow-rs
Moved to statistics.rs
/// * `$func` is the function (`min`/`max`) to call to get the value
/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get the value as bytes
/// * `$target_arrow_type` is the [`DataType`] of the target statistics
macro_rules! get_statistic {
This implementation leaves a lot to be desired, but I want to get tests in place before I start changing it
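As a rough illustration of the macro shape described in the doc comment above, here is a greatly simplified, self-contained version; the Stats stand-in enum and the single `$func` parameter are assumptions (the real get_statistic! matches on parquet's ParquetStatistics variants and also takes `$bytes_func` and `$target_arrow_type`):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, Int64Array};

// Stand-in for parquet's typed statistics (illustrative only).
struct TypedStats<T> { min: T, max: T }
impl<T: Copy> TypedStats<T> {
    fn min(&self) -> T { self.min }
    fn max(&self) -> T { self.max }
}
enum Stats {
    Int32(TypedStats<i32>),
    Int64(TypedStats<i64>),
}

/// `$func` is the accessor (`min`/`max`) invoked on whichever typed variant
/// the statistics happen to be, producing a one-element Arrow array.
macro_rules! get_statistic {
    ($stats:expr, $func:ident) => {
        match $stats {
            Stats::Int32(s) => Arc::new(Int32Array::from(vec![s.$func()])) as ArrayRef,
            Stats::Int64(s) => Arc::new(Int64Array::from(vec![s.$func()])) as ArrayRef,
        }
    };
}

fn main() {
    let stats = Stats::Int32(TypedStats { min: 1, max: 9 });
    assert_eq!(get_statistic!(&stats, min).len(), 1);
    assert_eq!(get_statistic!(&stats, max).len(), 1);
}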
) -> Result<ArrayRef> {
    let mut row_group_meta_data = row_group_meta_data.into_iter().peekable();

    // if it is empty, return empty array
this handling of empty iterators is new, to support the new array ref interface
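A minimal sketch of that empty-iterator handling, with the row-group metadata replaced by pre-extracted Option<i32> minimums (an assumption to keep the example self-contained):

use std::sync::Arc;
use arrow::array::{new_empty_array, ArrayRef, Int32Array};
use arrow::datatypes::DataType;

/// Returns per-row-group minimums as a single array; with no row groups at all,
/// the ArrayRef-based interface still needs a (zero length) array of the right type.
fn min_values<I: IntoIterator<Item = Option<i32>>>(row_group_mins: I) -> ArrayRef {
    let mut iter = row_group_mins.into_iter().peekable();
    if iter.peek().is_none() {
        // if it is empty, return empty array
        return new_empty_array(&DataType::Int32);
    }
    Arc::new(Int32Array::from_iter(iter)) as ArrayRef
}

fn main() {
    assert_eq!(min_values(std::iter::empty()).len(), 0);
    assert_eq!(min_values([Some(1), None, Some(3)]).len(), 3);
}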
    }
}

#[test]
I added a bunch of tests for reading statistics out of existing files to document what the current behavior is. Sadly, all of the example files in parquet-testing appear to have a single row group.
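For reference, a small standalone sketch of how to check that; the file path is just an example from the parquet-testing repository, and the API shown is the parquet crate's serialized reader rather than anything added by this PR:

use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Example file; most files in parquet-testing contain a single row group,
    // which limits how well multi-row-group statistics can be exercised.
    let file = File::open("parquet-testing/data/alltypes_plain.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    println!("row groups: {}", reader.metadata().num_row_groups());
    Ok(())
}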
I would also be interested in opinions about potentially moving this implementation upstream into parquet-rs eventually.
FYI @viirya @liukun4515 and @Ted-Jiang
Left some comments, I think part of the confusion at the moment is that the current logic does not make a clear distinction between leaf and group columns. I think this will make it very hard to correctly handle parquet logical type mapping, etc... I would recommend making this explicit, e.g. by making the statistics conversion explicitly only handle leaf, i.e. non-nested columns as they appear in parquet, and then composing this into the arrow model at a higher level, e.g. within PruningStatistics.
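A sketch of the suggested layering, where the names and the Option<i32> stand-ins for parquet metadata are illustrative assumptions: the low level is keyed purely by parquet leaf index, and only the higher, arrow-aware level (e.g. a PruningStatistics implementation) decides which leaf a possibly nested column reference maps to.

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};

/// Low level, leaf only: statistics are looked up by parquet *leaf index*,
/// never by an arrow field name, so nested and top-level leaves cannot collide.
fn leaf_min_values(per_leaf_mins: &[Vec<Option<i32>>], leaf_idx: usize) -> ArrayRef {
    Arc::new(Int32Array::from_iter(per_leaf_mins[leaf_idx].iter().copied())) as ArrayRef
}

/// Higher level, arrow aware: resolve the column reference to a leaf index first
/// (e.g. by walking the parquet SchemaDescriptor), then delegate to the leaf layer.
fn column_min_values(
    per_leaf_mins: &[Vec<Option<i32>>],
    resolved_leaf_idx: Option<usize>,
) -> Option<ArrayRef> {
    resolved_leaf_idx.map(|idx| leaf_min_values(per_leaf_mins, idx))
}

fn main() {
    // one Vec per leaf, one entry per row group (a single row group here)
    let per_leaf_mins = vec![vec![Some(0)], vec![Some(1)], vec![Some(100)]];
    assert_eq!(column_min_values(&per_leaf_mins, Some(2)).unwrap().len(), 1);
    // a column that cannot be resolved to a leaf simply has no statistics
    assert!(column_min_values(&per_leaf_mins, None).is_none());
}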
    .columns()
    .iter()
    .enumerate()
    .find(|(_idx, c)| c.column_descr().name() == self.field.name())
Aside from being slow, this will be incorrect in the presence of nested fields
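The hazard can be seen with plain strings (the paths below mirror the struct test that appears later in this thread; they are illustrative, not taken from a real SchemaDescriptor): a parquet leaf descriptor's name() is the bare leaf name, so matching on it alone cannot distinguish a nested leaf from a top-level column with the same name.

fn main() {
    // (full path, bare leaf name) for each parquet leaf column, in schema order
    let leaves = [
        ("struct_col.bool_col", "bool_col"),
        ("struct_col.int_col", "int_col"),
        ("int_col", "int_col"),
    ];
    // Looking up the top-level field "int_col" by bare leaf name finds the
    // nested struct_col.int_col leaf first, so its statistics would be used.
    let found = leaves.iter().find(|(_path, name)| *name == "int_col").unwrap();
    assert_eq!(found.0, "struct_col.int_col"); // not the column that was asked for
}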
I added tests for this -- and I didn't find a bug 🤔
Left a comment on how to see the bug
    .find(&column.name)
    .map(|(_idx, field)| field)?;

RowGroupStatisticsConverter::new(field)
There is a slight mismatch here as parquet handles schema nesting differently from arrow. I'm not sure how Column addresses nested fields, but I would expect to see something walking SchemaDescriptor to compute this mapping, or something similar.
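One possible shape for that mapping (the helper below is an assumption, not code from this PR): walk the SchemaDescriptor's leaves and compare their full dotted paths, so a nested struct_col.int_col can never be confused with a top-level int_col.

use std::sync::Arc;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

/// Hypothetical helper: find the parquet leaf index whose full dotted path
/// matches the requested column, by walking the SchemaDescriptor.
fn leaf_index_for_path(schema: &SchemaDescriptor, dotted_path: &str) -> Option<usize> {
    (0..schema.num_columns()).find(|&i| schema.column(i).path().string() == dotted_path)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let message = "
        message schema {
            optional group struct_col {
                optional boolean bool_col;
                optional int32 int_col;
            }
            optional int32 int_col;
        }";
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));
    assert_eq!(leaf_index_for_path(&schema, "struct_col.int_col"), Some(1));
    assert_eq!(leaf_index_for_path(&schema, "int_col"), Some(2));
    Ok(())
}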
TLDR is that Column does not address nested fields. The structure that does is datafusion_physical_expr::expressions::GetFieldAccessExpr.
I spoke with @tustvold and we came up with the following plan:
@tustvold I think I have addressed your main concerns (handling of StructArrays) with tests.
I think I have updated the API to something better, though not quite the same as what you suggested.
        Arc::new(boolean) as ArrayRef,
    ),
    (
        Arc::new(Field::new("i", DataType::Int32, nullable)),
Arc::new(Field::new("i", DataType::Int32, nullable)), | |
Arc::new(Field::new("int_col", DataType::Int32, nullable)), |
Indeed -- when I made this change in a601fbf the test with structs and non-structs fails (as you predicted):
row_groups: 1
"struct_col.bool_col": Boolean({min: Some(true), max: Some(true), distinct_count: None, null_count: 1, min_max_deprecated: false, min_max_backwards_compatible: false})
"struct_col.int_col": Int32({min: Some(1), max: Some(3), distinct_count: None, null_count: 1, min_max_deprecated: false, min_max_backwards_compatible: false})
"int_col": Int32({min: Some(100), max: Some(300), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false})
left: PrimitiveArray<Int32>
[
1,
]
right: PrimitiveArray<Int32>
[
100,
]
stack backtrace:
I filed #8335 for this issue
Thank you -- I plan to merge this tomorrow unless there are any other comments.
It turns out this PR introduced a regression: #8533
Which issue does this PR close?
Part of #8229
Closes #8335
Potentially part of apache/arrow-rs#4328
In order to avoid boiling the ocean and to document more clearly what the current code does, I am trying to do this work in stages. The first one is to consolidate how statistics are read from parquet.
Rationale for this change
I am in the process of trying to improve the statistics in DataFusion, which have grown organically over time. I would like to refactor them, but I need to ensure that I don't break anything.
There are tests for the existing pruning predicate code, but not the underlying statistics conversion.
There are a few problems with the existing code: for example, the extracted statistics are not returned as TimestampSecondArray or TimestampNanosecondArray. The pruning statistics work around this with a cast, but @tustvold tells me this is not always correct (especially for certain timestamps and intervals).
What changes are included in this PR?
Extracts the statistics conversion code into its own parquet/statistics.rs module, and adds a columnar API (returns the value as an ArrayRef).
Adds tests for reading statistics from files written with the parquet rust writer, as well as using the existing parquet test data.
Are these changes tested?
Yes (most of this PR is new tests)
Are there any user-facing changes?
None are intended.
This implementation uses the same existing code, so it is not a functional change, but it does add many tests for the existing code.
I plan to improve the existing code in follow on PRs.