Skip partial aggregation based on the cardinality of hash value instead of group values #12697
Conversation
```rust
if group_values.is_empty() {
    return internal_err!("group_values expected to have at least one element");
}
let mut output = group_values.swap_remove(0);
```
The result further improves after removing the clone 😆, sounds like cheating.
🤔 As I understand, it seems the main difference with the original is the threshold. Is it possible that the improvement comes from triggering the skip earlier? I guess maybe we can modify the params in the original probe instead. This PR checks:
```rust
let ratio = self.unique_hashes_count.len() as f32
    / self.accumulated_batch_size as f32;
// TODO: Configure the threshold
self.skip_partial_with_hash_count = ratio > 0.1;
```
while the original probe in main checks:

```rust
// `probe_rows_threshold` defaults to 100_000
// `probe_ratio_threshold` defaults to 0.8
if self.input_rows >= self.probe_rows_threshold {
    self.should_skip = self.num_groups as f64 / self.input_rows as f64
        >= self.probe_ratio_threshold;
    self.is_locked = true;
}
```

Actually, I think if …
If we only compute the hash, we are able to skip early. Otherwise, the original approach can only skip after the group values are calculated. The threshold might be one of the reasons, but skipping based on the hash is even better. I think partial aggregation is only helpful for low-cardinality queries, where we can reduce the output rows at the first stage so the repartition cost is low. In a distributed context, I guess we are still able to scale it with repartition + final?
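To make the per-batch, hash-based skip concrete, here is a minimal self-contained sketch of the idea (plain std rather than the actual DataFusion types; `HashCardinalityProbe` and its fields are illustrative names, and the 0.1 ratio mirrors the threshold discussed above):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Tracks unique group-key hashes across batches and decides whether
/// partial aggregation should be skipped for further input.
struct HashCardinalityProbe {
    unique_hashes: HashSet<u64>,
    accumulated_rows: usize,
    ratio_threshold: f32,
}

impl HashCardinalityProbe {
    /// Hash the group keys of one batch *before* building group values.
    /// Returns true once the observed cardinality is high enough to skip.
    fn observe_batch<T: Hash>(&mut self, group_keys: &[T]) -> bool {
        for key in group_keys {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            self.unique_hashes.insert(hasher.finish());
        }
        self.accumulated_rows += group_keys.len();
        let ratio = self.unique_hashes.len() as f32 / self.accumulated_rows as f32;
        ratio > self.ratio_threshold
    }
}

fn main() {
    let mut probe = HashCardinalityProbe {
        unique_hashes: HashSet::new(),
        accumulated_rows: 0,
        ratio_threshold: 0.1,
    };
    // A high-cardinality batch: nearly every key is distinct, so the
    // probe signals "skip partial aggregation" right away.
    let batch: Vec<u64> = (0..8192).collect();
    assert!(probe.observe_batch(&batch));
}
```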
I am a bit confused about this; it seems only the calculation of the last batch before skipping can be avoided? It may run like this. Maybe I misunderstand, I am not so sure?
But it introduces the cost of the additional `HashSet` computation.
I think in the partial group-by stage, each batch is evaluated independently, so only the batches with low cardinality are not skipped; the others are skipped. For the high-cardinality case, we skip almost all the batches and move on to repartitioning quickly.

For the low-cardinality case, the cost of repartition + final is almost negligible; partial aggregation is the only part that takes time.
Yes, I am confused about that:
So it seems it is actually the threshold that matters. I modified the threshold in main for a quick test, and found a similar improvement locally:

```rust
/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.1

/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 0
```
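For anyone who wants to reproduce the quick test, these options can also be set programmatically. A minimal sketch, assuming DataFusion's `SessionConfig::options_mut()` and `SessionContext::new_with_config()` (the field names follow the config shown above):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn session_with_aggressive_skip() -> SessionContext {
    let mut config = SessionConfig::new();
    // Mirror the quick test above: ratio 0.1, probe from the first rows.
    config.options_mut().execution.skip_partial_aggregation_probe_ratio_threshold = 0.1;
    config.options_mut().execution.skip_partial_aggregation_probe_rows_threshold = 0;
    SessionContext::new_with_config(config)
}
```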
Not only does the threshold matter but also the group-values computation, so I'm not surprised that changing the threshold speeds up the code.
Planning to review it during the next couple of days.
Running some benchmarks.
I haven't fully reviewed this PR yet. One thing I wonder is if the benefits in performance could potentially be had by adjusting the default aggregation ratio. Like, for example, what if we changed the default ratio?
I found that maybe we can get similar benefits through the following (details in #12765):
These are the numbers on my local machine:
I think the threshold here is the main reason for the performance improvement, but I think checking with the hash value helps, although the benchmark doesn't show that.
A query like …
```rust
if self.skip_partial_aggregation {
    let states = self.transform_to_states(batch)?;
    self.exec_state = ExecutionState::ProducingOutput(states);
    // ...
}
```
As I understand, some batches may have been updated into intermediate states (`GroupValues` and `GroupAccumulator`s). Maybe we should emit them first, like what is done in `switch_to_skip_aggregation`?

I guess the triggering process may be like:
### batch 1
- not reach the skip threshold
- update into `GroupValues` and `GroupAccumulator`s
### batch 2
- not reach the skip threshold
- update into `GroupValues` and `GroupAccumulator`s
...
### batch n
- finally reach the skip threshold
- we need to `emit` things in `GroupValues` and `GroupAccumulator`s
- then we convert to the skip mode
🤔 And it seems that just the last batch's (batch n in the example) updating can be avoided, compared to the original partial-skipping logic. And the cost of updating one batch may not be so high?
> And it seems that just the last batch's (batch n in the example) updating can be avoided, compared to the original partial-skipping logic. And the cost of updating one batch may not be so high?

Why is only the last batch not computed? If batch m reaches the skip threshold, I think m to n are all skipped, therefore n - m computations are saved.
> And it seems that just the last batch's (batch n in the example) updating can be avoided, compared to the original partial-skipping logic. And the cost of updating one batch may not be so high?
>
> Why is only the last batch not computed? If batch m reaches the skip threshold, I think m to n are all skipped, therefore n - m computations are saved.
Oh, I think maybe I got it? Do you mean the loop here?

```rust
fn group_aggregate_batch_with_skipping_partial() {
    for (index, group_values) in group_by_values.iter().enumerate() {
        // ...
        // reach the threshold, and we break the loop and skip earlier
        // ...
    }
}
```
Not really; for the simple group-by case, `group_by_values` mostly has a single element. I think only grouping-sets queries have more than one.
If any batch reaches the threshold limit, we will switch to `ProducingOutput`:

```rust
if self.skip_partial_aggregation {
    let states = self.transform_to_states(batch)?;
    self.exec_state = ExecutionState::ProducingOutput(states);
    // make sure the exec_state just set is not overwritten below
    break 'reading_input;
}
```

I think the skipping logic is no different from the main branch; the only difference is that we compute the hash and skip based on the number of unique values in the hash table.
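To make that control flow easier to follow, here is a simplified, self-contained sketch of the loop (illustrative only, not the actual DataFusion source; a `String` stands in for the `RecordBatch` of partial states):

```rust
enum ExecutionState {
    ReadingInput,
    ProducingOutput(String), // stand-in for a RecordBatch of partial states
}

fn main() {
    let batches = ["batch 0", "batch 1", "batch 2", "batch 3"];
    let mut exec_state = ExecutionState::ReadingInput;

    'reading_input: for (i, batch) in batches.iter().enumerate() {
        // Pretend the per-batch probe trips on the second batch.
        let skip_partial_aggregation = i >= 1;
        if skip_partial_aggregation {
            // transform_to_states(batch) stand-in: pass the batch through.
            exec_state = ExecutionState::ProducingOutput(batch.to_string());
            // Make sure the exec_state just set is not overwritten below.
            break 'reading_input;
        }
        // ...otherwise update GroupValues / GroupAccumulators here...
    }

    if let ExecutionState::ProducingOutput(states) = exec_state {
        println!("switched to skip mode at: {states}");
    }
}
```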
> Not really; for the simple group-by case, `group_by_values` mostly has a single element. I think only grouping-sets queries have more than one.
🤔 Yes, I am just a bit confused about saving `n - m` computations in #12697 (comment), if we are talking about the single group-by case. Assume the threshold is first reached when `batch n` comes:

- In main, we will first update `batch n` into `GroupValues` and `GroupAccumulator`s, and only find the threshold reached afterwards, so we skip `batch n+1 ~ batch last`:
```rust
// `batch n` comes, update it into `GroupValues` and `GroupAccumulator`s.
extract_ok!(self.group_aggregate_batch(batch));
// Then we find the threshold reached here, after updating.
self.update_skip_aggregation_probe(input_rows);
// Switch to skip mode, and we skip `batch n+1 ~ batch last`.
extract_ok!(self.switch_to_skip_aggregation());
```
- And currently, we can know the threshold is reached before updating `batch n` into `GroupValues` and `GroupAccumulator`s, so we can skip `batch n ~ batch last`:
```rust
fn group_aggregate_batch_with_skipping_partial(
    &mut self,
    batch: &RecordBatch,
) -> Result<()> {
    for (index, group_values) in group_by_values.iter().enumerate() {
        // We calculate hashes and check the skip threshold here.
        // When `batch n` comes, we find the threshold reached, so we
        // just return and do not update `batch n`. It will switch to
        // skip mode for `batch n ~ batch last` after the return.
    }
    // We will still update `batch 0 ~ batch n-1`, because the threshold
    // is only reached when `batch n` comes.
    for (index, group_values) in group_by_values.iter().enumerate() {
        // Update `batch` into `GroupValues` and `GroupAccumulator`s.
    }
    Ok(())
}
```
So it seems just the updating of `batch n` is avoided?

😄 Sorry for maybe asking too many questions.
Oh, I see. That explains why the improvement is not as high as expected 🤔

I think the attempt for this change failed 😢
😢
Result if we set …

I think playing around with the threshold number is tricky and we may end up overfitting on the benchmark we target.
I think the largest challenge of this change is that for low cardinality we pay additional hash-table computation time. Although we don't see the slowdown in the benchmark, we could still construct an extreme query where it has a significant impact.
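A tiny self-contained illustration of that concern (plain std, purely illustrative): with only a handful of distinct keys the skip never triggers, yet every row still pays a hash plus a `HashSet` insert.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

fn main() {
    // One million rows but only 4 distinct group keys: the ratio stays
    // near zero, so partial aggregation is never skipped, and the
    // per-row hashing was pure overhead.
    let mut unique_hashes = HashSet::new();
    let mut rows = 0usize;
    for i in 0..1_000_000u64 {
        let key = i % 4;
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        unique_hashes.insert(hasher.finish());
        rows += 1;
    }
    let ratio = unique_hashes.len() as f32 / rows as f32;
    assert!(ratio < 0.1); // never reaches the skip threshold
}
```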
Here are the numbers I got on this branch:
```rust
/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
```
Why has this setting been deleted? Checking whether aggregation should be skipped right at the beginning of the execution may lead to a skipping decision based on an insufficient amount of data.
The reason is that in my strategy I make the decision per batch, since I assume the data is distributed evenly.
Config with ratio threshold 0.1 and rows threshold 100_000:
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ threshold ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 0.41ms │ 0.45ms │ 1.10x slower │
│ QQuery 1 │ 42.61ms │ 42.55ms │ no change │
│ QQuery 2 │ 75.89ms │ 74.88ms │ no change │
│ QQuery 3 │ 72.24ms │ 61.81ms │ +1.17x faster │
│ QQuery 4 │ 399.04ms │ 359.82ms │ +1.11x faster │
│ QQuery 5 │ 663.20ms │ 699.03ms │ 1.05x slower │
│ QQuery 6 │ 37.91ms │ 38.01ms │ no change │
│ QQuery 7 │ 42.71ms │ 40.05ms │ +1.07x faster │
│ QQuery 8 │ 638.09ms │ 699.07ms │ 1.10x slower │
│ QQuery 9 │ 678.82ms │ 742.56ms │ 1.09x slower │
│ QQuery 10 │ 200.65ms │ 209.83ms │ no change │
│ QQuery 11 │ 233.85ms │ 220.56ms │ +1.06x faster │
│ QQuery 12 │ 718.91ms │ 660.93ms │ +1.09x faster │
│ QQuery 13 │ 926.86ms │ 937.19ms │ no change │
│ QQuery 14 │ 873.60ms │ 753.51ms │ +1.16x faster │
│ QQuery 15 │ 498.70ms │ 483.92ms │ no change │
│ QQuery 16 │ 1305.01ms │ 1238.38ms │ +1.05x faster │
│ QQuery 17 │ 1190.26ms │ 1111.67ms │ +1.07x faster │
│ QQuery 18 │ 3347.18ms │ 2903.17ms │ +1.15x faster │
│ QQuery 19 │ 55.77ms │ 61.39ms │ 1.10x slower │
│ QQuery 20 │ 943.13ms │ 919.37ms │ no change │
│ QQuery 21 │ 1206.45ms │ 1230.95ms │ no change │
│ QQuery 22 │ 3230.71ms │ 3479.20ms │ 1.08x slower │
│ QQuery 23 │ 8186.73ms │ 7989.35ms │ no change │
│ QQuery 24 │ 500.91ms │ 507.13ms │ no change │
│ QQuery 25 │ 502.13ms │ 551.57ms │ 1.10x slower │
│ QQuery 26 │ 561.53ms │ 568.74ms │ no change │
│ QQuery 27 │ 1338.06ms │ 1382.16ms │ no change │
│ QQuery 28 │ 10507.08ms │ 11253.66ms │ 1.07x slower │
│ QQuery 29 │ 398.16ms │ 429.90ms │ 1.08x slower │
│ QQuery 30 │ 760.72ms │ 664.19ms │ +1.15x faster │
│ QQuery 31 │ 712.91ms │ 772.65ms │ 1.08x slower │
│ QQuery 32 │ 3567.36ms │ 4167.37ms │ 1.17x slower │
│ QQuery 33 │ 4863.25ms │ 4311.27ms │ +1.13x faster │
│ QQuery 34 │ 4115.97ms │ 3564.32ms │ +1.15x faster │
│ QQuery 35 │ 998.40ms │ 979.18ms │ no change │
│ QQuery 36 │ 147.99ms │ 136.97ms │ +1.08x faster │
│ QQuery 37 │ 101.82ms │ 102.79ms │ no change │
│ QQuery 38 │ 108.77ms │ 107.66ms │ no change │
│ QQuery 39 │ 327.29ms │ 272.91ms │ +1.20x faster │
│ QQuery 40 │ 33.56ms │ 34.88ms │ no change │
│ QQuery 41 │ 34.09ms │ 32.18ms │ +1.06x faster │
│ QQuery 42 │ 39.32ms │ 39.75ms │ no change │
└──────────────┴────────────┴────────────┴───────────────┘
Config with ratio threshold 0.1 and rows threshold 0:
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ threshold ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 0.41ms │ 0.49ms │ 1.21x slower │
│ QQuery 1 │ 42.61ms │ 42.34ms │ no change │
│ QQuery 2 │ 75.89ms │ 81.91ms │ 1.08x slower │
│ QQuery 3 │ 72.24ms │ 69.94ms │ no change │
│ QQuery 4 │ 399.04ms │ 365.30ms │ +1.09x faster │
│ QQuery 5 │ 663.20ms │ 656.21ms │ no change │
│ QQuery 6 │ 37.91ms │ 38.24ms │ no change │
│ QQuery 7 │ 42.71ms │ 40.27ms │ +1.06x faster │
│ QQuery 8 │ 638.09ms │ 590.64ms │ +1.08x faster │
│ QQuery 9 │ 678.82ms │ 666.57ms │ no change │
│ QQuery 10 │ 200.65ms │ 196.94ms │ no change │
│ QQuery 11 │ 233.85ms │ 220.90ms │ +1.06x faster │
│ QQuery 12 │ 718.91ms │ 652.58ms │ +1.10x faster │
│ QQuery 13 │ 926.86ms │ 941.22ms │ no change │
│ QQuery 14 │ 873.60ms │ 749.50ms │ +1.17x faster │
│ QQuery 15 │ 498.70ms │ 495.09ms │ no change │
│ QQuery 16 │ 1305.01ms │ 1247.99ms │ no change │
│ QQuery 17 │ 1190.26ms │ 1158.71ms │ no change │
│ QQuery 18 │ 3347.18ms │ 2008.84ms │ +1.67x faster │
│ QQuery 19 │ 55.77ms │ 54.94ms │ no change │
│ QQuery 20 │ 943.13ms │ 913.04ms │ no change │
│ QQuery 21 │ 1206.45ms │ 1183.49ms │ no change │
│ QQuery 22 │ 3230.71ms │ 3181.47ms │ no change │
│ QQuery 23 │ 8186.73ms │ 7900.35ms │ no change │
│ QQuery 24 │ 500.91ms │ 501.30ms │ no change │
│ QQuery 25 │ 502.13ms │ 481.08ms │ no change │
│ QQuery 26 │ 561.53ms │ 571.92ms │ no change │
│ QQuery 27 │ 1338.06ms │ 1451.63ms │ 1.08x slower │
│ QQuery 28 │ 10507.08ms │ 11396.72ms │ 1.08x slower │
│ QQuery 29 │ 398.16ms │ 392.46ms │ no change │
│ QQuery 30 │ 760.72ms │ 658.78ms │ +1.15x faster │
│ QQuery 31 │ 712.91ms │ 702.45ms │ no change │
│ QQuery 32 │ 3567.36ms │ 3520.95ms │ no change │
│ QQuery 33 │ 4863.25ms │ 3527.36ms │ +1.38x faster │
│ QQuery 34 │ 4115.97ms │ 3731.86ms │ +1.10x faster │
│ QQuery 35 │ 998.40ms │ 964.65ms │ no change │
│ QQuery 36 │ 147.99ms │ 136.33ms │ +1.09x faster │
│ QQuery 37 │ 101.82ms │ 102.70ms │ no change │
│ QQuery 38 │ 108.77ms │ 105.97ms │ no change │
│ QQuery 39 │ 327.29ms │ 267.66ms │ +1.22x faster │
│ QQuery 40 │ 33.56ms │ 34.77ms │ no change │
│ QQuery 41 │ 34.09ms │ 31.76ms │ +1.07x faster │
│ QQuery 42 │ 39.32ms │ 40.56ms │ no change │
└──────────────┴────────────┴────────────┴───────────────┘
```rust
/// Record the number of rows that were output directly without aggregation
fn record_skipped(&mut self, batch: &RecordBatch) {
    self.skipped_aggregation_rows.add(batch.num_rows());
}
```
Not sure this metric is really not required -- it may still be helpful for debugging purposes, or for providing information to the end user about how the aggregation was executed (via `explain analyze`).
```rust
/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.1
```
That's interesting; why does the 0.1 value make things faster 🤔

Is aggregation for simple cases (e.g. a single integer) slower than repartitioning 10x the rows?
> Is aggregation for simple cases (e.g. a single integer) slower than repartitioning 10x the rows?

Given the benchmark result, I think so, especially if we have millions of distinct integers.
```rust
/// without aggregation
skipped_aggregation_rows: metrics::Count,
/// Number of unique hashes, which represents the cardinality of the group values
unique_hashes_count: HashSet<u64>,
```
What's the main case where `group_values.len()` is not enough and actual hashes are needed?

And if they are really required, this `HashSet` should be accounted for in the operator's memory reservation.
> What's the main case where `group_values.len()` is not enough and actual hashes are needed?

Even if the number of existing groups is low, we still need to accumulate the previous hashes so we can calculate the ratio later on, when the group count is high enough. `unique_hashes_count.len()` has a role similar to `group_values.len()`.
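A small illustration of why the hashes are accumulated across batches instead of being reset per batch (illustrative values; the keys are used directly in place of their hashes):

```rust
use std::collections::HashSet;

fn main() {
    // Batch 1 alone looks low-cardinality (2 groups / 4 rows), but the
    // accumulated view reveals the stream is actually high-cardinality.
    let batch1: Vec<u64> = vec![1, 1, 2, 2];
    let batch2: Vec<u64> = (100..200).collect(); // 100 new distinct keys

    let mut unique_hashes: HashSet<u64> = HashSet::new();
    let mut rows = 0;
    for batch in [&batch1[..], &batch2[..]] {
        unique_hashes.extend(batch.iter().copied());
        rows += batch.len();
    }
    let ratio = unique_hashes.len() as f64 / rows as f64;
    assert!(ratio > 0.8); // 102 unique keys over 104 rows
}
```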
Which issue does this PR close?
Closes #.
Rationale for this change
The current skip-aggregation logic calculates the ratio between the number of group values and the total batch size. However, what we actually need is the ratio of unique group values to the total number of group values.

To achieve this, we don't need to append group values to the `GroupValueBuilder`. Instead, we can simply check the hash values.

Benchmarking also shows an improvement for high-cardinality queries. Although low-cardinality cases, like TPCH Q1, slow down due to the additional `HashSet` computation, I believe the tradeoff is worthwhile.
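To make the rationale concrete, here is a hedged sketch of estimating a batch's unique-group ratio directly from hash values. It assumes DataFusion's `datafusion_common::hash_utils::create_hashes` helper and `ahash::RandomState`; it is a sketch of the idea, not the exact code in this PR:

```rust
use std::collections::HashSet;
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::{Array, ArrayRef, Int64Array};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;

/// Estimate the unique-group ratio of one batch's group columns from raw
/// hash values, without appending anything to a group-values builder.
fn unique_ratio(group_columns: &[ArrayRef], random_state: &RandomState) -> Result<f64> {
    let num_rows = group_columns[0].len();
    let mut hashes_buffer = vec![0u64; num_rows];
    create_hashes(group_columns, random_state, &mut hashes_buffer)?;
    let unique: HashSet<u64> = hashes_buffer.iter().copied().collect();
    Ok(unique.len() as f64 / num_rows as f64)
}

fn main() -> Result<()> {
    // 1024 distinct integers: the ratio is ~1.0, a candidate for skipping.
    let col: ArrayRef = Arc::new(Int64Array::from((0..1024).collect::<Vec<i64>>()));
    let ratio = unique_ratio(&[col], &RandomState::with_seeds(0, 0, 0, 0))?;
    assert!(ratio > 0.99);
    Ok(())
}
```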
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
TODO
Benchmark