Pushdown RowFilter in ParquetExec #3380
Conversation
@Ted-Jiang @tustvold @alamb FYI. Would love any feedback you guys have on this one :)
I plan to review this, time permitting, tomorrow. Very exciting 😁
Codecov Report
@@ Coverage Diff @@
## master #3380 +/- ##
==========================================
+ Coverage 85.70% 85.73% +0.03%
==========================================
Files 298 299 +1
Lines 54961 55170 +209
==========================================
+ Hits 47103 47302 +199
- Misses 7858 7868 +10
@thinkharderdev Cool! 👍 I will review this today!
_schema: &Schema,
_metadata: &ParquetMetaData,
) -> Result<bool> {
// TODO How do we know this?
If the file has a page index, it will be stored at ParquetMetaData.page_indexes.boundary_order. I think without a page index there is no way to find out 🤔
Parquet doesn't have a way to express multi-column sort orders that I am aware of. I thought DataFusion had some plumbing for this, but I think it doesn't extend into the physical scan operators.
metadata: &ParquetMetaData,
) -> Result<Option<FilterCandidate>> {
let expr = self.expr.clone();
let expr = expr.rewrite(&mut self)?;
Why do we need to call this function? 🤔
So this will rewrite Column expressions for columns not present in the current parquet file to Expr::Literal(ScalarValue::Null). Basically we want to build the PhysicalExpr to only deal with the projected columns required for the filter (rather than the entire file), so we need to basically simulate what the SchemaAdapter would do.
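Roughly, the rewrite does something like the sketch below (a hypothetical helper, not the exact code in this PR, which does this inside its ExprRewriter impl; it assumes the Expr, Column and ScalarValue shapes of the DataFusion version this PR targets):

use arrow::datatypes::Schema;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;

// Sketch only: a column reference that is missing from this file's schema is
// replaced by a NULL literal, so the filter only needs columns that exist in
// this particular parquet file.
fn rewrite_if_missing(expr: Expr, file_schema: &Schema) -> Expr {
    match expr {
        Expr::Column(col) if file_schema.field_with_name(&col.name).is_err() => {
            Expr::Literal(ScalarValue::Null)
        }
        other => other,
    }
}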
@thinkharderdev Agree! I remember each distinct filter will apply to the projected col with ... One thing I want to mention: when applying filter pushdown to parquet, some ... After using this row_filter I think it could be a ...
Yes! This is, I think, the next phase. Once we can push down exact filters to the scan we can represent that in the ...
This is looking really cool, sorry it has taken so long to review.
Perhaps we could have an option or something to not re-order predicates, and this would allow the TableProvider
and/or physical optimizer to use additional information to rewrite the predicates in a specific order? To give a concrete example of where this might be useful, in IOx the parquet files are sorted lexicographically, and it therefore makes sense for predicates to be evaluated in this order.
> eventually may want to do cost estimation at a higher level to determine the optimal grouping.
Yeah, I suspect this will benefit from being handled at a higher-level than the individual ParquetExec operator
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
I think we should include predicates on dictionaries
.with_batch_size(batch_size)
.with_row_groups(row_groups)
.build()?;
let stream = match row_filter {
It might be nicer to write this as
if let Some(filter) = row_filter {
builder = builder.with_row_filter(filter)
}
Or something
filters.push(Box::new(filter));
}

other_candidates.sort_by_key(|c| c.required_bytes);
Perhaps we could move this ahead of the partition, so that it is also used to sort the indexed expressions?
}
}

impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
This is a nice use of the existing plumbing for walking expressions 👍
Ok(None)
} else {
let required_bytes =
size_of_columns(&self.required_columns, self.file_schema, metadata)?;
Why not pass required_column_indices here instead?
.map(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
I think the logic in parquet already handles this, and so we can skip doing this here
@thinkharderdev do you plan on making this change in this PR or a follow on?
I think @tustvold was referring to the prep_null_mask call, which is removed. Unless I'm missing something, this should be good to go.
let candidates: Vec<FilterCandidate> = predicates
.into_iter()
.flat_map(|expr| {
if let Ok(candidate) =
This could be written as expr.ok().then(|| ...), up to you.
expr: Expr,
file_schema: &'a Schema,
table_schema: &'a Schema,
required_columns: HashSet<Column>,
I don't think this is necessary, in addition to required_column_indices
Yeah, I had this one first and then needed to add the indices for something else, but we don't need both.
TLDR I really like this PR @thinkharderdev -- the structure is very cool.
We probably need to think about how to test it a bit more before turning it on by default.
Perhaps we could add a flag to the parquet reader exec to enable / disable the entire "push predicates down to the parquet scan" optimization, disabled by default (rather than just the reorder_predicates
flag) and merge the PR in.
Then we can test in various places (like I could test with the IOx tests) and when we are confident it is ready to be enabled, we can turn it on by default.
(as a total aside, I love the name of this branch in coralogix https://github.com/coralogix/arrow-datafusion/tree/ludicrous-speed)
projection: Vec<usize>,
}

/// Helper to build a `FilterCandidate`. This will do several things
❤️
/// 3. Rewrite column expressions in the predicate which reference columns not in the particular file schema.
/// This is relevant in the case where we have determined the table schema by merging all individual file schemas
/// and any given file may or may not contain all columns in the merged schema. If a particular column is not present
/// we replace the column expression with a literal expression that produces a null value.
We do very similar rewrites in IOx; 👍
if candidates.is_empty() {
Ok(None)
} else if reorder_predicates {
First of all, I think the idea of reordering predicates very close to the scan where we have the most data (rather than passing them down) is very good. 👍
I think it may help readability to articulate clearly somewhere (maybe the top of this file) what the intended filter ordering criteria are.
Some properties I have seen in heuristics like this that might be worth thinking about:
- "single column filters" (aka ones that can be evaluated by decoding a single column and maybe can be evaluated by special case logic in the reader) -- especially since the parquet reader will have to re-decode duplicated columns
- single column filters on columns with the fewest RLE runs (meaning they will be the cheapest to evaluate) -- which may be already handled by looking at encoded size.
- earlier in the sort order (as you have here).
The more I think about it, though, the more I like the idea of using size_of_columns / row_count as a proxy for sort order / speed of evaluation.
Always evaluating single column predicates first may also be a valuable idea to explore.
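A tiny self-contained sketch of that proxy (a simplified stand-in struct, not the PR's FilterCandidate; the row_count field here is an assumption):

struct Candidate {
    required_bytes: usize,
    row_count: usize,
}

// Encoded bytes per row as a rough "cost to decode and evaluate" proxy:
// the cheapest predicates run first so later ones see fewer rows.
fn sort_by_estimated_cost(candidates: &mut [Candidate]) {
    candidates.sort_by_key(|c| c.required_bytes / c.row_count.max(1));
}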
Yeah, everything else I could come up with I think ended up being pretty well-approximated by the column size. What will be interesting to figure out from an API design perspective is how to allow users to specify hints. There may be invariants or regularities in the dataset that aren't really expressed in any metadata that could still be relevant to ordering.
I think the API in this PR is pretty good in terms of hints:
- Option 1 (default): Allow the implementation to reorder based on heuristics
- Option 2: Do not reorder predicates and apply them in the order specified
One limitation of Option 2 is that the order can't be specified for different parquet files, but if that is an important use case we could design something more sophisticated later.
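Purely as an illustration, the two options could be spelled out as a knob like the sketch below (hypothetical; the PR itself exposes this through the reorder_predicates flag rather than an enum):

#[derive(Debug, Clone, Copy)]
enum PredicateOrdering {
    // Option 1 (default): let the scan reorder predicates using heuristics
    // such as the encoded size of the referenced columns.
    Heuristic,
    // Option 2: evaluate predicates exactly in the order supplied by the
    // TableProvider / physical optimizer.
    AsProvided,
}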
}
}

fn is_primitive_field(field: &Field) -> bool {
🤔 eventually this is probably a good function to move to arrow -- so it doesn't get out of date if new types are added
There are similar examples like https://docs.rs/arrow/22.0.0/arrow/datatypes/enum.DataType.html#method.is_numeric already
Filed apache/arrow-rs#2704
/// Take combined filter (multiple boolean expressions ANDed together)
/// and break down into distinct filters. This should be the inverse of
/// `datafusion_expr::expr_fn::combine_filters`
fn disjoin_filters(combined_expr: Expr) -> Vec<Expr> {
This is also quite common -- I recommend putting it in datafusion_expr::expr_fn::uncombine_filters or something.
I bet there are several copies of this code in the datafusion codebase already (we certainly have some in IOx 😆).
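For illustration, a self-contained sketch of what such an uncombine/disjoin helper does, using a simplified stand-in enum rather than DataFusion's real Expr type:

#[derive(Debug, Clone)]
enum SimpleExpr {
    And(Box<SimpleExpr>, Box<SimpleExpr>),
    Predicate(&'static str),
}

// Recursively split a chain of ANDs back into the individual predicates;
// the inverse of combining filters with AND.
fn uncombine(expr: SimpleExpr, out: &mut Vec<SimpleExpr>) {
    match expr {
        SimpleExpr::And(left, right) => {
            uncombine(*left, out);
            uncombine(*right, out);
        }
        leaf => out.push(leaf),
    }
}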
@thinkharderdev please let me know when you think this PR is ready for final review / merge (it still says "RFC" in the title so I am not quite sure)
I think it's good now (pending any other feedback/comments). I can work on the benchmarks in a follow-up PR.
Will review carefully tomorrow
I think this PR can be merged in as is and we can keep iterating on it subsequently.
Really nice @thinkharderdev and kudos to the rest of the predicate pushdown team @Ted-Jiang and @tustvold (sorry if I have forgotten others)
It would be nice to file a follow on issue (I can do so if you like) listing the steps we felt were necessary to turn on predicate pushdown by default. In my mind this is primarily about testing (functional and performance).
@@ -839,6 +893,7 @@ mod tests {
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
Maybe in a follow on PR we can adjust the tests to do the same scan both with and without pushdown
.await
.unwrap();

// This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
This comment seems out of place / out of date, as the predicate is c2 == 1 and the actual rows returned are in fact where c2 = 1.
Yeah, lazy copy pasta :)
for candidate in indexed_candidates {
let filter =
DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
Can this ever return an error due to some sort of unsupported predicate? If so, perhaps we could log the error (in debug!) and continue.
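Something along these lines, as a fragment mirroring the loop quoted above (the exact error-handling shape is just a suggestion, not code from this PR):

for candidate in indexed_candidates {
    match DatafusionArrowPredicate::try_new(candidate, file_schema, metadata) {
        Ok(filter) => filters.push(Box::new(filter)),
        // Skip a predicate we cannot turn into a row filter rather than
        // failing the whole scan.
        Err(e) => debug!("skipping row-filter predicate: {}", e),
    }
}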
It shouldn't ever fail unless there is a bug (i.e. there is no expected failure case). I think if this fails then the query will almost certainly fail somewhere else.
@@ -429,6 +429,40 @@ pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
Some(combined_filter)
}

/// Take combined filter (multiple boolean expressions ANDed together)
❤️
Since this feature defaults to off, I think it would be ok to merge in prior to the 12.0.0 release candidate but I defer to @andygrove on that one
Thanks! I can clean up the last few things mentioned in your review this afternoon/evening. I'll also create a follow-up ticket with the remaining items. In general I think the follow-ons are: ...
I'm not sure what's going on with the checks. The errors look like it is building a different version of the code....
GitHub Actions merges master when running tests, try merging master into this branch perhaps?
I collected follow on work and other related items I could find into a tracking ticket / epic in #3462 -- it is a great story / project.
🚀
Benchmark runs are scheduled for baseline = af0d50a and contender = 9fbee1a. 9fbee1a is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
assert_predicates(actual, vec![col("a"), col("b")]);

let expr = col("a").and(col("b")).or(col("c"));
We should add another test case for this. The expr is a and b or c, which will be handled like binary(a and b, or, c). But I think the above expr can be converted to c or a and b, and will get an incorrect result.
Maybe I am wrong
Could you be more specific @liukun4515 -- I am not sure if you are pointing out a potential bug. This line makes the predicate (a AND b) OR c I believe. What do you mean by c or a and b?
> Could you be more specific @liukun4515 -- I am not sure if you are pointing out a potential bug. This line makes the predicate (a AND b) OR c I believe. What do you mean by c or a and b?

Sorry for that, I think I was wrong about this.
Ok(RewriteRecursion::Continue)
}

fn mutate(&mut self, expr: Expr) -> Result<Expr> {
@alamb @thinkharderdev
I found an issue caused by this rewrite in my work on #3396.
If we have two parquet files, one with c1,c2 and the other with c1,c3, and the filter is c2 = 1, then it will produce the expr NULL = 1 for the parquet file with the c1,c3 columns.
After I remove the binary type coercion in the physical phase, the binary physical expr NULL = INT(1) can't be created, because we no longer support type coercion when creating the physical expr.
We need to make sure the NULL value has the right data type.
We need to get the data type from the table schema.
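A sketch of that fix (a hypothetical helper, not code from this PR): look up the missing column's type in the table schema and emit a typed null literal, so an expression like c2 = 1 can still be planned without relying on type coercion.

use arrow::datatypes::Schema;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;

fn typed_null_for_missing_column(name: &str, table_schema: &Schema) -> Option<Expr> {
    let field = table_schema.field_with_name(name).ok()?;
    // ScalarValue::try_from(&DataType) yields a null ScalarValue of that type.
    let null = ScalarValue::try_from(field.data_type()).ok()?;
    Some(Expr::Literal(null))
}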
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
if let Ok(Some(filter)) = build_row_filter(
If there is an error when building the row filter, why not propagate the error?
Which issue does this PR close?
Closes #3360
Rationale for this change
What changes are included in this PR?
Putting this out there for comment since it's at a point where I am more or less happy with the design and all the existing tests pass.
The key points for this PR:
- Build a RowFilter from a filter Expr pushed down to the ParquetExec.
- In the ParquetExec, build a RowFilter (if we can) and construct our ParquetRecordBatchStream with it.

To build a RowFilter we need to go through a few steps:
- Take the filter Expr (which has been combined into a single predicate) and tear it back apart into separate predicates which are ANDed together.
- Rewrite each Expr to replace any columns not present in the file schema with a null literal (to handle merged schemas without involving the SchemaAdapter).
- Each of these becomes a FilterCandidate, and we can sort the candidates by evaluation cost so we can apply the "cheap" filters first.
- Wrap each candidate in a DatafusionArrowPredicate and build a RowFilter from the whole lot of them.

TODOs:
- ... ParquetMetadata :)

A separate conceptual question is around optimizing the number of distinct filters. In this design we simply assume that we want to break the filter into as many distinct predicates as we can, but I'm not sure that is always the case given that this forces serial evaluation of the filters. I can imagine many cases where it would be better to group predicates together for evaluation. I didn't want to make the initial implementation too complicated so I punted on that for now, but eventually may want to do cost estimation at a higher level to determine the optimal grouping.
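The final assembly step then looks roughly like the fragment below (mirroring the diff quoted earlier in this conversation; RowFilter is the parquet crate type, DatafusionArrowPredicate is the type added by this PR):

let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
for candidate in candidates {
    let filter = DatafusionArrowPredicate::try_new(candidate, file_schema, metadata)?;
    filters.push(Box::new(filter));
}
let row_filter = RowFilter::new(filters);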
Are there any user-facing changes?