Consolidate GroupByHash implementations row_hash.rs and hash.rs (remove duplication) #2723
Comments
I went through the codebase and some related GitHub issues. It seems that we need to:
Is there anything I am missing? I think I need to do some research on how to implement
Thanks @ming535 ! That list looks good to me
I am not sure DataFusion supports aggregating on
I agree we need this. List and Struct would not be too complicated to implement in the row format:
You could refer to the Spark repo to lend some ideas from its implementation. On the other hand, I suggest we evaluate the rewrite approach first. This optimizer rule rewrites an aggregate query with distinct aggregations into an expanded double aggregation, in which the regular aggregation expressions and every distinct clause are aggregated in separate groups. The results are then combined in a second aggregate. Check this Spark optimizer rule.
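For the single-distinct-clause case, the double aggregation described above can be sketched in plain Rust. This is a hypothetical illustration (not Spark or DataFusion code, and all names are made up): `SELECT key, SUM(x), COUNT(DISTINCT y) ... GROUP BY key` is computed by first grouping on `(key, y)` while partially summing `x`, then re-aggregating by `key`, where the number of phase-1 groups per key is exactly `COUNT(DISTINCT y)`.

```rust
use std::collections::BTreeMap;

// Hypothetical sketch of the distinct-aggregate rewrite for a single
// distinct clause. Input rows are (key, x, y); output maps
// key -> (SUM(x), COUNT(DISTINCT y)).
fn double_aggregate(rows: &[(i64, i64, i64)]) -> BTreeMap<i64, (i64, usize)> {
    // Phase 1: group by (key, y); accumulate a partial SUM(x) per pair.
    let mut phase1: BTreeMap<(i64, i64), i64> = BTreeMap::new();
    for &(key, x, y) in rows {
        *phase1.entry((key, y)).or_insert(0) += x;
    }
    // Phase 2: re-aggregate by key. Each phase-1 entry represents exactly
    // one distinct y, so counting entries yields COUNT(DISTINCT y),
    // while summing the partial sums yields the overall SUM(x).
    let mut phase2: BTreeMap<i64, (i64, usize)> = BTreeMap::new();
    for (&(key, _y), &partial_sum) in &phase1 {
        let entry = phase2.entry(key).or_insert((0, 0));
        entry.0 += partial_sum;
        entry.1 += 1;
    }
    phase2
}

fn main() {
    // rows of (key, x, y): key 1 has y values {7, 8}; key 2 has {9}
    let rows = [(1, 10, 7), (1, 20, 7), (1, 30, 8), (2, 5, 9)];
    let out = double_aggregate(&rows);
    assert_eq!(out[&1], (60, 2)); // SUM(x) = 60, COUNT(DISTINCT y) = 2
    assert_eq!(out[&2], (5, 1));
    println!("{:?}", out);
}
```

Spark's actual rule additionally handles multiple distinct clauses via an `Expand` step; this sketch only shows the core idea for one clause.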
Note: we may consider an "out of place" store for these varlenas.
I had a prototype working for recursive
@yjshen Hi, do you know why …? My understanding is that … The logical plan is …
I think Distinct basically serializes partial results as
Does this mean that the design was meant to support something like:
I am not totally sure to be honest |
Grand Plan

So @tustvold and I came up with a plan for this. Let me illustrate.

Prior Art

Let's first quickly illustrate what the current state is and what the problem is. We have two group-by implementations: V1 (
In general, other than the fact that this proposal sounds like a lot of work, I think it sounds wonderful 🏆 I did have a question about the proposed trait implementation:

```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;
    ...
}
```

Is the idea that each aggregator would contain a HashMap or some other way to map keys --> intermediates (rather than the hash map being in the operator)? This seems like it would result in a fair amount of duplication.

I would have expected something like the following (basically push the hash map out of the aggregator):

```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given rows
    fn update_batch(&mut self, indices: Vec<usize>, batch: &RecordBatch) -> Result<()>;
    ...
}
```

The big danger of the plan initially seems to be that it wouldn't be finished and then we are in an even worse state (3 implementations!), but I think your idea to incrementally rewrite V2 to V3 and then remove V1 sounds like a good mitigation strategy.
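The alternative trait shape in this comment, where the operator owns the single key-to-group mapping and aggregators only receive group indices, can be sketched in plain Rust. This is a hypothetical stand-in (not DataFusion's API): `Aggregator`, `SumAggregator`, and the `i64` value column are all simplifications, and the real design would operate on Arrow arrays.

```rust
use std::collections::HashMap;

// Hypothetical sketch: the operator maps group keys to dense group
// indices once per batch, and aggregators only see (index, value) pairs,
// so each aggregator does not need its own hash map.
trait Aggregator {
    /// `group_indices[i]` is the group that row `i` belongs to.
    fn update_batch(&mut self, group_indices: &[usize], values: &[i64]);
    fn finalize(&self) -> Vec<i64>;
}

struct SumAggregator {
    sums: Vec<i64>, // one accumulator per group index
}

impl Aggregator for SumAggregator {
    fn update_batch(&mut self, group_indices: &[usize], values: &[i64]) {
        for (&g, &v) in group_indices.iter().zip(values) {
            if g >= self.sums.len() {
                self.sums.resize(g + 1, 0); // grow when new groups appear
            }
            self.sums[g] += v;
        }
    }
    fn finalize(&self) -> Vec<i64> {
        self.sums.clone()
    }
}

fn main() {
    // The operator owns the single key -> group-index map ...
    let mut group_ids: HashMap<&str, usize> = HashMap::new();
    let keys = ["a", "b", "a", "b", "a"];
    let values = [1_i64, 2, 3, 4, 5];
    let indices: Vec<usize> = keys
        .iter()
        .map(|&k| {
            let next = group_ids.len();
            *group_ids.entry(k).or_insert(next)
        })
        .collect();
    // ... and shares the computed indices with every aggregator.
    let mut agg = SumAggregator { sums: vec![] };
    agg.update_batch(&indices, &values);
    assert_eq!(agg.finalize(), vec![9, 6]); // "a" -> 1+3+5, "b" -> 2+4
}
```

The key consequence is that the hashing work is done once per batch regardless of how many aggregate expressions the query has.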
Sure, but it reduces dyn dispatch by a lot (once per batch instead of once per group), removes the
The aggregator CAN perform a
There will never be a 3rd implementation. We morph V2 into V3 and then delete V1.
I understand your point. I would probably have to see a prototype to really understand how complicated it would be in practice. It doesn't feel right to me. Another thing to consider is other potential aggregation algorithms:
Ideally we would follow the approach I've increasingly been applying to arrow-rs, and recently applied to InList. Namely, use concrete types, and only use dyn-dispatch to type-erase when dispatching the batches. It would be fairly trivial to implement a
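The "concrete types, dyn-dispatch only when dispatching batches" idea might look roughly like the following plain-Rust sketch. All names here (`BatchAggregator`, `MinAggregator`) are hypothetical, and a plain `&[f64]` stands in for an Arrow array; the point is only where the virtual call happens.

```rust
// Hypothetical sketch: aggregators are concrete, monomorphized types;
// Box<dyn ...> type erasure happens once per batch, not once per row.
trait BatchAggregator {
    fn update_batch(&mut self, values: &[f64]);
    fn result(&self) -> f64;
}

// A concrete aggregator: the inner loop is monomorphized, so the
// compiler can inline and potentially auto-vectorize it, because no
// dyn dispatch happens per element.
#[derive(Default)]
struct MinAggregator {
    min: Option<f64>,
}

impl BatchAggregator for MinAggregator {
    fn update_batch(&mut self, values: &[f64]) {
        for &v in values {
            self.min = Some(match self.min {
                Some(m) if m <= v => m,
                _ => v,
            });
        }
    }
    fn result(&self) -> f64 {
        self.min.unwrap_or(f64::NAN)
    }
}

fn main() {
    // Type erasure at the batch boundary: one virtual call per batch,
    // then a tight concrete loop over the values inside.
    let mut agg: Box<dyn BatchAggregator> = Box::new(MinAggregator::default());
    for batch in [&[3.0, 1.5, 2.0][..], &[0.5, 4.0][..]] {
        agg.update_batch(batch);
    }
    assert_eq!(agg.result(), 0.5);
}
```

This keeps the flexibility of trait objects for wiring up the operator while paying dispatch cost only once per batch.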
To be clear, I think the outcome of implementing the plan described by @crepererum in #2723 (comment) will be:
For apache#2723. This has two effects:

- **wider feature support:** We now use the V2 aggregator for all group-column types. The arrow row format support is sufficient for that. V1 will only be used if the aggregator itself doesn't support V2 (and these are quite a few at the moment). We'll improve on that front in follow-up PRs.
- **more speed:** Turns out the arrow row format is also faster (see below).

Perf results (mind the noise in the benchmarks that are actually not even touched by this code change):

```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue2723a-pre
...
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-fdbe5671f9c3019b)
aggregate_query_no_group_by 15 12
                        time:   [779.28 µs 782.77 µs 786.28 µs]
                        change: [+2.1375% +2.7672% +3.4171%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_no_group_by_min_max_f64
                        time:   [712.96 µs 715.90 µs 719.14 µs]
                        change: [+0.8379% +1.7648% +2.6345%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  6 (6.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_wide
                        time:   [1.7297 ms 1.7399 ms 1.7503 ms]
                        change: [-34.647% -33.908% -33.165%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.0984 ms 1.1045 ms 1.1115 ms]
                        change: [-38.581% -38.076% -37.569%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild

Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by
                        time:   [1.7810 ms 1.7925 ms 1.8057 ms]
                        change: [-25.252% -24.127% -22.737%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.2068 ms 1.2119 ms 1.2176 ms]
                        change: [+2.2847% +3.0857% +3.8789%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  7 (7.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.6762 ms 1.6848 ms 1.6942 ms]
                        change: [-29.598% -28.603% -27.400%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  6 (6.00%) high severe

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.1969 ms 1.2008 ms 1.2049 ms]
                        change: [+1.3015% +2.1513% +3.0016%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low severe
  2 (2.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.797 ms 15.112 ms 15.427 ms]
                        change: [-12.072% -8.7274% -5.3392%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [4.1278 ms 4.1687 ms 4.2098 ms]
                        change: [+0.4851% +1.9525% +3.3676%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.4694 ms 3.4967 ms 3.5245 ms]
                        change: [-1.5467% -0.4432% +0.6609%] (p = 0.43 > 0.05)
                        No change in performance detected.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```
So I think this ticket is now a bit complicated: it was originally about avoiding two GroupBy operations, which #4924 from @mustafasrepo does, but it has since grown to include consolidating / improving the aggregate implementation as well. So what I plan to do is break out @crepererum's plan in #2723 (comment) into a new ticket, and close this ticket when #4924 is merged.
I filed #4973 to track consolidating the aggregators |
It is actually quite cool to see that @crepererum 's plan from #2723 (comment) is moving right along |
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As part of the transition to the faster "row format" (#1861 ) , @yjshen implemented a Row based Hash Aggregate implementation in #2375 ❤️
However, the implementation currently supports only a subset of the data types that DataFusion supports. This made the code significantly faster for some cases, but has some downsides: there are now two hash aggregate implementations, row_hash.rs and hash.rs.
You can already see the potential challenge in PRs like #2716 where test coverage may miss one of the hash aggregate implementations by accident
Describe the solution you'd like
I would like to consolidate the hash aggregate implementations -- success is to delete hash.rs by adding the remaining type support to row_hash.rs.
I think this would be a nice project for someone new to DataFusion to work on as the pattern is already defined, the outcome will be better performance, and they will get good experience with the code.
It will also increase the type support for the row format and make it easier to roll out through the rest of the codebase.
Describe alternatives you've considered
N/A
Additional context
More context about the ongoing row format conversion is in #1861.