
Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled #4028

Closed
Tracked by #11106 ...
tustvold opened this issue Oct 30, 2022 · 14 comments · Fixed by #12135
Labels
enhancement New feature or request performance Make DataFusion faster

Comments

@tustvold
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently, even when parquet predicate pushdown is enabled and the predicate can be fully pushed down, the physical plan still contains a FilterExec when using ListingTable:

ProjectionExec: expr=[service@0 as service, host@1 as host, pod@2 as pod, container@3 as container, image@4 as image, time@5 as time, client_addr@6 as client_addr, request_duration_ns@7 as request_duration_ns, request_user_agent@8 as request_user_agent, request_method@9 as request_method, request_host@10 as request_host, request_bytes@11 as request_bytes, response_bytes@12 as response_bytes, response_status@13 as response_status]
  CoalesceBatchesExec: target_batch_size=4096
    FilterExec: container@3 = backend_container_0 OR pod@2 = aqcathnxqsphdhgjtgvxsfyiwbmhlmg
      RepartitionExec: partitioning=RoundRobinBatch(8)
        ParquetExec: limit=None, partitions=[home/raphael/Downloads/data.parquet], predicate=container_min@0 <= backend_container_0 AND backend_container_0 <= container_max@1 OR pod_min@2 <= aqcathnxqsphdhgjtgvxsfyiwbmhlmg AND aqcathnxqsphdhgjtgvxsfyiwbmhlmg <= pod_max@3, projection=[service, host, pod, container, image, time, client_addr, request_duration_ns, request_user_agent, request_method, request_host, request_bytes, response_bytes, response_status]

Describe the solution you'd like

ListingTable::supports_filter_pushdown should return TableProviderFilterPushDown::Exact when:

  • Parquet predicate pushdown is enabled
  • The FileFormat is parquet
  • The predicate is fully pushed down by ParquetExec (not all predicates are supported)
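The requested behavior can be pictured with a minimal sketch (a toy model, not the real DataFusion API; the enum and function names here are illustrative):

```rust
// Toy model: report Exact only when all three conditions above hold;
// otherwise report Inexact, so the planner keeps a FilterExec that
// re-applies the predicate on top of the scan.
#[derive(Debug, PartialEq)]
enum FilterPushDown {
    Inexact, // scan pre-filters; a FilterExec must re-apply the predicate
    Exact,   // scan guarantees the predicate; the FilterExec can be elided
}

fn pushdown_kind(
    format_is_parquet: bool,
    pushdown_enabled: bool,
    fully_pushable: bool,
) -> FilterPushDown {
    if format_is_parquet && pushdown_enabled && fully_pushable {
        FilterPushDown::Exact
    } else {
        FilterPushDown::Inexact
    }
}

fn main() {
    assert_eq!(pushdown_kind(true, true, true), FilterPushDown::Exact);
    // Any missing condition falls back to Inexact
    assert_eq!(pushdown_kind(true, false, true), FilterPushDown::Inexact);
    println!("ok");
}
```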

Describe alternatives you've considered

Additional context

@alamb
Contributor

alamb commented Nov 28, 2022

This is a good proposal I think -- it would skip unnecessary filtering and likely make plans faster. It will become more useful (maybe even necessary) when predicate pushdown is enabled by default -- #3463

@jychen7
Contributor

jychen7 commented Apr 10, 2023

I think it is a good idea. I confirmed it can improve performance by 30x in #5404 (comment).

I am thinking of picking this up, but the work seems less trivial than I thought. Here are my two cents.

The related code to improve is the following:
https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L736-L756

  1. The FileFormat is parquet

This is trivial: we can match self.options.format against ParquetFormat.
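One way to express that check is the as_any downcast pattern that DataFusion's FileFormat trait objects expose. A self-contained sketch of the pattern, using local stand-in types rather than the real datafusion imports:

```rust
use std::any::Any;

// Local stand-ins for the trait-object pattern used by DataFusion's
// `FileFormat`; the real trait lives in the datafusion crate.
trait FileFormat {
    fn as_any(&self) -> &dyn Any;
}

struct ParquetFormat;
struct CsvFormat;

impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any { self }
}
impl FileFormat for CsvFormat {
    fn as_any(&self) -> &dyn Any { self }
}

// "Match self.options.format with ParquetFormat" via downcast_ref.
fn is_parquet(format: &dyn FileFormat) -> bool {
    format.as_any().downcast_ref::<ParquetFormat>().is_some()
}

fn main() {
    assert!(is_parquet(&ParquetFormat));
    assert!(!is_parquet(&CsvFormat));
    println!("ok");
}
```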

  2. Parquet predicate pushdown is enabled

This setting could come from ParquetExec.pushdown_filters or ConfigOptions (more specifically, ConfigOptions.execution.parquet.pushdown_filters)

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L393

However, when ListingTable.supports_filter_pushdown is called, the ParquetExec has not been created yet, nor is a SessionState passed as input.

Idea: shall we add state: &SessionState as an input to ListingTable.supports_filter_pushdown, the same as ListingTable.scan?

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/datasource/listing/table.rs#L668-L670

  3. The predicate is fully pushed down by ParquetExec (not all predicates are supported)

Does this mean making sure the RowFilter is built from the predicate without error? If so, it seems to require converting the logical Expr to a PhysicalExpr first.

https://github.com/apache/arrow-datafusion/blob/7545177001b9dc04951f0c1c2008509f3895de8e/datafusion/core/src/physical_plan/file_format/parquet.rs#L527-L547

@tustvold
Contributor Author

I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down

Tagging @alamb and @crepererum who are more familiar with this part of the codebase
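The idea can be sketched as a partition step (a toy model; `can_push_down` stands in for a hypothetical oracle that decides whether ParquetExec supports a given predicate): push what you can into the scan, and wrap the scan in a FilterExec only for the leftovers.

```rust
// Toy sketch: split predicates into those the Parquet scan can absorb
// (so the table provider may report Exact for them) and those that still
// need a FilterExec on top of the scan.
fn partition_predicates<'a>(
    predicates: &[&'a str],
    can_push_down: impl Fn(&str) -> bool, // hypothetical oracle
) -> (Vec<&'a str>, Vec<&'a str>) {
    predicates.iter().copied().partition(|p| can_push_down(p))
}

fn main() {
    // Pretend predicates containing a UDF cannot be pushed down.
    let preds = ["a = 1", "udf(b) > 0"];
    let (pushed, kept) = partition_predicates(&preds, |p| !p.contains("udf"));
    assert_eq!(pushed, vec!["a = 1"]);    // handled inside the scan
    assert_eq!(kept, vec!["udf(b) > 0"]); // wrapped in a FilterExec
    println!("ok");
}
```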

@alamb
Contributor

alamb commented Apr 10, 2023

I'll try and look into this in more detail tomorrow

@crepererum
Contributor

I wonder if it might be possible to always return exact for Parquet files, and to just manually insert a FilterExec for any predicates that can't be pushed down

That's what we want to do for InfluxDB IOx as well: https://github.com/influxdata/influxdb_iox/issues/7408

@alamb
Contributor

alamb commented Jun 24, 2024

I am going to find time sometime this week

@alamb
Contributor

alamb commented Aug 6, 2024

Background Reading

Here is a background article about parquet predicate pushdown: https://www.influxdata.com/blog/querying-parquet-millisecond-latency/#heading5

Specifically, the section on Late materialization describes what is done in the RowFilter code.

Code links for Listing Table

Supports filter pushdown: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html#impl-TableProvider-for-ListingTable

Conditions:

Here are the conditions @tustvold lists above, and some links about what they mean

Parquet predicate pushdown is enabled

This refers to the datafusion.execution.parquet.pushdown_filters configuration setting (docs here)
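For reference, the setting can be toggled programmatically. A sketch assuming the `datafusion` crate is a dependency (not runnable standalone, and constructor names have shifted across DataFusion versions):

```rust
// Sketch: enable parquet filter pushdown for a session via the
// datafusion.execution.parquet.pushdown_filters config key.
use datafusion::prelude::*;

let config = SessionConfig::new()
    .set_bool("datafusion.execution.parquet.pushdown_filters", true);
let ctx = SessionContext::new_with_config(config);
```

The same key can also be set from SQL in datafusion-cli with a SET statement.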

The FileFormat is parquet

This should be relatively straightforward to determine

The predicate is fully pushed down by ParquetExec (not all predicates are supported)

"pushdown" refers to this code in ParquetExec (there is some background information in the parquet RowFilter API)

/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
/// DataFusion will use a `ParquetRecordBatchStream` to read data from parquet into arrow `RecordBatch`es.
/// When constructing the `ParquetRecordBatchStream` you can provide a `RowFilter` which is itself just a vector
/// of `Box<dyn ArrowPredicate>`. During decoding, the predicates are evaluated to generate a mask which is used
/// to avoid decoding rows in projected columns which are not selected, which can significantly reduce the amount
/// of compute required for decoding.
///
/// Since the predicates are applied serially in the order defined in the `RowFilter`, the optimal ordering
/// will depend on the exact filters. The best filters to execute first have two properties:
/// 1. They are relatively inexpensive to evaluate (e.g. they read column chunks which are relatively small)
/// 2. They filter a lot of rows, reducing the amount of decoding required for subsequent filters and projected columns
///
/// Given the metadata exposed by parquet, the selectivity of filters is not easy to estimate so the heuristics we use here primarily
/// focus on the evaluation cost.
///
/// The basic algorithm for constructing the `RowFilter` is as follows
/// 1. Recursively break conjunctions into separate predicates. An expression like `a = 1 AND (b = 2 AND c = 3)` would be
/// separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
/// 2. Determine whether each predicate is suitable as an `ArrowPredicate`. As long as the predicate does not reference any projected columns
/// or columns with non-primitive types, then it is considered suitable.
/// 3. Determine, for each predicate, the total compressed size of all columns required to evaluate the predicate.
/// 4. Determine, for each predicate, whether all columns required to evaluate the expression are sorted.
/// 5. Re-order the predicate by total size (from step 3).
/// 6. Partition the predicates according to whether they are sorted (from step 4)
/// 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
/// 8. Build the `RowFilter` with the sorted predicates followed by the unsorted predicates. Within each partition
/// the predicates will still be sorted by size.
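Step 1 of the algorithm above can be sketched on a toy expression tree (illustrative types, not DataFusion's real Expr):

```rust
// Recursively split conjunctions: `a = 1 AND (b = 2 AND c = 3)`
// becomes the three separate predicates `a = 1`, `b = 2`, `c = 3`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    And(Box<Expr>, Box<Expr>),
    Pred(&'static str), // leaf predicate such as "a = 1"
}

fn split_conjunction(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            split_conjunction(left, out);
            split_conjunction(right, out);
        }
        leaf => out.push(leaf.clone()),
    }
}

fn main() {
    // a = 1 AND (b = 2 AND c = 3)
    let e = Expr::And(
        Box::new(Expr::Pred("a = 1")),
        Box::new(Expr::And(
            Box::new(Expr::Pred("b = 2")),
            Box::new(Expr::Pred("c = 3")),
        )),
    );
    let mut parts = Vec::new();
    split_conjunction(&e, &mut parts);
    assert_eq!(
        parts,
        vec![Expr::Pred("a = 1"), Expr::Pred("b = 2"), Expr::Pred("c = 3")]
    );
    println!("ok");
}
```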

@alamb
Contributor

alamb commented Aug 6, 2024

I think there are likely two things needed for this PR:

  1. Some API that ListingTable can use to determine if the predicates can be pushed down
  2. Tests that show / confirm that using RowFilter really provides exact filtering

@alamb
Contributor

alamb commented Aug 8, 2024

FWIW I think @itsjunetime is looking into this ticket

@itsjunetime
Contributor

Yes, sorry - I'm still somewhat new to this codebase, so I'm taking a good second to read through the associated docs and comments to figure it out first.

@crepererum
Contributor

No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂

@alamb
Contributor

alamb commented Aug 12, 2024

No need to rush. I think @alamb's comment was mostly meant as an assignment signal, so that nobody else starts to work on it and we end up wasting resources 🙂

Yes this is what I meant. Sorry about not being clear.

@alamb
Contributor

alamb commented Aug 22, 2024

I filed #12115 to track some additional testing that I would like to do for this feature.

@alamb
Contributor

alamb commented Aug 24, 2024

There is a nice PR here for this feature: #12135

itsjunetime added a commit to itsjunetime/datafusion that referenced this issue Sep 16, 2024
alamb added a commit that referenced this issue Sep 17, 2024
…ed (#12135)

* feat: Preemptively filter for pushdown-preventing columns in ListingTable

* Fix behavior to make all previous tests work and lay groundwork for future tests

* fix: Add some more tests and fix small issue with pushdown specificity

* test: Revive unneccesarily removed test

* ci: Fix CI issues with different combinations of exprs

* fix: run fmt

* Fix doc publicity issues

* Add ::new fn for PushdownChecker

* Remove unnecessary 'pub' qualifier

* Fix naming and doc comment of non_pushdown_columns to reflect what it actually does (the opposite) and add back useful comments

* fmt

* Extend FileFormat trait to allow library users to define formats which support pushdown

* fmt

* fix: reference real fn in doc to fix CI

* Minor: Add tests for using FilterExec when parquet was pushed down

* Update datafusion/core/src/datasource/file_format/mod.rs

* Pipe schema information through to TableScan and ParquetExec to facilitate unnecessary FilterExec removal

* - Remove collect::<(_, _)> to satisfy msrv
- Remove expect(_) attr to satisfy msrv
- Update comments with more accurate details and explanations

* Add more details in comments for `map_partial_batch`

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Remove reference to issue #4028 as it will be closed

* Convert normal comments to doc-comments

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Clarify meaning of word `projected` in comment

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Clarify more how `table_schema` is used differently from `projected_table_schema`

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Finish partially-written comment about SchemaMapping struct

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>