
Row accumulator support update Scalar values #6003

Merged · 6 commits · Apr 20, 2023

Conversation

@mingmwang (Contributor) commented Apr 14, 2023

Which issue does this PR close?

Closes #6002.

Rationale for this change

Improve aggregator performance when grouping by high-cardinality columns.

What changes are included in this PR?

  1. Add an update_scalar() method to the RowAccumulator trait
  2. Adaptively decide the accumulator update mode
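A minimal sketch of the idea behind change 1, with hypothetical, simplified names and signatures (the real `RowAccumulator` trait in DataFusion operates on row-format state and arrow arrays; this toy `ScalarValue` stands in for the arrow type): alongside the batch path, a per-row scalar path lets the operator update one group's state without slicing a small array per group.

```rust
// Hypothetical, simplified sketch of a row accumulator that supports
// both batch (array) updates and single-scalar updates. Names and
// signatures are illustrative, not DataFusion's actual API.

#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Null,
    Int64(i64),
}

trait RowAccumulator {
    /// Update state from a whole (already sliced) column of values.
    fn update_batch(&mut self, values: &[ScalarValue]);
    /// Update state from a single row's value, avoiding per-group slicing.
    fn update_scalar(&mut self, value: &ScalarValue);
}

struct SumAccumulator {
    sum: i64,
}

impl RowAccumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[ScalarValue]) {
        for v in values {
            self.update_scalar(v);
        }
    }
    fn update_scalar(&mut self, value: &ScalarValue) {
        if let ScalarValue::Int64(v) = value {
            self.sum += *v;
        } // ScalarValue::Null is skipped, like SQL SUM
    }
}

fn main() {
    let mut acc = SumAccumulator { sum: 0 };
    acc.update_batch(&[ScalarValue::Int64(2), ScalarValue::Null, ScalarValue::Int64(3)]);
    acc.update_scalar(&ScalarValue::Int64(5));
    assert_eq!(acc.sum, 10);
    println!("sum = {}", acc.sum);
}
```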

Are these changes tested?

I tested this on my local Mac.
For TPC-H q17, there is about a 40% improvement.

Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 1888.9 ms and returned 1 rows
Query 17 iteration 1 took 1851.8 ms and returned 1 rows
Query 17 iteration 2 took 1853.1 ms and returned 1 rows
Query 17 avg time: 1864.61 ms

After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false }
Query 17 iteration 0 took 1377.0 ms and returned 1 rows
Query 17 iteration 1 took 1324.3 ms and returned 1 rows
Query 17 iteration 2 took 1317.7 ms and returned 1 rows
Query 17 avg time: 1339.68 ms.

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions labels Apr 14, 2023
@mingmwang mingmwang requested review from alamb and yjshen April 14, 2023 07:03
@yahoNanJing (Contributor) commented Apr 14, 2023

Thanks @mingmwang for the recent improvement patches. The benchmark performance is improved by around 30% for TPC-H SF10.

| TPCH 10g | 2023-03-20 (d77ccc2) | 2023-04-12 (2c57310) #5866 #5837 | 2023-04-14 (bd705fe) #5973 | 2023-04-14 (5bfc141) #6003 |
|---|---|---|---|---|
| q1 | 23665.52 | 20753.61 | 20891.52 | 21408.54 |
| q2 | 5589.88 | 3617.40 | 3464.09 | 3464.95 |
| q3 | 9777.48 | 5961.30 | 5785.48 | 5732.81 |
| q4 | 5188.19 | 3226.53 | 3132.99 | 4091.51 |
| q5 | 15309.18 | 12133.36 | 10995.68 | 14361.51 |
| q6 | 6293.92 | 4114.41 | 4504.54 | 3529.13 |
| q7 | 28215.12 | 25501.06 | 23664.61 | 24111.81 |
| q8 | 11563.27 | 8585.95 | 10085.88 | 7257.50 |
| q9 | 19719.88 | 16851.29 | 16703.78 | 16446.74 |
| q10 | 14240.17 | 9811.16 | 9618.89 | 10092.34 |
| q11 | 5774.94 | 4358.94 | 4326.38 | 4629.20 |
| q12 | 7027.51 | 4771.94 | 4659.69 | 4712.21 |
| q13 | 17182.12 | 15855.66 | 15594.57 | 16362.75 |
| q14 | 7567.25 | 4924.66 | 4679.48 | 4458.82 |
| q15 | 17523.50 | 11911.70 | 11484.73 | 8626.74 |
| q16 | 4064.01 | 3719.74 | 3707.31 | 3667.75 |
| q17 | 135964.77 | 117756.03 | 107820.88 | 83682.60 |
| q18 | 89023.33 | 60923.75 | 65235.46 | 66134.85 |
| q19 | 9244.44 | 8504.44 | 11795.41 | 8504.76 |
| q20 | 25746.50 | 15681.54 | 15023.04 | 12232.71 |
| q21 | 34265.73 | 33962.40 | 34681.59 | 34787.75 |
| q22 | 8026.30 | 8000.52 | 8334.53 | 8487.91 |
| Total | 495383.13 | 400927.39 | 396190.53 | 366784.89 |

@mingmwang (Contributor, Author)

For Q5, there is a downgrade between the versions, but I think it is unrelated to my PRs; the aggregation there is very lightweight.

=== Physical plan with metrics ===
SortExec: expr=[revenue@1 DESC], metrics=[output_rows=5, elapsed_compute=5.335µs, spill_count=0, spilled_bytes=0]
  ProjectionExec: expr=[n_name@0 as n_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue], metrics=[output_rows=5, elapsed_compute=583ns, spill_count=0, spilled_bytes=0, mem_used=0]
    AggregateExec: mode=Single, gby=[n_name@2 as n_name], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)], metrics=[output_rows=5, elapsed_compute=3.773667ms, spill_count=0, spilled_bytes=0, mem_used=0]
      ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@2 as n_name], metrics=[output_rows=72985, elapsed_compute=541ns, spill_count=0, spilled_bytes=0, mem_used=0]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=72985, elapsed_compute=1.764µs, spill_count=0, spilled_bytes=0, mem_used=0]
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "n_regionkey", index: 3 }, Column { name: "r_regionkey", index: 0 })], metrics=[output_rows=1, input_batches=1, output_batches=1, build_input_batches=1, input_rows=1, build_input_rows=364380, build_mem_used=37529364, join_time=1.260917ms, build_time=3.941506121s]
            ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, n_name@4 as n_name, n_regionkey@5 as n_regionkey], metrics=[output_rows=364380, elapsed_compute=1.041µs, spill_count=0, spilled_bytes=0, mem_used=0]
              CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=364380, elapsed_compute=1.767µs, spill_count=0, spilled_bytes=0, mem_used=0]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "s_nationkey", index: 2 }, Column { name: "n_nationkey", index: 0 })], metrics=[output_rows=25, input_batches=1, output_batches=1, build_input_batches=13, input_rows=25, build_input_rows=364380, build_mem_used=31885080, join_time=7.462876ms, build_time=3.931845754s]
                  ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice, l_discount@3 as l_discount, s_nationkey@5 as s_nationkey], metrics=[output_rows=364380, elapsed_compute=4.209µs, spill_count=0, spilled_bytes=0, mem_used=0]
                    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=364380, elapsed_compute=8.226µs, spill_count=0, spilled_bytes=0, mem_used=0]
                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "l_suppkey", index: 1 }, Column { name: "s_suppkey", index: 0 }), (Column { name: "c_nationkey", index: 0 }, Column { name: "s_nationkey", index: 1 })], metrics=[output_rows=100000, input_batches=13, output_batches=13, build_input_batches=1046, input_rows=100000, build_input_rows=9103367, build_mem_used=1038498128, join_time=74.74725ms, build_time=3.853093588s]
                        ProjectionExec: expr=[c_nationkey@0 as c_nationkey, l_suppkey@3 as l_suppkey, l_extendedprice@4 as l_extendedprice, l_discount@5 as l_discount], metrics=[output_rows=9103367, elapsed_compute=215.36µs, spill_count=0, spilled_bytes=0, mem_used=0]
                          CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=9103367, elapsed_compute=31.512778ms, spill_count=0, spilled_bytes=0, mem_used=0]
                            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "o_orderkey", index: 1 }, Column { name: "l_orderkey", index: 0 })], metrics=[output_rows=59986052, input_batches=7323, output_batches=7323, build_input_batches=262, input_rows=59986052, build_input_rows=2275919, build_mem_used=174938288, join_time=807.006477ms, build_time=702.906984ms]
                              ProjectionExec: expr=[c_nationkey@1 as c_nationkey, o_orderkey@2 as o_orderkey], metrics=[output_rows=2275919, elapsed_compute=48.584µs, spill_count=0, spilled_bytes=0, mem_used=0]
                                CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2275919, elapsed_compute=18.668µs, spill_count=0, spilled_bytes=0, mem_used=0]
                                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })], metrics=[output_rows=2275919, input_batches=262, output_batches=262, build_input_batches=184, input_rows=2275919, build_input_rows=1500000, build_mem_used=93390992, join_time=206.31563ms, build_time=49.711972ms]
                                    ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/customer/part-0.parquet]]}, projection=[c_custkey, c_nationkey], metrics=[output_rows=1500000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=3006119, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=2.157083ms, time_elapsed_scanning_total=16.696249ms, time_elapsed_processing=15.288754ms, time_elapsed_opening=1.074375ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
                                    ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey], metrics=[output_rows=2275919, elapsed_compute=25.543µs, spill_count=0, spilled_bytes=0, mem_used=0]
                                      CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2275919, elapsed_compute=2.349505ms, spill_count=0, spilled_bytes=0, mem_used=0]
                                        FilterExec: o_orderdate@2 >= 8766 AND o_orderdate@2 < 9131, metrics=[output_rows=2275919, elapsed_compute=19.347372ms, spill_count=0, spilled_bytes=0, mem_used=0]
                                          ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/orders/part-0.parquet]]}, predicate=o_orderdate@4 >= 8766 AND o_orderdate@4 < 9131, pruning_predicate=o_orderdate_max@0 >= 8766 AND o_orderdate_min@1 < 9131, projection=[o_orderkey, o_custkey, o_orderdate], metrics=[output_rows=15000000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=94686185, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=4.577708ms, time_elapsed_scanning_total=613.686009ms, time_elapsed_processing=365.906294ms, time_elapsed_opening=1.525625ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
                              ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount], metrics=[output_rows=59986052, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=453645745, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=6.237208ms, time_elapsed_scanning_total=2.314007854s, time_elapsed_processing=1.391610509s, time_elapsed_opening=1.374334ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
                        ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/supplier/part-0.parquet]]}, projection=[s_suppkey, s_nationkey], metrics=[output_rows=100000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=372287, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=1.480458ms, time_elapsed_scanning_total=76.493501ms, time_elapsed_processing=1.467501ms, time_elapsed_opening=1.877375ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
                  ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/nation/part-0.parquet]]}, projection=[n_nationkey, n_name, n_regionkey], metrics=[output_rows=25, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=544, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=790.959µs, time_elapsed_scanning_total=8.265166ms, time_elapsed_processing=122.376µs, time_elapsed_opening=1.493584ms, page_index_eval_time=2ns, pushdown_eval_time=2ns]
            ProjectionExec: expr=[r_regionkey@0 as r_regionkey], metrics=[output_rows=1, elapsed_compute=125ns, spill_count=0, spilled_bytes=0, mem_used=0]
              CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.336µs, spill_count=0, spilled_bytes=0, mem_used=0]
                FilterExec: r_name@1 = ASIA, metrics=[output_rows=1, elapsed_compute=6.208µs, spill_count=0, spilled_bytes=0, mem_used=0]
                  ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data10/region/part-0.parquet]]}, predicate=r_name@1 = ASIA, pruning_predicate=r_name_min@0 <= ASIA AND ASIA <= r_name_max@1, projection=[r_regionkey, r_name], metrics=[output_rows=5, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, page_index_rows_filtered=0, num_predicate_creation_errors=0, bytes_scanned=230, row_groups_pruned=0, pushdown_rows_filtered=0, predicate_evaluation_errors=0, time_elapsed_scanning_until_data=289.083µs, time_elapsed_scanning_total=297.375µs, time_elapsed_processing=96.583µs, time_elapsed_opening=900.083µs, page_index_eval_time=2ns, pushdown_eval_time=2ns]

@mingmwang (Contributor, Author)

Q5

Before this PR

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(5), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data10", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false }
Query 5 iteration 0 took 4093.0 ms and returned 5 rows
Query 5 iteration 1 took 4031.0 ms and returned 5 rows
Query 5 iteration 2 took 4008.9 ms and returned 5 rows
Query 5 avg time: 4044.29 ms

After this PR

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(5), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data10", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: true, enable_scheduler: false }
Query 5 iteration 0 took 4042.3 ms and returned 5 rows
Query 5 iteration 1 took 3978.7 ms and returned 5 rows
Query 5 iteration 2 took 3973.1 ms and returned 5 rows
Query 5 avg time: 3998.02 ms

Almost no difference.


/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
Contributor:

I think you can replace this function with

/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
    array: &ArrayRef,
    filter: &Option<&BooleanArray>,
    row_index: usize,
) -> Result<ScalarValue> {
    if array.is_null(row_index) {
        return Ok(ScalarValue::Null);
    }
    if let Some(filter) = filter {
        if !filter.value(row_index) {
            return Ok(ScalarValue::Null);
        }
    }
    let mut res = ScalarValue::try_from_array(array, row_index)?;
    if res.is_null() {
        res = ScalarValue::Null;
    }
    Ok(res)
}

ScalarValue::is_null matches both [ScalarValue::Null] and [ScalarValue::Type(None)].
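A toy illustration of the normalization the suggested code performs, using a hypothetical two-variant mini `ScalarValue` (not the real DataFusion type, which has many variants): `is_null` treats both the untyped `Null` and a typed `None` as null, so the typed-null case can be folded into `Null`.

```rust
// Hypothetical mini ScalarValue: an untyped Null plus one typed variant
// whose inner Option models a typed null (ScalarValue::Int64(None)).
#[derive(Debug, PartialEq)]
enum ScalarValue {
    Null,
    Int64(Option<i64>),
}

impl ScalarValue {
    // Mirrors the described ScalarValue::is_null behavior: true for the
    // untyped Null and for a typed None.
    fn is_null(&self) -> bool {
        matches!(self, ScalarValue::Null | ScalarValue::Int64(None))
    }
}

// Collapse any null representation into ScalarValue::Null, as the
// suggested col_to_scalar rewrite does after try_from_array.
fn normalize(v: ScalarValue) -> ScalarValue {
    if v.is_null() { ScalarValue::Null } else { v }
}

fn main() {
    assert!(ScalarValue::Null.is_null());
    assert!(ScalarValue::Int64(None).is_null());
    assert_eq!(normalize(ScalarValue::Int64(None)), ScalarValue::Null);
    assert_eq!(normalize(ScalarValue::Int64(Some(1))), ScalarValue::Int64(Some(1)));
}
```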

@comphead (Contributor)

Thanks, cool PR. Linking #5969 as potentially related.

@yjshen (Member) left a comment

Thank you for your ongoing efforts in improving the performance of aggregate functions. Really appreciate it!

If I understand the core concept of this PR correctly: the current aggregate process, when dealing with high-cardinality aggregations, suffers from low efficiency due to the frequent generation of small arrays from slicing. To address this, you've changed the approach for high-cardinality aggregations by generating ScalarValue or Vec<ScalarValue> instead of creating arrays through slicing.

Nonetheless, generating ScalarValues could still lead to some overhead. As an alternative, I propose the following: we could add a set of methods to arrow-arith's aggregate functions, allowing them to accept a selection vector (also suggested by @alamb previously in #5944 (comment)). This way, we can avoid creating slices and generating ScalarValue at the same time.
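A sketch of the selection-vector style proposed above (plain `Vec`/`bool` stand in for arrow arrays and arrow-arith kernels; this is not an existing arrow-rs API): the kernel walks the full values array once and folds in only the rows whose selection bit is set, so neither per-group slices nor per-row `ScalarValue`s are materialized.

```rust
// Illustrative selection-vector aggregate: sum only the rows whose
// selection bit is set, without slicing the values array per group.
fn sum_with_selection(values: &[i64], selection: &[bool]) -> i64 {
    values
        .iter()
        .zip(selection)
        .filter(|&(_, &keep)| keep)
        .map(|(v, _)| v)
        .sum()
}

fn main() {
    let values = [10, 20, 30, 40];
    // Select rows 0 and 2 only, e.g. the rows belonging to one group.
    let selection = [true, false, true, false];
    assert_eq!(sum_with_selection(&values, &selection), 40);
}
```

In a real arrow kernel the selection would be a `BooleanArray` (or validity-style bitmap) evaluated bitwise, but the control flow is the same.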

if matches!(self.mode, AggregateMode::Partial | AggregateMode::Single)
&& normal_aggr_input_values.is_empty()
&& normal_filter_values.is_empty()
&& groups_with_rows.len() >= batch.num_rows() / 10
Member:

This magic number 10 is used to identify high cardinality. Shall we make it configurable, or document how this 10 was chosen?

Contributor Author:

Sure, will make it configurable.
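The quoted condition compares `groups_with_rows.len()` against `batch.num_rows() / 10`. A hedged sketch of what a configurable version of that adaptive check might look like (the config struct and field name here are hypothetical, not DataFusion's actual configuration):

```rust
// Hypothetical config-driven version of the high-cardinality check:
// use per-row scalar updates when the number of distinct groups in a
// batch is at least rows / factor; otherwise keep the sliced batch path.
struct AggConfig {
    /// Replaces the magic number 10 from the quoted condition.
    scalar_update_factor: usize,
}

fn use_scalar_updates(cfg: &AggConfig, num_groups: usize, num_rows: usize) -> bool {
    num_groups >= num_rows / cfg.scalar_update_factor
}

fn main() {
    let cfg = AggConfig { scalar_update_factor: 10 };
    // 8192-row batch with 5000 distinct groups: high cardinality.
    assert!(use_scalar_updates(&cfg, 5000, 8192));
    // Only 3 distinct groups: stay on the sliced batch path.
    assert!(!use_scalar_updates(&cfg, 3, 8192));
}
```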

@yjshen (Member) commented Apr 16, 2023

Filed apache/arrow-rs#4095 .

@yahoNanJing (Contributor)

the current aggregate process, when dealing with high-cardinality aggregations, suffers from low efficiency due to the frequent generation of small arrays from slicing.

Especially for high-cardinality aggregations, query performance is degraded by the large number of slicing invocations.

@mingmwang (Contributor, Author)

Thank you for your ongoing efforts in improving the performance of aggregate functions. Really appreciate it!

If I understand the core concept of this PR correctly: the current aggregate process, when dealing with high-cardinality aggregations, suffers from low efficiency due to the frequent generation of small arrays from slicing. To address this, you've changed the approach for high-cardinality aggregations by generating ScalarValue or Vec<ScalarValue> instead of creating arrays through slicing.

Nonetheless, generating ScalarValues could still lead to some overhead. As an alternative, I propose the following: we could add a set of methods to arrow-arith's aggregate functions, allowing them to accept a selection vector (also suggested by @alamb previously in #5944 (comment)). This way, we can avoid creating slices and generating ScalarValue at the same time.

Yes, you are right. I agree that generating ScalarValues involves some overhead. I'm not sure whether I can get rid of the ScalarValues and generate Rust native types to update the accumulator states directly; let me do some experiments on this. Regarding the other approach you proposed, I'm open to it, but since the arrow-rs aggregation framework does not provide such capabilities yet, I think when it is ready we can try that approach as well, benchmark it, and then decide which way to go.

@mingmwang (Contributor, Author)

I will try the following items on top of this PR:

  1. Get rid of generating the ScalarValues
  2. Avoid type casts and null checks in the hot path
  3. Try to reuse the RowAccessor and avoid constructing it again and again

@yjshen (Member) commented Apr 17, 2023

I'm open to it, but since the arrow-rs aggregation framework does not provide such capabilities yet, I think when it is ready we can try that approach as well, benchmark it, and then decide which way to go.

Agreed. We can always revisit it when apache/arrow-rs#4095 gets resolved, or when we find another sound approach. Thanks again!

@mustafasrepo (Contributor)

I had a comment about refactoring the col_to_scalar function; the refactor simplifies the code and removes duplication. Is there a specific reason for not doing so? Maybe I misunderstood what col_to_scalar does. Without this change we may end up with diverging versions of similar code in row_hash.rs and scalar.rs: someone may change one of them and miss the corresponding change in the other file.

@mingmwang (Contributor, Author)

I had a comment about refactoring the col_to_scalar function; the refactor simplifies the code and removes duplication. Is there a specific reason for not doing so? Maybe I misunderstood what col_to_scalar does. Without this change we may end up with diverging versions of similar code in row_hash.rs and scalar.rs: someone may change one of them and miss the corresponding change in the other file.

Yes, I will change it accordingly per your comments.

@mingmwang (Contributor, Author)

@yjshen @mustafasrepo @Dandandan
I did some experiments removing the ScalarValues and using Rust native types to update the accumulator state directly. It brought some performance gain, but not an obvious one (a reduction of ~50 ms on my Mac).
In this PR, I will continue with the ScalarValue update approach.

And I think after this PR, the major bottlenecks of high-cardinality aggregations in DataFusion will be cache misses and memory stalls; our CPU instructions per cycle are much lower than DuckDB's (on TPC-H q17).

@mingmwang (Contributor, Author)

@mustafasrepo
There is some performance downgrade after using ScalarValue::try_from_array, due to the additional null check.
I will leave it for future improvement, because we might remove the ScalarValue creation here entirely.

let filter_bool_array = row_filter_values
.iter()
.map(|filter_opt| match filter_opt {
Some(f) => Ok(Some(as_boolean_array(f)?)),
Contributor:

It seems you don't need to wrap the result in Ok / Result here.

Contributor Author:

as_boolean_array returns a Result here.

"GroupedHashAggregate: can't create a scalar from array of type \"{other:?}\""
))),
let mut res = ScalarValue::try_from_array(array, row_index)?;
if res.is_null() {
Contributor:

This is_null check seems redundant.

Contributor Author:

Yes, agreed. I will just remove it.

@yahoNanJing (Contributor)

Hi @Dandandan, do you still have any concerns for this PR?

@yahoNanJing (Contributor)

I'll merge this PR to unblock further optimizations. If there are any further concerns, we can create a follow-up issue.
