Skip to content

Commit

Permalink
Update blog
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Sep 8, 2024
1 parent 32ee097 commit 315b7d1
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions content/blog/2024-08-23-datafusion-grouped-aggregations/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,27 @@ DataFusion 支持多种聚合方案,在不同情况下会选择最优方案。
![](./datafusion-aggregation-single.drawio.png)
- Aggregate 算子接收所有输入数据串行执行
- 一个 Aggregate 算子完成所有分组聚合工作
- 场景:通常输入只有一个 partition
- 场景:通常输入只有一个分区

### 一阶段有哈希分区(SinglePartitioned)
![](./datafusion-aggregation-single-partitioned.drawio.png)
- 输入必须按照 group key 进行了重新分区
- 输入必须按照 group keys 进行了重新分区(repartition)
- 一个 Aggregate 算子完成所有分组聚合工作
- Aggregate 算子接收多个 partition 数据并行执行
- 场景:通常输入有多个 partitions,且都已经按 group key 重新分区(repartition)了
- Aggregate 算子接收多个分区数据并行执行
- 场景:通常输入有多个分区,且都已经按 group keys 重新分区了

### 两阶段无哈希分区(Partial-Final)
![](./datafusion-aggregation-partial-final.drawio.png)
- 第一阶段 Aggregate 算子接收多个 partition 数据并行执行,计算中间聚合结果
- 第二阶段 Aggregate 算子接收所有中间聚合结果数据串行执行,生成最终聚合结果
- 场景:通常输入有多个 partitions,查询没有 group by 语句 或者 用户设置并行度为 1(输出一个 partition
- 第一阶段 Aggregate 算子接收多个分区数据并行执行,计算中间聚合结果
- 第二阶段 Aggregate 算子接收所有分区中间聚合结果数据串行执行,生成最终聚合结果
- 场景:通常输入有多个分区,查询没有 group by 语句 或者 用户设置并行度为 1(输出一个分区

### 两阶段有哈希分区(Partial-FinalPartioned)
![](./datafusion-aggregation-partial-final-partitioned.drawio.png)
- 第一阶段 Aggregate 算子接收多个 partition 数据并行执行,计算中间聚合结果
- 第二阶段 Aggregate 算子接收多个 partition 中间聚合结果数据并行执行,并行生成最终聚合结果
- 第二阶段输入必须按照 group key 进行了重新分区(repartition)
- 场景:通常输入有多个 partitions(没有按 group key 重新分区),查询有 group by 语句,且并行度大于 1
- 第一阶段 Aggregate 算子接收多个分区数据并行执行,计算中间聚合结果
- 第二阶段 Aggregate 算子接收多个分区中间聚合结果数据并行执行,并行生成最终聚合结果
- 第二阶段输入必须按照 group key 进行了重新分区
- 场景:通常输入有多个分区(没有按 group keys 重新分区),查询有 group by 语句,且并行度大于 1

## 两阶段并行哈希分组聚合(Partial-FinalPartioned)

Expand All @@ -42,22 +42,22 @@ DataFusion 支持多种聚合方案,在不同情况下会选择最优方案。

1. 不断从输入读取一批一批数据 `(a, b, c)`
2. 执行表达式求值和中间聚合计算,在内存中维护 group 值到中间聚合结果 `(a, b) -> (count(c), sum(c))` 的哈希表
3. 如果输入按照 group keys 排序,则利用排序特性,输出已聚合完毕 group 的中间聚合结果到第二阶段(可以清空这部分内存)
3. 如果输入已按照 group keys 排序,则利用排序特性,提前输出部分已聚合完毕 group 的中间聚合结果到第二阶段(可以清空这部分内存)
4. 如果发现内存不足,则提前输出(early emit)内存中所有的已计算的中间聚合结果到第二阶段,清空内存中的哈希表
5. 如果发现是高基数聚合,则直接跳过聚合计算(不维护哈希表),将每行输入转换为中间聚合结果表达形式,输出到第二阶段

### 第二阶段(FinalPartitioned)

1. 不断从输入读取一批一批中间聚合数据 `(a, b, count(c), sum(c))`
2. 将相同 group 的中间聚合结果合并,在内存中维护 group 值到中间聚合结果 `(a, b) -> (count(c), sum(c))` 的哈希表
3. 如果输入按照 group keys 排序,则利用排序特性,输出已聚合完毕的 group,会基于中间聚合结果计算最终聚合结果 `avg(c)`然后提前输出到下一算子(可以清空这部分内存)
4. 如果内存不足,则将内存中的哈希表溢出到磁盘(spill),溢出前会先按照 group keys 排好序,以 Arrow IPC 格式写入磁盘文件,然后清空内存哈希表,最后会将多个文件以流的形式执行合并操作(stream merge),计算最终聚合结果并输出到下一算子
5. 如果内存充足,则最后将整个内存中的哈希表,计算最终聚合结果并输出到下一算子
3. 如果输入已按照 group keys 排序,则利用排序特性,提前输出部分已聚合完毕的 group,会基于中间聚合结果计算最终聚合结果 `avg(c)`然后输出到下一算子(可以清空这部分内存)
4. 如果内存不足,则将内存中的哈希表溢出到磁盘(spill),溢出前会先按照 group keys 排好序,以 Arrow IPC 格式写入磁盘文件,然后清空内存哈希表,最后会将多个溢出文件以流的形式执行合并操作(stream merge),计算最终聚合结果 `avg(c)` 并输出到下一算子
5. 如果内存充足,则最后将整个内存中的哈希表,计算最终聚合结果 `avg(c)` 并输出到下一算子

### 内存中的哈希表
![](./datafusion-aggregation-hashtable.drawio.png)

以上在逻辑上形成一个哈希表,但物理上并非直接使用哈希表存储 group 到聚合状态的映射实际上哈希表维护的是 group 值到 group 索引的映射,哈希表负责分配 group 索引,而另外有一个 `Accumulator组` 的数据结构来维护每个 group 的聚合状态,每个 group 索引会对应其中一个 Accumulator。
以上在逻辑上形成一个哈希表,group 索引、group 值和 `Accumulator` 一一对应,但物理上并非直接使用哈希表存储 group 到聚合状态的映射实际上哈希表维护的是 group 值到 group 索引的映射,哈希表负责分配 group 索引,而另外有一个 `Accumulator组` 的数据结构来维护每个 group 的聚合状态,每个 group 索引会对应其中一个 Accumulator。

在接收一批数据时,先由哈希表来计算这批数据每行对应的 group 索引(可能是已存在的,也可能会分配一个新的),然后将这批数据和每行对应的 group 索引发送给 `Accumulator组` 来进行聚合状态更新。

Expand All @@ -79,20 +79,20 @@ DataFusion 会利用聚合算子的输入在 group keys 上的(部分/完全

![](./datafusion-aggregation-full-group-ordering.drawio.png)

当出现新的 group 值时,说明前面的 group 已经聚合完毕,不会再有新的行出现,此时我们可以将前面 group 聚合计算结果发送到下一阶段算子
当出现新的 group 值时,说明前面的 group 已经聚合完毕,不会再有新的行出现,此时我们可以将前面 group 聚合计算结果提前发送到下一阶段算子

### 高基数聚合跳过第一阶段

当出现高基数聚合(group 比较分散,默认阈值是行数大于 100000 并且 group 数量与行数比值大于 0.8)时,这时第一阶段在内存中需要维护巨大的哈希表,不仅浪费内存,而且第一阶段并不能显著减少数据量,此时会跳过第一阶段聚合,每行数据直接转换为中间聚合结果形式 `(a, b, c)` -> `(a, b, count(c), sum(c))`,然后输出到第二阶段。
当出现高基数聚合(group 比较分散,默认阈值是行数大于 100000 并且 group 数量与行数比值大于 0.8)时,这时第一阶段在内存中需要维护巨大的哈希表,不仅浪费内存,而且第一阶段并不能显著减少数据量,此时会跳过第一阶段内的聚合计算,每行数据 `(a, b, c)` 直接被转换为中间聚合结果形式 `(a, b, count(c), sum(c))`,然后输出到第二阶段。

### TopK 聚合
当查询(`... order by xxx limit xxx`)满足特定条件时,[优化规则](https://github.com/apache/datafusion/blob/a4445283dbff1b74a6b4d9ecfa1016857dc6207e/datafusion/core/src/physical_optimizer/topk_aggregation.rs)会将 limit 下推到 Aggregate 算子,在执行时会直接走 TopK 聚合计算,采用一种 [Map 和优先队列的组合结构](https://github.com/apache/datafusion/blob/a4445283dbff1b74a6b4d9ecfa1016857dc6207e/datafusion/physical-plan/src/aggregates/topk/priority_map.rs),避免在内存中维护巨大的哈希表,减少内存占用以及计算量。

### 其他一些问题

**第一阶段判断是否跳过聚合直接输出中间聚合结果时,为什么需要输入无任何 group keys 的排序特性**
**第一阶段判断是否跳过聚合计算时,为什么需要输入无任何在 group keys 上的排序特性**

因为如果输入具有排序特性,那么可以利用排序特性来提前输出已聚合完毕的 group,这样不会因为高基数聚合导致需要在内存中维护巨大的哈希表。
因为如果输入具有排序特性,那么可以利用排序特性来提前输出部分已聚合完毕的 group,这样不会因为高基数聚合导致需要在内存中维护巨大的哈希表。

**为什么 Spill(溢出到磁盘)仅发生在第二阶段(FinalPartitioned),而不会在第一阶段(Partial)发生?**

Expand Down

0 comments on commit 315b7d1

Please sign in to comment.